¿Qué causa un bloqueo aleatorio en boost :: coroutine?

Tengo una aplicación multiproceso que usaimpulso :: asio yimpulso :: corutina a través de su integración enimpulso :: asio. Cada hilo tiene su propioio_service objeto. El único estado compartido entre subprocesos son agrupaciones de conexiones que están bloqueadas conmutex cuando la conexión se obtiene o se devuelve desde / al grupo de conexiones. Cuando no hay suficientes conexiones en la piscina, presiono infinitoasio :: steady_tiemer en la estructura interna de la piscina y asincrónicamente esperando en ella y yoflexible desde la función de couroutine. Cuando otro subproceso devuelve la conexión al grupo, comprueba si hay temporizadores de espera, obtiene el temporizador de espera de la estructura interna, obtiene suio_service objeto y publica una lambda que despierta el temporizador para reanudar la corutina suspendida. Tengo bloqueos aleatorios en la aplicación. Intento investigar el problema convalgrind. Encontró algunos problemas, pero no puedo entenderlos porque suceden enimpulso :: corutina yimpulso :: asio internos. Aquí hay fragmentos de mi código y devalgrind salida. ¿Alguien puede ver y explicar el problema?

Aquí está el código de llamada:

template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
    AvlRequestDataList allRequests;
    for(auto& requestContext : avlRequestContexts)
    {
        if(!requestContext.pullProvider || !requestContext.toAskGDS())
            continue;

        auto& requests = requestContext.pullProvider->getRequestsData();
        copy(requests.begin(), requests.end(), back_inserter(allRequests));
    }

    if(allRequests.size() == 0)
        return;

    boost::asio::io_service ioService;
    curl::AsioMultiplexer multiplexer(ioService);

    for(auto& request : allRequests)
    {
        using namespace boost::asio;

        spawn(ioService, [&multiplexer, &request](yield_context yield)
        {
            request->prepare(multiplexer, yield);
        });
    }

    while(true)
    {
        try
        {
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
            ioService.run();
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
            break;
        }
        catch(const std::exception& e)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
        }
        catch(...)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
        }
    }
}

Aquí está elprepare implementación de la función que se llama en lambda generado:

void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
                                 boost::asio::yield_context yield)
{
    auto& ioService = multiplexer.getIoService();
    _connection = _pool.getConnection(ioService, yield);
    _connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);

    multiplexer.addEasyHandle(_connection->getHandle(),
                              [this](const curl::EasyHandleResult& result)
    {
        if(0 == result.responseCode)
            returnQuota();
        VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
        _pool.addConnection(std::move(_connection));
    });
}


void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
                             boost::asio::yield_context yield)
{
    try
    {
        prepareImpl(multiplexer, yield);
    }
    catch(const std::exception& e)
    {
        VLOG_ERROR(lastSeatLogger, "Error wile preparing request: " << e.what());
        returnQuota();
    }
    catch(...)
    {
        VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
        returnQuota();
    }
}

losreturnQuota la función es puro método virtual de laAvlRequestData clase y su implementación para elTravelportRequestData La clase que se usa en todas mis pruebas es la siguiente:

void returnQuota() const override
{
    auto& avlQuotaManager = AvlQuotaManager::getInstance();
    avlQuotaManager.consumeQuotaTravelport(-1);
}

Aquí estánempujar ypopular métodos del conjunto de conexiones.

auto AvlConnectionPool::getConnection(
        TimerPtr timer,
        asio::yield_context yield) -> ConnectionPtr
{
    lock_guard<mutex> lock(_mutex);

    while(_connections.empty())
    {
        _timers.emplace_back(timer);
        timer->expires_from_now(
            asio::steady_timer::clock_type::duration::max());

        _mutex.unlock();
        coroutineAsyncWait(*timer, yield);
        _mutex.lock();
    }

    ConnectionPtr connection = std::move(_connections.front());
    _connections.pop_front();

    VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    ++_connectionsGiven;

    return connection;
}

void AvlConnectionPool::addConnection(ConnectionPtr connection,
                                      Side side /* = Back */)
{
    lock_guard<mutex> lock(_mutex);

    if(Front == side)
        _connections.emplace_front(std::move(connection));
    else
        _connections.emplace_back(std::move(connection));

    VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    if(_timers.empty())
        return;

    auto timer = _timers.back();
    _timers.pop_back();

    auto& ioService = timer->get_io_service();
    ioService.post([timer](){ timer->cancel(); });

    VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
                                  % _connectionPoolName));
}

Esta es la implementación decoroutineAsyncWait.

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
                               boost::asio::yield_context yield)
{
    boost::system::error_code ec;
    timer.async_wait(yield[ec]);
    if(ec && ec != boost::asio::error::operation_aborted)
        throw std::runtime_error(ec.message());
}

Y finalmente la primera parte de lavalgrind salida:

== 8189 == Tema 41:
== 8189 == Lectura inválida del tamaño 8
== 8189 == en 0x995F84: refuerzo vacío :: corutinas :: detalle :: trampoline_push_void, vacío, impulso :: asio :: detalle :: coro_entry_point, vacío (espacio de nombres anónimo) :: executeRequests>> (std :: vector <( espacio de nombres anónimo) :: AvlRequestContext, std :: allocator <(espacio de nombres anónimo) :: AvlRequestContext>> &) :: {lambda (boost :: asio :: basic_yield_context>) # 1}> &, boost :: coroutines :: basic_standard_stack_allocator >> (largo) (trampoline_push.hpp: 65)
== 8189 == La dirección 0x2e3b5528 no está apilada, mal colocada o (recientemente) libre

Cuando usovalgrind con el depurador adjunto se detiene en la siguiente función entrampoline_push.hpp enimpulso :: corutina biblioteca.

53│ template< typename Coro >
54│ void trampoline_push_void( intptr_t vp)
55│ {
56│     typedef typename Coro::param_type   param_type;
57│
58│     BOOST_ASSERT( vp);
59│
60│     param_type * param(
61│         reinterpret_cast< param_type * >( vp) );
62│     BOOST_ASSERT( 0 != param);
63│
64│     Coro * coro(
65├>        reinterpret_cast< Coro * >( param->coro) );
66│     BOOST_ASSERT( 0 != coro);
67│
68│     coro->run();
69│ }

Respuestas a la pregunta(1)

Su respuesta a la pregunta