Added some comments (as requested)
Former-commit-id: c6c25caf017124c492efc54d5ba8499028896112
This commit is contained in:
parent
622fce6f9c
commit
53e2dfc6c9
|
|
@ -32,6 +32,7 @@ bool NzTaskSchedulerImpl::Initialize(unsigned int workerCount)
|
||||||
|
|
||||||
for (unsigned int i = 0; i < workerCount; ++i)
|
for (unsigned int i = 0; i < workerCount; ++i)
|
||||||
{
|
{
|
||||||
|
// On initialise les évènements, mutex et threads de chaque worker
|
||||||
Worker& worker = s_workers[i];
|
Worker& worker = s_workers[i];
|
||||||
InitializeCriticalSection(&worker.queueMutex);
|
InitializeCriticalSection(&worker.queueMutex);
|
||||||
worker.wakeEvent = CreateEventW(nullptr, false, false, nullptr);
|
worker.wakeEvent = CreateEventW(nullptr, false, false, nullptr);
|
||||||
|
|
@ -40,6 +41,7 @@ bool NzTaskSchedulerImpl::Initialize(unsigned int workerCount)
|
||||||
|
|
||||||
s_doneEvents[i] = CreateEventW(nullptr, true, false, nullptr);
|
s_doneEvents[i] = CreateEventW(nullptr, true, false, nullptr);
|
||||||
|
|
||||||
|
// Le thread va se lancer, signaler qu'il est prêt à travailler (s_doneEvents) et attendre d'être réveillé
|
||||||
workerIDs[i] = i;
|
workerIDs[i] = i;
|
||||||
s_workerThreads[i] = reinterpret_cast<HANDLE>(_beginthreadex(nullptr, 0, &WorkerProc, &workerIDs[i], 0, nullptr));
|
s_workerThreads[i] = reinterpret_cast<HANDLE>(_beginthreadex(nullptr, 0, &WorkerProc, &workerIDs[i], 0, nullptr));
|
||||||
}
|
}
|
||||||
|
|
@ -57,16 +59,19 @@ bool NzTaskSchedulerImpl::IsInitialized()
|
||||||
|
|
||||||
void NzTaskSchedulerImpl::Run(NzFunctor** tasks, unsigned int count)
|
void NzTaskSchedulerImpl::Run(NzFunctor** tasks, unsigned int count)
|
||||||
{
|
{
|
||||||
|
// On s'assure que des tâches ne sont pas déjà en cours
|
||||||
WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE);
|
WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE);
|
||||||
|
|
||||||
std::ldiv_t div = std::ldiv(count, s_workerCount); // Division et modulo en une opération, y'a pas de petit profit
|
std::ldiv_t div = std::ldiv(count, s_workerCount); // Division et modulo en une opération, y'a pas de petit profit
|
||||||
for (unsigned int i = 0; i < s_workerCount; ++i)
|
for (unsigned int i = 0; i < s_workerCount; ++i)
|
||||||
{
|
{
|
||||||
|
// On va maintenant répartir les tâches entre chaque worker et les envoyer dans la queue de chacun
|
||||||
Worker& worker = s_workers[i];
|
Worker& worker = s_workers[i];
|
||||||
unsigned int taskCount = (i == 0) ? div.quot + div.rem : div.quot;
|
unsigned int taskCount = (i == 0) ? div.quot + div.rem : div.quot;
|
||||||
for (unsigned int j = 0; j < taskCount; ++j)
|
for (unsigned int j = 0; j < taskCount; ++j)
|
||||||
worker.queue.push(*tasks++);
|
worker.queue.push(*tasks++);
|
||||||
|
|
||||||
|
// On stocke le nombre de tâches à côté dans un entier atomique pour éviter d'entrer inutilement dans une section critique
|
||||||
worker.workCount = taskCount;
|
worker.workCount = taskCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -88,6 +93,7 @@ void NzTaskSchedulerImpl::Uninitialize()
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
// On commence par vider la queue de chaque worker pour s'assurer qu'ils s'arrêtent
|
||||||
for (unsigned int i = 0; i < s_workerCount; ++i)
|
for (unsigned int i = 0; i < s_workerCount; ++i)
|
||||||
{
|
{
|
||||||
Worker& worker = s_workers[i];
|
Worker& worker = s_workers[i];
|
||||||
|
|
@ -97,15 +103,18 @@ void NzTaskSchedulerImpl::Uninitialize()
|
||||||
EnterCriticalSection(&worker.queueMutex);
|
EnterCriticalSection(&worker.queueMutex);
|
||||||
|
|
||||||
std::queue<NzFunctor*> emptyQueue;
|
std::queue<NzFunctor*> emptyQueue;
|
||||||
std::swap(worker.queue, emptyQueue); // Et on vide la queue
|
std::swap(worker.queue, emptyQueue); // Et on vide la queue (merci std::swap)
|
||||||
|
|
||||||
LeaveCriticalSection(&worker.queueMutex);
|
LeaveCriticalSection(&worker.queueMutex);
|
||||||
|
|
||||||
|
// On réveille le worker pour qu'il sorte de la boucle et termine le thread
|
||||||
SetEvent(worker.wakeEvent);
|
SetEvent(worker.wakeEvent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// On attend que chaque thread se termine
|
||||||
WaitForMultipleObjects(s_workerCount, &s_workerThreads[0], true, INFINITE);
|
WaitForMultipleObjects(s_workerCount, &s_workerThreads[0], true, INFINITE);
|
||||||
|
|
||||||
|
// Et on libère les ressources
|
||||||
for (unsigned int i = 0; i < s_workerCount; ++i)
|
for (unsigned int i = 0; i < s_workerCount; ++i)
|
||||||
{
|
{
|
||||||
Worker& worker = s_workers[i];
|
Worker& worker = s_workers[i];
|
||||||
|
|
@ -142,19 +151,24 @@ NzFunctor* NzTaskSchedulerImpl::StealTask(unsigned int workerID)
|
||||||
shouldRetry = false;
|
shouldRetry = false;
|
||||||
for (unsigned int i = 0; i < s_workerCount; ++i)
|
for (unsigned int i = 0; i < s_workerCount; ++i)
|
||||||
{
|
{
|
||||||
|
// On ne vole pas la famille, ni soi-même.
|
||||||
if (i == workerID)
|
if (i == workerID)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
Worker& worker = s_workers[i];
|
Worker& worker = s_workers[i];
|
||||||
|
|
||||||
|
// Ce worker a-t-il encore des tâches dans sa file d'attente ?
|
||||||
if (worker.workCount > 0)
|
if (worker.workCount > 0)
|
||||||
{
|
{
|
||||||
NzFunctor* task = nullptr;
|
NzFunctor* task = nullptr;
|
||||||
|
|
||||||
|
// Est-ce qu'il utilise la queue maintenant ?
|
||||||
if (TryEnterCriticalSection(&worker.queueMutex))
|
if (TryEnterCriticalSection(&worker.queueMutex))
|
||||||
{
|
{
|
||||||
if (!worker.queue.empty())
|
// Non, super ! Profitons-en pour essayer de lui voler un job
|
||||||
|
if (!worker.queue.empty()) // On vérifie que la queue n'est pas vide (peut avoir changé avant le verrouillage)
|
||||||
{
|
{
|
||||||
|
// Et hop, on vole la tâche
|
||||||
task = worker.queue.front();
|
task = worker.queue.front();
|
||||||
worker.queue.pop();
|
worker.queue.pop();
|
||||||
worker.workCount = worker.queue.size();
|
worker.workCount = worker.queue.size();
|
||||||
|
|
@ -165,13 +179,15 @@ NzFunctor* NzTaskSchedulerImpl::StealTask(unsigned int workerID)
|
||||||
else
|
else
|
||||||
shouldRetry = true; // Il est encore possible d'avoir un job
|
shouldRetry = true; // Il est encore possible d'avoir un job
|
||||||
|
|
||||||
|
// Avons-nous notre tâche ?
|
||||||
if (task)
|
if (task)
|
||||||
return task;
|
return task; // Parfait, sortons de là !
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while (shouldRetry);
|
while (shouldRetry);
|
||||||
|
|
||||||
|
// Bon à priori plus aucun worker n'a de tâche
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -199,11 +215,13 @@ unsigned int __stdcall NzTaskSchedulerImpl::WorkerProc(void* userdata)
|
||||||
LeaveCriticalSection(&worker.queueMutex);
|
LeaveCriticalSection(&worker.queueMutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Que faire quand vous n'avez plus de travail ?
|
||||||
if (!task)
|
if (!task)
|
||||||
task = StealTask(workerID);
|
task = StealTask(workerID); // Voler le travail des autres !
|
||||||
|
|
||||||
if (task)
|
if (task)
|
||||||
{
|
{
|
||||||
|
// On exécute la tâche avant de la supprimer
|
||||||
task->Run();
|
task->Run();
|
||||||
delete task;
|
delete task;
|
||||||
}
|
}
|
||||||
|
|
@ -214,7 +232,9 @@ unsigned int __stdcall NzTaskSchedulerImpl::WorkerProc(void* userdata)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SetEvent(s_doneEvents[workerID]); // Au cas où un thread attendrait sur WaitForTasks() pendant qu'un autre appellerait Uninitialize()
|
// Au cas où un thread attendrait sur WaitForTasks() pendant qu'un autre appellerait Uninitialize()
|
||||||
|
// Ça ne devrait pas arriver, mais comme ça ne coûte pas grand chose..
|
||||||
|
SetEvent(s_doneEvents[workerID]);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue