std :: condition_variable - Warten Sie, bis mehrere Threads den Beobachter benachrichtigt haben

mein Problem sieht so aus:

Ich habe einen Beobachter, der eine std :: condition_variable und einen std :: mutex enthält. Meine Worker-Thread-Objekte haben einen Zeiger auf den Beobachter. Jedes Mal, wenn ein Worker-Thread seinen Job beendet hat, ruft er m_Observer-> NotifyOne () auf, der dann die Funktion notify_one () der Bedingungsvariablen aufruft. Nun möchte ich eine Reihe von Worker-Threads mit jeweils unterschiedlichen Jobs und unterschiedlichen (unabhängigen) Daten starten und darauf warten, dass alle (mit m_Observer-> NotifyOne ()) den Beobachter benachrichtigen, damit ich in der Lage bin um die Arbeit basierend auf den Ergebnissen aller Worker-Threads fortzusetzen.

ein Beobachter sieht so aus:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

    void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
    {
        uint32_t uNotifyCount = 0;
        while (uNotifyCount < _uNumOfNotifications)
        {
            std::unique_lock<std::mutex> Lock(m_Mutex);
            m_bNotified = false;
            m_ObserverCV.wait(Lock);

            if (m_bNotified)
                ++uNotifyCount;
        }
    }

}; // IAsyncObserver

where _uNumOfNotifications ist die Anzahl der Arbeitsthreads, auf die ich warten möchte.

Jetzt soll jeder Worker-Thread eine Simulationsfunktion ausführen, die die eigentliche Arbeit für einen Zeitschritt / Datenmüll erledigt und dann pausiert / wartet, bis der Beobachter den Worker benachrichtigt, um fortzufahren.

Die Thread-Funktion eines Workers könnte folgendermaßen aussehen:

do{
    //suspend simulation
    while (m_PauseSimulation.load())
    {
        std::unique_lock<std::mutex> wait(m_WaitMutex);
        m_CV.wait(wait);
        if (m_RunSimulation.load() == false)
        {
            SignalObserver();
            return;
        }
    }

    //lock the data while simulating
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        //update simulation 
        Simulate(static_cast<float>(m_fDeltaTime));

        m_PauseSimulation.store(true);
    }

    //notify the physics manager that work is done here
    SignalObserver();       

} while (m_RunSimulation.load());

SignalObserver () ruft einfach m_Observer-> NotifyOne () auf.

Jetzt ist das Problem, dass die Threads nach einiger Zeit irgendwo auf einen Deadlock stoßen und der Beobachter sie nicht benachrichtigt, um mit dem nächsten Zeitschritt fortzufahren. Das Problem liegt wahrscheinlich irgendwo in der WaitForNotifications () - Funktion, aber ich bin mir nicht sicher. Atm Ich habe nur einen Worker-Thread, also uNumOfNotifications = 1, aber es tritt immer noch das Problem auf, dass bei m_ObserverCV.wait (Lock) und m_CV.wait (wait) gewartet wird. Ich bin mir nicht einmal sicher, ob es wirklich ein Deadlock oder so ist mit der condition_variable, weil ich versuche, von mehreren Threads darauf zuzugreifen.

n dieser Stelle möchte ich Ned Flanders Vater zitieren: "Wir haben nichts ausprobiert und sind alle ideenlos!&quo

Danke für Ihre Hilfe. Jeder Tipp wird geschätzt.

Fabian

BEARBEITEN

ielen Dank für alle hilfreichen Informationen und Vorschläge. Am Ende habe ich die zweite Idee von Michael umgesetzt, da ich nichts über std :: barriers gefunden habe. Also hier ist was ich getan habe:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

    uint32_t m_uNumOfNotifications;
    uint32_t m_uNotificationCount;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
        m_uNumOfNotifications = 0;
        m_uNotificationCount = 0;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void SetBarrier(uint32_t _uNumOfNotifications = 1)
    {
        m_uNumOfNotifications = _uNumOfNotifications;
    }

    void NotifyBarrier()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        if (++m_uNotificationCount >= m_uNumOfNotifications)
        {
            m_bNotified = true;
            m_ObserverCV.notify_one();
        }
    }

    void WaitForNotifications()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        while (m_bNotified == false)
        {
            m_ObserverCV.wait(Lock);
        }
        m_uNotificationCount = 0;
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

}; // IAsyncObserver

In meiner "Haupt" -Funktion: wo MassSpringSystem und RigidBodySystem meine Arbeiter sind atm

    //update systems here:
    {
        SetBarrier(m_uTotalNotifyCount);

        {   //start MassSpringSystems
            std::lock_guard<std::mutex> lock(m_LockMutex);
            for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
            {
                MSS->SetDeltaTime(fDeltaTime);
                MSS->Continue();
            }
        }

        //ATTENTION this system works directly on the m_OctreeEntities!
        {   //start RigidBodySystems
            m_RigidBodySystem.SetDeltaTime(fDeltaTime);
            m_RigidBodySystem.AddData(m_RigidBodies);
            m_RigidBodySystem.Continue();
        }

        //wait for all systems to finish -> till they call SignalObserver
        WaitForNotifications();
    }

Und in der Thread-Funktion der Worker wie oben, aber dieses Mal ruft SignalObserver NotifyBarrier () auf

Alles funktioniert jetzt gut. Eine einfache und dennoch leistungsstarke Lösung. Danke!