std :: condition_variable - ожидание нескольких потоков, чтобы уведомить наблюдателя

моя проблема выглядит так:

У меня есть наблюдатель, который содержит std :: condition_variable и std :: mutex, мои объекты рабочего потока имеют указатель на наблюдателя. Каждый раз, когда рабочий поток завершает свою работу, он вызывает m_Observer-> NotifyOne (), который затем вызывает функцию notify_one () для condition_variable. Теперь я хочу запустить несколько рабочих потоков, каждый из которых имеет свою работу и разные (независимые) данные, и подождать, пока все они подадут сигнал (используя m_Observer-> NotifyOne ()) наблюдателю, чтобы я мог продолжить работу на основе результатов всех рабочих потоков.

Мой наблюдатель выглядит так:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

    void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
    {
        uint32_t uNotifyCount = 0;
        while (uNotifyCount < _uNumOfNotifications)
        {
            std::unique_lock<std::mutex> Lock(m_Mutex);
            m_bNotified = false;
            m_ObserverCV.wait(Lock);

            if (m_bNotified)
                ++uNotifyCount;
        }
    }

}; // IAsyncObserver

где _uNumOfNotifications - количество рабочих потоков, которые я хочу ждать.

Теперь предполагается, что каждый рабочий поток запускает функцию симуляции, которая выполняет фактическую работу для одного временного шага / ненужного хранилища данных, а затем приостанавливает / ждет, пока наблюдатель не уведомит рабочего о продолжении.

Функция потока работника может выглядеть так:

do{
    //suspend simulation
    while (m_PauseSimulation.load())
    {
        std::unique_lock<std::mutex> wait(m_WaitMutex);
        m_CV.wait(wait);
        if (m_RunSimulation.load() == false)
        {
            SignalObserver();
            return;
        }
    }

    //lock the data while simulating
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        //update simulation 
        Simulate(static_cast<float>(m_fDeltaTime));

        m_PauseSimulation.store(true);
    }

    //notify the physics manager that work is done here
    SignalObserver();       

} while (m_RunSimulation.load());

SignalObserver () просто вызывает m_Observer-> NotifyOne ().

Теперь проблема в том, что через некоторое время потоки где-то зашли в тупик, и наблюдатель не уведомит их о том, чтобы продолжить следующий шаг времени. Проблема, вероятно, где-то в функции WaitForNotifications (), но я не уверен. Atm У меня только один рабочий поток, поэтому uNumOfNotifications = 1, но он все еще сталкивается с проблемой, когда он ожидает m_ObserverCV.wait (Lock) и m_CV.wait (wait), я даже не уверен, действительно ли это тупик или что-то в этом роде с условной переменной, потому что я пытаюсь получить к нему доступ из нескольких потоков.

В этот момент я хотел бы процитировать отца Неда Фландерса: «Мы ничего не пробовали и у нас нет идей!»

Спасибо за вашу помощь. Всегда совет ценится.

Fabian

РЕДАКТИРОВАТЬ:

Спасибо за всю полезную информацию и предложения. В итоге я реализовал вторую идею Майкла, так как не нашел ничего о std :: барьерах. Итак, вот что я сделал:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

    uint32_t m_uNumOfNotifications;
    uint32_t m_uNotificationCount;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
        m_uNumOfNotifications = 0;
        m_uNotificationCount = 0;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void SetBarrier(uint32_t _uNumOfNotifications = 1)
    {
        m_uNumOfNotifications = _uNumOfNotifications;
    }

    void NotifyBarrier()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        if (++m_uNotificationCount >= m_uNumOfNotifications)
        {
            m_bNotified = true;
            m_ObserverCV.notify_one();
        }
    }

    void WaitForNotifications()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        while (m_bNotified == false)
        {
            m_ObserverCV.wait(Lock);
        }
        m_uNotificationCount = 0;
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

}; // IAsyncObserver

В моей "основной" функции: где MassSpringSystem и RigidBodySystem являются моими рабочими.

    //update systems here:
    {
        SetBarrier(m_uTotalNotifyCount);

        {   //start MassSpringSystems
            std::lock_guard<std::mutex> lock(m_LockMutex);
            for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
            {
                MSS->SetDeltaTime(fDeltaTime);
                MSS->Continue();
            }
        }

        //ATTENTION this system works directly on the m_OctreeEntities!
        {   //start RigidBodySystems
            m_RigidBodySystem.SetDeltaTime(fDeltaTime);
            m_RigidBodySystem.AddData(m_RigidBodies);
            m_RigidBodySystem.Continue();
        }

        //wait for all systems to finish -> till they call SignalObserver
        WaitForNotifications();
    }

И в потоке функции работников, как и выше, но на этот раз SignalObserver вызывает NotifyBarrier ()

Теперь все отлично работает. Простое, но мощное решение, спасибо!

Ответы на вопрос(1)

Ваш ответ на вопрос