std :: condition_variable - ожидание нескольких потоков, чтобы уведомить наблюдателя
моя проблема выглядит так:
У меня есть наблюдатель, который содержит std :: condition_variable и std :: mutex, мои объекты рабочего потока имеют указатель на наблюдателя. Каждый раз, когда рабочий поток завершает свою работу, он вызывает m_Observer-> NotifyOne (), который затем вызывает функцию notify_one () для condition_variable. Теперь я хочу запустить несколько рабочих потоков, каждый из которых имеет свою работу и разные (независимые) данные, и подождать, пока все они подадут сигнал (используя m_Observer-> NotifyOne ()) наблюдателю, чтобы я мог продолжить работу на основе результатов всех рабочих потоков.
Мой наблюдатель выглядит так:
class IAsyncObserver
{
private:
std::condition_variable m_ObserverCV;
bool m_bNotified;
std::mutex m_Mutex;
public:
IAsyncObserver()
{
m_bNotified = false;
}
~IAsyncObserver()
{
/*m_bNotified = true;
m_ObserverCV.notify_all();*/
}
void NotifyOne()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_one();
}
void NotifyAll()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_all();
}
void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
{
uint32_t uNotifyCount = 0;
while (uNotifyCount < _uNumOfNotifications)
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = false;
m_ObserverCV.wait(Lock);
if (m_bNotified)
++uNotifyCount;
}
}
}; // IAsyncObserver
где _uNumOfNotifications - количество рабочих потоков, которые я хочу ждать.
Теперь предполагается, что каждый рабочий поток запускает функцию симуляции, которая выполняет фактическую работу для одного временного шага / ненужного хранилища данных, а затем приостанавливает / ждет, пока наблюдатель не уведомит рабочего о продолжении.
Функция потока работника может выглядеть так:
do{
//suspend simulation
while (m_PauseSimulation.load())
{
std::unique_lock<std::mutex> wait(m_WaitMutex);
m_CV.wait(wait);
if (m_RunSimulation.load() == false)
{
SignalObserver();
return;
}
}
//lock the data while simulating
{
std::lock_guard<std::mutex> lock(m_LockMutex);
//update simulation
Simulate(static_cast<float>(m_fDeltaTime));
m_PauseSimulation.store(true);
}
//notify the physics manager that work is done here
SignalObserver();
} while (m_RunSimulation.load());
SignalObserver () просто вызывает m_Observer-> NotifyOne ().
Теперь проблема в том, что через некоторое время потоки где-то зашли в тупик, и наблюдатель не уведомит их о том, чтобы продолжить следующий шаг времени. Проблема, вероятно, где-то в функции WaitForNotifications (), но я не уверен. Atm У меня только один рабочий поток, поэтому uNumOfNotifications = 1, но он все еще сталкивается с проблемой, когда он ожидает m_ObserverCV.wait (Lock) и m_CV.wait (wait), я даже не уверен, действительно ли это тупик или что-то в этом роде с условной переменной, потому что я пытаюсь получить к нему доступ из нескольких потоков.
В этот момент я хотел бы процитировать отца Неда Фландерса: «Мы ничего не пробовали и у нас нет идей!»
Спасибо за вашу помощь. Всегда совет ценится.
Fabian
РЕДАКТИРОВАТЬ:
Спасибо за всю полезную информацию и предложения. В итоге я реализовал вторую идею Майкла, так как не нашел ничего о std :: барьерах. Итак, вот что я сделал:
class IAsyncObserver
{
private:
std::condition_variable m_ObserverCV;
bool m_bNotified;
std::mutex m_Mutex;
uint32_t m_uNumOfNotifications;
uint32_t m_uNotificationCount;
public:
IAsyncObserver()
{
m_bNotified = false;
m_uNumOfNotifications = 0;
m_uNotificationCount = 0;
}
~IAsyncObserver()
{
/*m_bNotified = true;
m_ObserverCV.notify_all();*/
}
void SetBarrier(uint32_t _uNumOfNotifications = 1)
{
m_uNumOfNotifications = _uNumOfNotifications;
}
void NotifyBarrier()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
if (++m_uNotificationCount >= m_uNumOfNotifications)
{
m_bNotified = true;
m_ObserverCV.notify_one();
}
}
void WaitForNotifications()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
while (m_bNotified == false)
{
m_ObserverCV.wait(Lock);
}
m_uNotificationCount = 0;
}
void NotifyOne()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_one();
}
void NotifyAll()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_all();
}
}; // IAsyncObserver
В моей "основной" функции: где MassSpringSystem и RigidBodySystem являются моими рабочими.
//update systems here:
{
SetBarrier(m_uTotalNotifyCount);
{ //start MassSpringSystems
std::lock_guard<std::mutex> lock(m_LockMutex);
for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
{
MSS->SetDeltaTime(fDeltaTime);
MSS->Continue();
}
}
//ATTENTION this system works directly on the m_OctreeEntities!
{ //start RigidBodySystems
m_RigidBodySystem.SetDeltaTime(fDeltaTime);
m_RigidBodySystem.AddData(m_RigidBodies);
m_RigidBodySystem.Continue();
}
//wait for all systems to finish -> till they call SignalObserver
WaitForNotifications();
}
И в потоке функции работников, как и выше, но на этот раз SignalObserver вызывает NotifyBarrier ()
Теперь все отлично работает. Простое, но мощное решение, спасибо!