std :: condition_variable - Espera varios hilos para notificar al observador

mi problema se ve así:

Tengo un observador que contiene std :: condition_variable y std :: mutex, mis objetos de subproceso de trabajo tienen un puntero al observador. Cada vez que un subproceso de trabajo finaliza su trabajo, llama a m_Observer-> NotifyOne () que luego llama a la función notify_one () de la variable_condición. Ahora lo que quiero hacer es iniciar un grupo de subprocesos de trabajo, cada uno con un trabajo diferente y datos diferentes (independientes) y esperar a que todos ellos señalen (usando m_Observer-> NotifyOne ()) el observador para que pueda continuar trabajando en función de los resultados de todos los hilos de trabajo.

Mi observador se ve así:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

    void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
    {
        uint32_t uNotifyCount = 0;
        while (uNotifyCount < _uNumOfNotifications)
        {
            std::unique_lock<std::mutex> Lock(m_Mutex);
            m_bNotified = false;
            m_ObserverCV.wait(Lock);

            if (m_bNotified)
                ++uNotifyCount;
        }
    }

}; // IAsyncObserver

donde _uNumOfNotifications es el número de subprocesos de trabajo que quiero esperar.

Ahora se supone que cada subproceso de trabajador ejecuta una función de simulación que hace el trabajo real durante un paso de tiempo / basura de datos y luego hace una pausa / espera hasta que el observador notifique al trabajador que continúe.

La función de hilo de un trabajador podría verse así:

do{
    //suspend simulation
    while (m_PauseSimulation.load())
    {
        std::unique_lock<std::mutex> wait(m_WaitMutex);
        m_CV.wait(wait);
        if (m_RunSimulation.load() == false)
        {
            SignalObserver();
            return;
        }
    }

    //lock the data while simulating
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        //update simulation 
        Simulate(static_cast<float>(m_fDeltaTime));

        m_PauseSimulation.store(true);
    }

    //notify the physics manager that work is done here
    SignalObserver();       

} while (m_RunSimulation.load());

SignalObserver () solo llama a m_Observer-> NotifyOne ().

Ahora el problema es que después de algún tiempo los hilos se encuentran en algún punto muerto y el observador no les notifica que continúen con el siguiente paso. El problema probablemente esté en algún lugar de la función WaitForNotifications (), pero no estoy seguro. Atm Solo tengo un subproceso de trabajo, por lo que uNumOfNotifications = 1, pero aún se encuentra con el problema donde espera en m_ObserverCV.wait (Bloquear) y m_CV.wait (esperar), ni siquiera estoy seguro de si realmente es un punto muerto o algo así con condition_variable porque intento acceder desde varios subprocesos.

En este punto, me gustaría citar al padre de Ned Flanders: "¡No intentamos nada y nos hemos quedado sin ideas!"

Gracias por tu ayuda. Alguna propina es apreciada.

Fabian

EDITAR:

Gracias por toda la información útil y sugerencias. Terminé implementando la segunda idea de Michael ya que no encontré nada sobre std :: barreras. Así que aquí está lo que hice:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

    uint32_t m_uNumOfNotifications;
    uint32_t m_uNotificationCount;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
        m_uNumOfNotifications = 0;
        m_uNotificationCount = 0;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void SetBarrier(uint32_t _uNumOfNotifications = 1)
    {
        m_uNumOfNotifications = _uNumOfNotifications;
    }

    void NotifyBarrier()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        if (++m_uNotificationCount >= m_uNumOfNotifications)
        {
            m_bNotified = true;
            m_ObserverCV.notify_one();
        }
    }

    void WaitForNotifications()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        while (m_bNotified == false)
        {
            m_ObserverCV.wait(Lock);
        }
        m_uNotificationCount = 0;
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

}; // IAsyncObserver

En mi función "principal": donde MassSpringSystem y RigidBodySystem son mis cajeros automáticos

    //update systems here:
    {
        SetBarrier(m_uTotalNotifyCount);

        {   //start MassSpringSystems
            std::lock_guard<std::mutex> lock(m_LockMutex);
            for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
            {
                MSS->SetDeltaTime(fDeltaTime);
                MSS->Continue();
            }
        }

        //ATTENTION this system works directly on the m_OctreeEntities!
        {   //start RigidBodySystems
            m_RigidBodySystem.SetDeltaTime(fDeltaTime);
            m_RigidBodySystem.AddData(m_RigidBodies);
            m_RigidBodySystem.Continue();
        }

        //wait for all systems to finish -> till they call SignalObserver
        WaitForNotifications();
    }

Y en la función de subproceso de los trabajadores al igual que arriba, pero esta vez SignalObserver llama a NotifyBarrier ()

Todo funciona bien ahora. Una solución simple pero poderosa, ¡Gracias!

Respuestas a la pregunta(1)

Su respuesta a la pregunta