diff --git a/include/Nazara/Core/TaskScheduler.hpp b/include/Nazara/Core/TaskScheduler.hpp index 401542603..e1c2dc99b 100644 --- a/include/Nazara/Core/TaskScheduler.hpp +++ b/include/Nazara/Core/TaskScheduler.hpp @@ -9,7 +9,6 @@ #include #include -#include class NAZARA_API NzTaskScheduler { @@ -22,6 +21,7 @@ class NAZARA_API NzTaskScheduler template static void AddTask(void (C::*function)(), C* object); static unsigned int GetWorkerCount(); static bool Initialize(); + static void Run(); static void SetWorkerCount(unsigned int workerCount); static void Uninitialize(); static void WaitForTasks(); diff --git a/include/Nazara/Utility/Config.hpp b/include/Nazara/Utility/Config.hpp index fa2806b47..007fcfd50 100644 --- a/include/Nazara/Utility/Config.hpp +++ b/include/Nazara/Utility/Config.hpp @@ -36,7 +36,7 @@ #define NAZARA_UTILITY_MEMORYLEAKTRACKER 0 // Le skinning doit-il prendre avantage du multi-threading ? (Boost de performances sur les processeurs multi-coeurs) -#define NAZARA_UTILITY_MULTITHREADED_SKINNING 0 ///FIXME: Bug du TaskScheduler +#define NAZARA_UTILITY_MULTITHREADED_SKINNING 1 // Active les tests de sécurité basés sur le code (Conseillé pour le développement) #define NAZARA_UTILITY_SAFE 1 diff --git a/src/Nazara/Core/TaskScheduler.cpp b/src/Nazara/Core/TaskScheduler.cpp index 74a4d549f..1bae416c6 100644 --- a/src/Nazara/Core/TaskScheduler.cpp +++ b/src/Nazara/Core/TaskScheduler.cpp @@ -3,86 +3,23 @@ // For conditions of distribution and use, see copyright notice in Config.hpp #include -#include #include #include -#include -#include -#include -#include -#include -#include -#include -///FIXME: Revoir tout ça +#if defined(NAZARA_PLATFORM_WINDOWS) + #include +#elif defined(NAZARA_PLATFORM_POSIX) + #include +#else + #error Lack of implementation: Task Scheduler +#endif + +#include namespace { - struct TaskSchedulerImpl - { - std::queue tasks; - std::vector workers; - NzConditionVariable waiterConditionVariable; - NzConditionVariable workerConditionVariable; - NzMutex taskMutex; - NzMutex taskCountMutex; - NzMutex waiterConditionVariableMutex; - NzMutex workerConditionVariableMutex; - volatile bool running = true; - std::atomic taskCount; - }; - - TaskSchedulerImpl* s_impl = nullptr; + std::vector s_pendingWorks; unsigned int s_workerCount = 0; - - void WorkerFunc() - { - do - { - NzFunctor* task; - { - NzLockGuard lock(s_impl->taskMutex); - if (!s_impl->tasks.empty()) - { - task = s_impl->tasks.front(); - s_impl->tasks.pop(); - } - else - task = nullptr; - } - - // Avons-nous une tâche ? - if (task) - { - task->Run(); // Chouette ! Allons travailler gaiement - - delete task; // Sans oublier de supprimer la tâche - - s_impl->taskCountMutex.Lock(); - #ifdef NAZARA_DEBUG - if (s_impl->taskCount == 0) - NazaraInternalError("Task count is already 0"); - #endif - - if (--s_impl->taskCount == 0) - { - // On peut signaler la fin du travail - s_impl->waiterConditionVariableMutex.Lock(); - s_impl->waiterConditionVariable.Signal(); - s_impl->waiterConditionVariableMutex.Unlock(); - } - s_impl->taskCountMutex.Unlock(); - } - else - { - // Nous attendons qu'une nouvelle tâche arrive - s_impl->workerConditionVariableMutex.Lock(); - s_impl->workerConditionVariable.Wait(&s_impl->workerConditionVariableMutex); - s_impl->workerConditionVariableMutex.Unlock(); - } - } - while (s_impl->running); - } } unsigned int NzTaskScheduler::GetWorkerCount() @@ -92,102 +29,47 @@ unsigned int NzTaskScheduler::GetWorkerCount() bool NzTaskScheduler::Initialize() { - if (s_impl) - return true; // Déjà initialisé + return NzTaskSchedulerImpl::Initialize(GetWorkerCount()); +} - s_impl = new TaskSchedulerImpl; - - unsigned int workerCount = GetWorkerCount(); - - s_impl->workers.resize(workerCount); - for (unsigned int i = 0; i < workerCount; ++i) - s_impl->workers[i] = NzThread(WorkerFunc); - - return true; +void NzTaskScheduler::Run() +{ + NzTaskSchedulerImpl::Run(&s_pendingWorks[0], s_pendingWorks.size()); + s_pendingWorks.clear(); } void NzTaskScheduler::SetWorkerCount(unsigned int workerCount) -{ - s_workerCount = workerCount; - if (s_impl) - { - unsigned int newWorkerCount = GetWorkerCount(); - unsigned int oldWorkerCount = s_impl->workers.size(); - s_impl->workers.resize(newWorkerCount); - if (newWorkerCount > oldWorkerCount) - { - for (unsigned int i = oldWorkerCount-1; i < newWorkerCount; ++i) - s_impl->workers[i] = NzThread(WorkerFunc); - } - } -} - -void NzTaskScheduler::Uninitialize() -{ - if (s_impl) - { - s_impl->running = false; - - // S'il reste des tâches en cours, on les libère - { - NzLockGuard lock(s_impl->taskMutex); - while (!s_impl->tasks.empty()) - { - delete s_impl->tasks.front(); - s_impl->tasks.pop(); - } - } - - // Ensuite on réveille les threads pour qu'ils s'arrêtent d'eux-même - s_impl->workerConditionVariableMutex.Lock(); - s_impl->workerConditionVariable.SignalAll(); - s_impl->workerConditionVariableMutex.Unlock(); - - for (NzThread& thread : s_impl->workers) - thread.Join(); - - delete s_impl; - s_impl = nullptr; - } -} - -void NzTaskScheduler::WaitForTasks() { #ifdef NAZARA_CORE_SAFE - if (!s_impl) + if (NzTaskSchedulerImpl::IsInitialized()) { - NazaraError("Task scheduler is not initialized"); + NazaraError("Worker count cannot be set while initialized"); return; } #endif - // Tout d'abord, il y a-t-il des tâches en attente ? - if (s_impl->tasks.empty()) - return; + s_workerCount = workerCount; +} - s_impl->taskCount = s_impl->tasks.size(); +void NzTaskScheduler::Uninitialize() +{ + NzTaskSchedulerImpl::Uninitialize(); +} - // On verrouille d'abord la mutex entourant le signal (Pour ne pas perdre le signal en chemin) - s_impl->waiterConditionVariableMutex.Lock(); - - // Et ensuite seulement on réveille les worker - s_impl->workerConditionVariableMutex.Lock(); - s_impl->workerConditionVariable.SignalAll(); - s_impl->workerConditionVariableMutex.Unlock(); - - s_impl->waiterConditionVariable.Wait(&s_impl->waiterConditionVariableMutex); - s_impl->waiterConditionVariableMutex.Unlock(); +void NzTaskScheduler::WaitForTasks() +{ + NzTaskSchedulerImpl::WaitForTasks(); } void NzTaskScheduler::AddTaskFunctor(NzFunctor* taskFunctor) { #ifdef NAZARA_CORE_SAFE - if (!s_impl) + if (!NzTaskSchedulerImpl::IsInitialized()) { NazaraError("Task scheduler is not initialized"); return; } #endif - s_impl->tasks.push(taskFunctor); + s_pendingWorks.push_back(taskFunctor); } diff --git a/src/Nazara/Core/Win32/TaskSchedulerImpl.cpp b/src/Nazara/Core/Win32/TaskSchedulerImpl.cpp new file mode 100644 index 000000000..4a098d489 --- /dev/null +++ b/src/Nazara/Core/Win32/TaskSchedulerImpl.cpp @@ -0,0 +1,206 @@ +// Copyright (C) 2013 Jérôme Leclercq +// This file is part of the "Nazara Engine - Core module" +// For conditions of distribution and use, see copyright notice in Config.hpp + +#include +#include +#include +#include +#include + +bool NzTaskSchedulerImpl::Initialize(unsigned int workerCount) +{ + if (s_workerCount > 0) + return true; // Déjà initialisé + + #if NAZARA_CORE_SAFE + if (workerCount == 0) + { + NazaraError("Invalid worker count ! (0)"); + return false; + } + #endif + + s_workerCount = workerCount; + s_doneEvents.reset(new HANDLE[workerCount]); + s_workers.reset(new Worker[workerCount]); + s_workerThreads.reset(new HANDLE[workerCount]); + + std::unique_ptr workerIDs(new unsigned int[workerCount]); + + for (unsigned int i = 0; i < workerCount; ++i) + { + Worker& worker = s_workers[i]; + InitializeCriticalSection(&worker.queueMutex); + worker.wakeEvent = CreateEventA(nullptr, false, false, nullptr); + worker.running = true; + worker.workCount = 0; + + s_doneEvents[i] = CreateEventA(nullptr, true, false, nullptr); + + workerIDs[i] = i; + s_workerThreads[i] = reinterpret_cast(_beginthreadex(nullptr, 0, &WorkerProc, &workerIDs[i], 0, nullptr)); + } + + WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE); + + return true; +} + +bool NzTaskSchedulerImpl::IsInitialized() +{ + return s_workerCount > 0; +} + +void NzTaskSchedulerImpl::Run(NzFunctor** tasks, unsigned int count) +{ + WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE); + + std::ldiv_t div = std::ldiv(count, s_workerCount); // Division et modulo en une opération, y'a pas de petit profit + for (unsigned int i = 0; i < s_workerCount; ++i) + { + Worker& worker = s_workers[i]; + unsigned int taskCount = (i == 0) ? div.quot + div.rem : div.quot; + for (unsigned int j = 0; j < taskCount; ++j) + worker.queue.push(*tasks++); + + worker.workCount = taskCount; + } + + for (unsigned int i = 0; i < s_workerCount; ++i) + { + ResetEvent(s_doneEvents[i]); + SetEvent(s_workers[i].wakeEvent); + } +} + +void NzTaskSchedulerImpl::Uninitialize() +{ + #ifdef NAZARA_CORE_SAFE + if (s_workerCount == 0) + { + NazaraError("Task scheduler is not initialized"); + return; + } + #endif + + for (unsigned int i = 0; i < s_workerCount; ++i) + { + Worker& worker = s_workers[i]; + worker.running = false; + + EnterCriticalSection(&worker.queueMutex); + + std::queue emptyQueue; + std::swap(worker.queue, emptyQueue); // Et on vide la queue + + LeaveCriticalSection(&worker.queueMutex); + + SetEvent(worker.wakeEvent); + } + + WaitForMultipleObjects(s_workerCount, &s_workerThreads[0], true, INFINITE); + + for (unsigned int i = 0; i < s_workerCount; ++i) + CloseHandle(s_workerThreads[i]); +} + +void NzTaskSchedulerImpl::WaitForTasks() +{ + #ifdef NAZARA_CORE_SAFE + if (s_workerCount == 0) + { + NazaraError("Task scheduler is not initialized"); + return; + } + #endif + + WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE); +} + +NzFunctor* NzTaskSchedulerImpl::StealTask(unsigned int workerID) +{ + bool shouldRetry; + do + { + shouldRetry = false; + for (unsigned int i = 0; i < s_workerCount; ++i) + { + if (i == workerID) + continue; + + Worker& worker = s_workers[i]; + + if (worker.workCount > 0) + { + NzFunctor* task; + if (TryEnterCriticalSection(&worker.queueMutex)) + { + if (!worker.queue.empty()) + { + task = worker.queue.front(); + worker.queue.pop(); + worker.workCount = worker.queue.size(); + } + else + task = nullptr; + + LeaveCriticalSection(&worker.queueMutex); + } + else + shouldRetry = true; // Il est encore possible d'avoir un job + + if (task) + return task; + } + } + } + while (shouldRetry); + + return nullptr; +} + +unsigned int __stdcall NzTaskSchedulerImpl::WorkerProc(void* userdata) +{ + unsigned int workerID = *reinterpret_cast(userdata); + SetEvent(s_doneEvents[workerID]); + + Worker& worker = s_workers[workerID]; + while (worker.running) + { + NzFunctor* task = nullptr; + + if (worker.workCount > 0) // Permet d'éviter d'entrer inutilement dans une section critique + { + EnterCriticalSection(&worker.queueMutex); + if (!worker.queue.empty()) + { + task = worker.queue.front(); + worker.queue.pop(); + worker.workCount = worker.queue.size(); + } + LeaveCriticalSection(&worker.queueMutex); + } + + if (!task) + task = StealTask(workerID); + + if (task) + { + task->Run(); + delete task; + } + else + { + SetEvent(s_doneEvents[workerID]); + WaitForSingleObject(worker.wakeEvent, INFINITE); + } + } + + return 0; +} + +std::unique_ptr NzTaskSchedulerImpl::s_doneEvents; // Doivent être contigus +std::unique_ptr NzTaskSchedulerImpl::s_workers; +std::unique_ptr NzTaskSchedulerImpl::s_workerThreads; // Doivent être contigus +unsigned int NzTaskSchedulerImpl::s_workerCount; diff --git a/src/Nazara/Core/Win32/TaskSchedulerImpl.hpp b/src/Nazara/Core/Win32/TaskSchedulerImpl.hpp new file mode 100644 index 000000000..604377a42 --- /dev/null +++ b/src/Nazara/Core/Win32/TaskSchedulerImpl.hpp @@ -0,0 +1,48 @@ +// Copyright (C) 2013 Jérôme Leclercq +// This file is part of the "Nazara Engine - Core module" +// For conditions of distribution and use, see copyright notice in Config.hpp + +#pragma once + +#ifndef NAZARA_TASKSCHEDULERIMPL_HPP +#define NAZARA_TASKSCHEDULERIMPL_HPP + +#include +#include +#include +#include +#include +#include + +class NzTaskSchedulerImpl +{ + public: + NzTaskSchedulerImpl() = delete; + ~NzTaskSchedulerImpl() = delete; + + static bool Initialize(unsigned int workerCount); + static bool IsInitialized(); + static void Run(NzFunctor** tasks, unsigned int count); + static void Uninitialize(); + static void WaitForTasks(); + + private: + static NzFunctor* StealTask(unsigned int workerID); + static unsigned int __stdcall WorkerProc(void* userdata); + + struct Worker + { + std::atomic_uint workCount; + std::queue queue; + CRITICAL_SECTION queueMutex; + HANDLE wakeEvent; + volatile bool running; + }; + + static std::unique_ptr s_doneEvents; // Doivent être contigus + static std::unique_ptr s_workers; + static std::unique_ptr s_workerThreads; // Doivent être contigus + static unsigned int s_workerCount; +}; + +#endif // NAZARA_TASKSCHEDULERIMPL_HPP diff --git a/src/Nazara/Utility/SkeletalMesh.cpp b/src/Nazara/Utility/SkeletalMesh.cpp index eea9680d9..4654f1998 100644 --- a/src/Nazara/Utility/SkeletalMesh.cpp +++ b/src/Nazara/Utility/SkeletalMesh.cpp @@ -377,6 +377,7 @@ void NzSkeletalMesh::Skin(NzMeshVertex* outputBuffer, const NzSkeleton* skeleton for (unsigned int i = 0; i < workerCount; ++i) NzTaskScheduler::AddTask(Skin_PositionNormalTangent, skinningInfos, i*div.quot, (i == workerCount-1) ? div.quot + div.rem : div.quot); + NzTaskScheduler::Run(); NzTaskScheduler::WaitForTasks(); #else Skin_PositionNormalTangent(skinningInfos, 0, m_impl->vertexCount); diff --git a/src/Nazara/Utility/Utility.cpp b/src/Nazara/Utility/Utility.cpp index 1d64aab57..8ccd29afe 100644 --- a/src/Nazara/Utility/Utility.cpp +++ b/src/Nazara/Utility/Utility.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include