Was verursacht einen zufälligen Absturz bei boost :: coroutine?

Ich habe eine Multithread-Anwendung, die @ verwendboost :: asio undboost :: coroutine über seine Integration inboost :: asio. Jeder Thread hat ein eigenes io_service Objekt. Der einzige gemeinsame Status zwischen Threads sind Verbindungspools, die mit @ gesperrt sin mutex, wenn die Verbindung vom / zum Verbindungspool abgerufen oder zurückgegeben wird. Wenn es nicht genug Verbindungen im Pool gibt, drücke ich unendlichasio :: steady_tiemer in der internen Struktur des Pools und asynchron darauf warten und ich Nachgeben aus der Couroutine-Funktion. Wenn ein anderer Thread die Verbindung zum Pool zurückgibt, prüft er, ob es Wartezeiten gibt. Er erhält Wartezeiten aus der internen Struktur. Er erhält sein io_service object und postet ein Lambda, das den Timer aufweckt, um die suspendierte Coroutine fortzusetzen. Ich habe zufällige Abstürze in der Anwendung. Ich versuche das Problem mit @ zu untersuch valgrind. Es wurden einige Probleme gefunden, aber ich kann sie nicht verstehen, da sie in @ auftreteboost :: coroutine undboost :: asio Interna. Hier sind Fragmente aus meinem Code und aus valgrind Ausgabe. Kann jemand das Problem sehen und erklären?

Hier ist die Vorwahl:

template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
    AvlRequestDataList allRequests;
    for(auto& requestContext : avlRequestContexts)
    {
        if(!requestContext.pullProvider || !requestContext.toAskGDS())
            continue;

        auto& requests = requestContext.pullProvider->getRequestsData();
        copy(requests.begin(), requests.end(), back_inserter(allRequests));
    }

    if(allRequests.size() == 0)
        return;

    boost::asio::io_service ioService;
    curl::AsioMultiplexer multiplexer(ioService);

    for(auto& request : allRequests)
    {
        using namespace boost::asio;

        spawn(ioService, [&multiplexer, &request](yield_context yield)
        {
            request->prepare(multiplexer, yield);
        });
    }

    while(true)
    {
        try
        {
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
            ioService.run();
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
            break;
        }
        catch(const std::exception& e)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
        }
        catch(...)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
        }
    }
}

Hier ist dasprepare Funktionsimplementierung, die in Lambda aufgerufen wird:

void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
                                 boost::asio::yield_context yield)
{
    auto& ioService = multiplexer.getIoService();
    _connection = _pool.getConnection(ioService, yield);
    _connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);

    multiplexer.addEasyHandle(_connection->getHandle(),
                              [this](const curl::EasyHandleResult& result)
    {
        if(0 == result.responseCode)
            returnQuota();
        VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
        _pool.addConnection(std::move(_connection));
    });
}


void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
                             boost::asio::yield_context yield)
{
    try
    {
        prepareImpl(multiplexer, yield);
    }
    catch(const std::exception& e)
    {
        VLOG_ERROR(lastSeatLogger, "Error wile preparing request: " << e.what());
        returnQuota();
    }
    catch(...)
    {
        VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
        returnQuota();
    }
}

DasreturnQuota -Funktion ist eine rein virtuelle Methode desAvlRequestData class und ihre Implementierung für dasTravelportRequestData Klasse, die in allen meinen Tests verwendet wird, ist die folgende:

void returnQuota() const override
{
    auto& avlQuotaManager = AvlQuotaManager::getInstance();
    avlQuotaManager.consumeQuotaTravelport(-1);
}

Hier sinddrücke undPo Methoden des Verbindungspools.

auto AvlConnectionPool::getConnection(
        TimerPtr timer,
        asio::yield_context yield) -> ConnectionPtr
{
    lock_guard<mutex> lock(_mutex);

    while(_connections.empty())
    {
        _timers.emplace_back(timer);
        timer->expires_from_now(
            asio::steady_timer::clock_type::duration::max());

        _mutex.unlock();
        coroutineAsyncWait(*timer, yield);
        _mutex.lock();
    }

    ConnectionPtr connection = std::move(_connections.front());
    _connections.pop_front();

    VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    ++_connectionsGiven;

    return connection;
}

void AvlConnectionPool::addConnection(ConnectionPtr connection,
                                      Side side /* = Back */)
{
    lock_guard<mutex> lock(_mutex);

    if(Front == side)
        _connections.emplace_front(std::move(connection));
    else
        _connections.emplace_back(std::move(connection));

    VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    if(_timers.empty())
        return;

    auto timer = _timers.back();
    _timers.pop_back();

    auto& ioService = timer->get_io_service();
    ioService.post([timer](){ timer->cancel(); });

    VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
                                  % _connectionPoolName));
}

Dies ist die Implementierung von coroutineAsyncWait.

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
                               boost::asio::yield_context yield)
{
    boost::system::error_code ec;
    timer.async_wait(yield[ec]);
    if(ec && ec != boost::asio::error::operation_aborted)
        throw std::runtime_error(ec.message());
}

Und schließlich der erste Teil des valgrind Ausgabe

== 8189 == Thread 41:
== 8189 == Ungültiger Wert für Größe 8
== 8189 == um 0x995F84: void boost :: coroutines :: detail :: trampoline_push_void, void, boost :: asio :: detail :: coro_entry_point, void (anonymer Namespace) :: executeRequests>> (std :: vector < (anonymer Namespace) :: AvlRequestContext, std :: allocator <(anonymer Namespace) :: AvlRequestContext>> &) :: {lambda (boost :: asio :: basic_yield_context>) # 1}> &, boost :: coroutines :: basic_standard_stack_allocator>> (long) (trampoline_push.hpp: 65)
== 8189 == Adresse 0x2e3b5528 ist nicht gestapelt, malloc'd oder (kürzlich) frei'd

Wenn ich benutze valgrind mit angehängtem Debugger stoppt es in der folgenden Funktion in trampoline_push.hpp imboost :: coroutine Bibliothek

53│ template< typename Coro >
54│ void trampoline_push_void( intptr_t vp)
55│ {
56│     typedef typename Coro::param_type   param_type;
57│
58│     BOOST_ASSERT( vp);
59│
60│     param_type * param(
61│         reinterpret_cast< param_type * >( vp) );
62│     BOOST_ASSERT( 0 != param);
63│
64│     Coro * coro(
65├>        reinterpret_cast< Coro * >( param->coro) );
66│     BOOST_ASSERT( 0 != coro);
67│
68│     coro->run();
69│ }

Antworten auf die Frage(2)

Ihre Antwort auf die Frage