Stabilized TaskScheduler (Still buggy)

Former-commit-id: a604c6616065342b21d2c11c27974ec11935a852
This commit is contained in:
Lynix 2013-03-02 00:26:48 +01:00
parent b1da998d24
commit eca2ec3115
2 changed files with 44 additions and 33 deletions

View File

@ -9,6 +9,7 @@
#include <Nazara/Prerequesites.hpp>
#include <Nazara/Core/Functor.hpp>
#include <Nazara/Core/Thread.hpp>
class NAZARA_API NzTaskScheduler
{
@ -20,7 +21,7 @@ class NAZARA_API NzTaskScheduler
template<typename F, typename... Args> static void AddTask(F function, Args... args);
template<typename C> static void AddTask(void (C::*function)(), C* object);
static unsigned int GetWorkerCount();
static bool Initialize();
static bool Initialize(unsigned int workerCount = NzThread::HardwareConcurrency());
static void Uninitialize();
static void WaitForTasks();

View File

@ -12,6 +12,8 @@
#include <vector>
#include <Nazara/Core/Debug.hpp>
///FIXME: Revoir tout ça
namespace
{
struct TaskSchedulerImpl
@ -21,9 +23,11 @@ namespace
NzConditionVariable waiterConditionVariable;
NzConditionVariable workerConditionVariable;
NzMutex taskMutex;
NzMutex taskCountMutex;
NzMutex waiterConditionVariableMutex;
NzMutex workerConditionVariableMutex;
volatile bool running = true;
unsigned int taskCount; ///TODO: Atomic
};
TaskSchedulerImpl* s_impl = nullptr;
@ -50,14 +54,24 @@ namespace
task->Run(); // Chouette ! Allons travailler gaiement
delete task; // Sans oublier de supprimer la tâche
s_impl->taskCountMutex.Lock();
#ifdef NAZARA_DEBUG
if (s_impl->taskCount == 0)
NazaraInternalError("Task count is already 0");
#endif
if (--s_impl->taskCount == 0)
{
// On peut signaler la fin du travail
s_impl->waiterConditionVariableMutex.Lock();
s_impl->waiterConditionVariable.Signal();
s_impl->waiterConditionVariableMutex.Unlock();
}
s_impl->taskCountMutex.Unlock();
}
else
{
// On peut signaler à tout le monde qu'il n'y a plus de tâches
s_impl->waiterConditionVariableMutex.Lock();
s_impl->waiterConditionVariable.SignalAll();
s_impl->waiterConditionVariableMutex.Unlock();
// Nous attendons qu'une nouvelle tâche arrive
s_impl->workerConditionVariableMutex.Lock();
s_impl->workerConditionVariable.Wait(&s_impl->workerConditionVariableMutex);
@ -81,19 +95,16 @@ unsigned int NzTaskScheduler::GetWorkerCount()
return s_impl->workers.size();
}
bool NzTaskScheduler::Initialize()
bool NzTaskScheduler::Initialize(unsigned int workerCount)
{
if (s_impl)
return true; // Déjà initialisé
s_impl = new TaskSchedulerImpl;
s_impl->workers.reserve(workerCount);
unsigned int workerCount = NzThread::HardwareConcurrency();
for (unsigned int i = 0; i < workerCount; ++i)
{
NzThread* thread = new NzThread(WorkerFunc);
s_impl->workers.push_back(thread);
}
s_impl->workers.push_back(new NzThread(WorkerFunc));
return true;
}
@ -104,6 +115,17 @@ void NzTaskScheduler::Uninitialize()
{
s_impl->running = false;
// S'il reste des tâches en cours, on les libère
{
NzLockGuard lock(s_impl->taskMutex);
while (!s_impl->tasks.empty())
{
delete s_impl->tasks.front();
s_impl->tasks.pop();
}
}
// Ensuite on réveille les threads pour qu'ils s'arrêtent d'eux-même
s_impl->workerConditionVariableMutex.Lock();
s_impl->workerConditionVariable.SignalAll();
s_impl->workerConditionVariableMutex.Unlock();
@ -114,13 +136,6 @@ void NzTaskScheduler::Uninitialize()
delete thread;
}
// S'il reste des tâches en cours, on les libère
while (!s_impl->tasks.empty())
{
delete s_impl->tasks.front();
s_impl->tasks.pop();
}
delete s_impl;
s_impl = nullptr;
}
@ -136,18 +151,20 @@ void NzTaskScheduler::WaitForTasks()
}
#endif
s_impl->taskMutex.Lock();
// Tout d'abord, il y a-t-il des tâches en attente ?
if (s_impl->tasks.empty())
{
s_impl->taskMutex.Unlock();
return;
}
s_impl->taskCount = s_impl->tasks.size();
// On verrouille d'abord la mutex entourant le signal (Pour ne pas perdre le signal en chemin)
s_impl->waiterConditionVariableMutex.Lock();
// Et ensuite seulement on déverrouille la mutex des tâches
s_impl->taskMutex.Unlock();
// Et ensuite seulement on réveille les worker
s_impl->workerConditionVariableMutex.Lock();
s_impl->workerConditionVariable.SignalAll();
s_impl->workerConditionVariableMutex.Unlock();
s_impl->waiterConditionVariable.Wait(&s_impl->waiterConditionVariableMutex);
s_impl->waiterConditionVariableMutex.Unlock();
}
@ -162,12 +179,5 @@ void NzTaskScheduler::AddTaskFunctor(NzFunctor* taskFunctor)
}
#endif
{
NzLockGuard lock(s_impl->taskMutex);
s_impl->tasks.push(taskFunctor);
}
s_impl->workerConditionVariableMutex.Lock();
s_impl->workerConditionVariable.Signal();
s_impl->workerConditionVariableMutex.Unlock();
}