std :: condition_variable - Aguarde vários threads para notificar o observador
meu problema fica assim:
Eu tenho um observador que contém um std :: condition_variable e um std :: mutex, meus objetos de thread de trabalho têm um ponteiro para o observador. Cada vez que um encadeamento de trabalhador termina seu trabalho, ele chama m_Observer-> NotifyOne (), que chama a função notify_one () da variável condition_variable. Agora, o que eu quero fazer é iniciar um grupo de threads de trabalho, cada um com um trabalho diferente e dados diferentes (independentes) e aguardar que todos eles sinalizem (usando m_Observer-> NotifyOne ()) o observador para que eu possa para continuar o trabalho com base nos resultados de todos os threads de trabalho.
Meu observador fica assim:
class IAsyncObserver
{
private:
std::condition_variable m_ObserverCV;
bool m_bNotified;
std::mutex m_Mutex;
public:
IAsyncObserver()
{
m_bNotified = false;
}
~IAsyncObserver()
{
/*m_bNotified = true;
m_ObserverCV.notify_all();*/
}
void NotifyOne()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_one();
}
void NotifyAll()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_all();
}
void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
{
uint32_t uNotifyCount = 0;
while (uNotifyCount < _uNumOfNotifications)
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = false;
m_ObserverCV.wait(Lock);
if (m_bNotified)
++uNotifyCount;
}
}
}; // IAsyncObserver
onde _uNumOfNotifications é o número de threads de trabalho em que quero aguardar.
Agora, cada encadeamento de trabalhador deve executar uma função de simulação que faça o trabalho real de um intervalo de tempo / lixo de dados e depois pause / espere até que o observador notifique o trabalhador para continuar.
A função de thread de um trabalhador pode ser assim:
do{
//suspend simulation
while (m_PauseSimulation.load())
{
std::unique_lock<std::mutex> wait(m_WaitMutex);
m_CV.wait(wait);
if (m_RunSimulation.load() == false)
{
SignalObserver();
return;
}
}
//lock the data while simulating
{
std::lock_guard<std::mutex> lock(m_LockMutex);
//update simulation
Simulate(static_cast<float>(m_fDeltaTime));
m_PauseSimulation.store(true);
}
//notify the physics manager that work is done here
SignalObserver();
} while (m_RunSimulation.load());
SignalObserver () apenas chama m_Observer-> NotifyOne ().
Agora, o problema é que, após algum tempo, os encadeamentos chegam a um impasse em algum lugar e o observador não os notifica para continuar na próxima etapa. O problema provavelmente está em algum lugar na função WaitForNotifications (), mas não tenho certeza. Atm Eu só tenho um thread de trabalho, então uNumOfNotifications = 1, mas ele ainda corre para o problema em que espera em m_ObserverCV.wait (Lock) e m_CV.wait (espera), nem tenho certeza se é realmente um impasse ou algo assim com o condition_variable porque eu tento acessá-lo de vários segmentos.
Neste ponto, eu gostaria de citar o pai de Ned Flanders: "Não tentamos nada e estamos sem ideias!"
Obrigado pela ajuda. Sempre dica é apreciada.
Fabian
EDITAR:
Obrigado por todas as informações e sugestões úteis. Acabei implementando a segunda idéia de Michael, já que não encontrei nada sobre as barreiras std ::. Então aqui está o que eu fiz:
class IAsyncObserver
{
private:
std::condition_variable m_ObserverCV;
bool m_bNotified;
std::mutex m_Mutex;
uint32_t m_uNumOfNotifications;
uint32_t m_uNotificationCount;
public:
IAsyncObserver()
{
m_bNotified = false;
m_uNumOfNotifications = 0;
m_uNotificationCount = 0;
}
~IAsyncObserver()
{
/*m_bNotified = true;
m_ObserverCV.notify_all();*/
}
void SetBarrier(uint32_t _uNumOfNotifications = 1)
{
m_uNumOfNotifications = _uNumOfNotifications;
}
void NotifyBarrier()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
if (++m_uNotificationCount >= m_uNumOfNotifications)
{
m_bNotified = true;
m_ObserverCV.notify_one();
}
}
void WaitForNotifications()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
while (m_bNotified == false)
{
m_ObserverCV.wait(Lock);
}
m_uNotificationCount = 0;
}
void NotifyOne()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_one();
}
void NotifyAll()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_all();
}
}; // IAsyncObserver
Na minha função "principal": onde MassSpringSystem e RigidBodySystem são meus funcionários
//update systems here:
{
SetBarrier(m_uTotalNotifyCount);
{ //start MassSpringSystems
std::lock_guard<std::mutex> lock(m_LockMutex);
for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
{
MSS->SetDeltaTime(fDeltaTime);
MSS->Continue();
}
}
//ATTENTION this system works directly on the m_OctreeEntities!
{ //start RigidBodySystems
m_RigidBodySystem.SetDeltaTime(fDeltaTime);
m_RigidBodySystem.AddData(m_RigidBodies);
m_RigidBodySystem.Continue();
}
//wait for all systems to finish -> till they call SignalObserver
WaitForNotifications();
}
E na função de thread dos trabalhadores, como acima, mas desta vez o SignalObserver chama NotifyBarrier ()
Tudo funciona bem agora. Uma solução simples, mas poderosa, obrigado!