std :: condition_variable - Aguarde vários threads para notificar o observador

meu problema fica assim:

Eu tenho um observador que contém um std :: condition_variable e um std :: mutex, meus objetos de thread de trabalho têm um ponteiro para o observador. Cada vez que um encadeamento de trabalhador termina seu trabalho, ele chama m_Observer-> NotifyOne (), que chama a função notify_one () da variável condition_variable. Agora, o que eu quero fazer é iniciar um grupo de threads de trabalho, cada um com um trabalho diferente e dados diferentes (independentes) e aguardar que todos eles sinalizem (usando m_Observer-> NotifyOne ()) o observador para que eu possa para continuar o trabalho com base nos resultados de todos os threads de trabalho.

Meu observador fica assim:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

    void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
    {
        uint32_t uNotifyCount = 0;
        while (uNotifyCount < _uNumOfNotifications)
        {
            std::unique_lock<std::mutex> Lock(m_Mutex);
            m_bNotified = false;
            m_ObserverCV.wait(Lock);

            if (m_bNotified)
                ++uNotifyCount;
        }
    }

}; // IAsyncObserver

onde _uNumOfNotifications é o número de threads de trabalho em que quero aguardar.

Agora, cada encadeamento de trabalhador deve executar uma função de simulação que faça o trabalho real de um intervalo de tempo / lixo de dados e depois pause / espere até que o observador notifique o trabalhador para continuar.

A função de thread de um trabalhador pode ser assim:

do{
    //suspend simulation
    while (m_PauseSimulation.load())
    {
        std::unique_lock<std::mutex> wait(m_WaitMutex);
        m_CV.wait(wait);
        if (m_RunSimulation.load() == false)
        {
            SignalObserver();
            return;
        }
    }

    //lock the data while simulating
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        //update simulation 
        Simulate(static_cast<float>(m_fDeltaTime));

        m_PauseSimulation.store(true);
    }

    //notify the physics manager that work is done here
    SignalObserver();       

} while (m_RunSimulation.load());

SignalObserver () apenas chama m_Observer-> NotifyOne ().

Agora, o problema é que, após algum tempo, os encadeamentos chegam a um impasse em algum lugar e o observador não os notifica para continuar na próxima etapa. O problema provavelmente está em algum lugar na função WaitForNotifications (), mas não tenho certeza. Atm Eu só tenho um thread de trabalho, então uNumOfNotifications = 1, mas ele ainda corre para o problema em que espera em m_ObserverCV.wait (Lock) e m_CV.wait (espera), nem tenho certeza se é realmente um impasse ou algo assim com o condition_variable porque eu tento acessá-lo de vários segmentos.

Neste ponto, eu gostaria de citar o pai de Ned Flanders: "Não tentamos nada e estamos sem ideias!"

Obrigado pela ajuda. Sempre dica é apreciada.

Fabian

EDITAR:

Obrigado por todas as informações e sugestões úteis. Acabei implementando a segunda idéia de Michael, já que não encontrei nada sobre as barreiras std ::. Então aqui está o que eu fiz:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

    uint32_t m_uNumOfNotifications;
    uint32_t m_uNotificationCount;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
        m_uNumOfNotifications = 0;
        m_uNotificationCount = 0;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void SetBarrier(uint32_t _uNumOfNotifications = 1)
    {
        m_uNumOfNotifications = _uNumOfNotifications;
    }

    void NotifyBarrier()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        if (++m_uNotificationCount >= m_uNumOfNotifications)
        {
            m_bNotified = true;
            m_ObserverCV.notify_one();
        }
    }

    void WaitForNotifications()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        while (m_bNotified == false)
        {
            m_ObserverCV.wait(Lock);
        }
        m_uNotificationCount = 0;
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

}; // IAsyncObserver

Na minha função "principal": onde MassSpringSystem e RigidBodySystem são meus funcionários

    //update systems here:
    {
        SetBarrier(m_uTotalNotifyCount);

        {   //start MassSpringSystems
            std::lock_guard<std::mutex> lock(m_LockMutex);
            for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
            {
                MSS->SetDeltaTime(fDeltaTime);
                MSS->Continue();
            }
        }

        //ATTENTION this system works directly on the m_OctreeEntities!
        {   //start RigidBodySystems
            m_RigidBodySystem.SetDeltaTime(fDeltaTime);
            m_RigidBodySystem.AddData(m_RigidBodies);
            m_RigidBodySystem.Continue();
        }

        //wait for all systems to finish -> till they call SignalObserver
        WaitForNotifications();
    }

E na função de thread dos trabalhadores, como acima, mas desta vez o SignalObserver chama NotifyBarrier ()

Tudo funciona bem agora. Uma solução simples, mas poderosa, obrigado!

questionAnswers(1)

yourAnswerToTheQuestion