From 9d669f722ec1bedfd20dd5f3fb43e91abf55f5b8 Mon Sep 17 00:00:00 2001 From: Lynix Date: Wed, 31 Jan 2024 16:42:25 +0100 Subject: [PATCH] Core: Rework TaskScheduler (WIP) --- include/Nazara/Core/TaskScheduler.hpp | 43 ++- include/Nazara/Core/TaskScheduler.inl | 43 --- src/Nazara/Core/Core.cpp | 1 - src/Nazara/Core/Posix/TaskSchedulerImpl.cpp | 249 ---------------- src/Nazara/Core/Posix/TaskSchedulerImpl.hpp | 68 ----- src/Nazara/Core/TaskScheduler.cpp | 300 +++++++++++++------- src/Nazara/Core/Win32/TaskSchedulerImpl.cpp | 250 ---------------- src/Nazara/Core/Win32/TaskSchedulerImpl.hpp | 51 ---- tests/ComputeParticlesTest/main.cpp | 31 +- 9 files changed, 239 insertions(+), 797 deletions(-) delete mode 100644 src/Nazara/Core/Posix/TaskSchedulerImpl.cpp delete mode 100644 src/Nazara/Core/Posix/TaskSchedulerImpl.hpp delete mode 100644 src/Nazara/Core/Win32/TaskSchedulerImpl.cpp delete mode 100644 src/Nazara/Core/Win32/TaskSchedulerImpl.hpp diff --git a/include/Nazara/Core/TaskScheduler.hpp b/include/Nazara/Core/TaskScheduler.hpp index b7474b6a0..a077eac30 100644 --- a/include/Nazara/Core/TaskScheduler.hpp +++ b/include/Nazara/Core/TaskScheduler.hpp @@ -8,28 +8,45 @@ #define NAZARA_CORE_TASKSCHEDULER_HPP #include -#include +#include +#include +#include +#include +#include namespace Nz { class NAZARA_CORE_API TaskScheduler { public: - TaskScheduler() = delete; - ~TaskScheduler() = delete; + using Task = std::function; - template static void AddTask(F function); - template static void AddTask(F function, Args&&... args); - template static void AddTask(void (C::*function)(), C* object); - static unsigned int GetWorkerCount(); - static bool Initialize(); - static void Run(); - static void SetWorkerCount(unsigned int workerCount); - static void Uninitialize(); - static void WaitForTasks(); + TaskScheduler(unsigned int workerCount = 0); + TaskScheduler(const TaskScheduler&) = delete; + TaskScheduler(TaskScheduler&&) = default; + ~TaskScheduler(); + + void AddTask(Task&& task); + + unsigned int GetWorkerCount() const; + + void WaitForTasks(); + + TaskScheduler& operator=(const TaskScheduler&) = delete; + TaskScheduler& operator=(TaskScheduler&&) = default; private: - static void AddTaskFunctor(AbstractFunctor* taskFunctor); + class Worker; + friend Worker; + + Worker& GetWorker(unsigned int workerIndex); + void NotifyWorkerActive(); + void NotifyWorkerIdle(); + + std::atomic_bool m_idle; + std::atomic_uint m_idleWorkerCount; + std::minstd_rand m_randomGenerator; + std::vector m_workers; }; } diff --git a/include/Nazara/Core/TaskScheduler.inl b/include/Nazara/Core/TaskScheduler.inl index 100373b41..a8eacde68 100644 --- a/include/Nazara/Core/TaskScheduler.inl +++ b/include/Nazara/Core/TaskScheduler.inl @@ -6,49 +6,6 @@ namespace Nz { - /*! - * \ingroup core - * \class Nz::TaskScheduler - * \brief Core class that represents a thread pool - */ - - /*! - * \brief Adds a task to the pending list - * - * \param function Task that the pool will execute - */ - - template - void TaskScheduler::AddTask(F function) - { - AddTaskFunctor(new FunctorWithoutArgs(function)); - } - - /*! - * \brief Adds a task to the pending list - * - * \param function Task that the pool will execute - * \param args Arguments of the function - */ - - template - void TaskScheduler::AddTask(F function, Args&&... args) - { - AddTaskFunctor(new FunctorWithArgs(function, std::forward(args)...)); - } - - /*! 
- * \brief Adds a task to the pending list - * - * \param function Task that the pool will execute - * \param object Object on which the method will be called - */ - - template - void TaskScheduler::AddTask(void (C::*function)(), C* object) - { - AddTaskFunctor(new MemberWithoutArgs(function, object)); - } } #include diff --git a/src/Nazara/Core/Core.cpp b/src/Nazara/Core/Core.cpp index 3a3297e2d..689f1a36b 100644 --- a/src/Nazara/Core/Core.cpp +++ b/src/Nazara/Core/Core.cpp @@ -33,7 +33,6 @@ namespace Nz { m_hardwareInfo.reset(); - TaskScheduler::Uninitialize(); LogUninit(); Log::Uninitialize(); } diff --git a/src/Nazara/Core/Posix/TaskSchedulerImpl.cpp b/src/Nazara/Core/Posix/TaskSchedulerImpl.cpp deleted file mode 100644 index 779104519..000000000 --- a/src/Nazara/Core/Posix/TaskSchedulerImpl.cpp +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright (C) 2024 Jérôme "SirLynix" Leclercq (lynix680@gmail.com) -// This file is part of the "Nazara Engine - Core module" -// For conditions of distribution and use, see copyright notice in Config.hpp - -#include -#include -#include - -#if defined(NAZARA_PLATFORM_MACOS) - #include -#endif - -namespace Nz -{ - bool TaskSchedulerImpl::Initialize(unsigned int workerCount) - { - if (IsInitialized()) - return true; // Déjà initialisé - - #if NAZARA_CORE_SAFE - if (workerCount == 0) - { - NazaraError("invalid worker count ! (0)"); - return false; - } - #endif - - s_workerCount = workerCount; - s_isDone = false; - s_isWaiting = false; - s_shouldFinish = false; - - s_threads.reset(new pthread_t[workerCount]); - - // On initialise les conditions variables, mutex et barrière. - pthread_cond_init(&s_cvEmpty, nullptr); - pthread_cond_init(&s_cvNotEmpty, nullptr); - pthread_mutex_init(&s_mutexQueue, nullptr); - pthread_barrier_init(&s_barrier, nullptr, workerCount + 1); - - for (unsigned int i = 0; i < s_workerCount; ++i) - { - // Le thread va se lancer, attendre que tous se créent et attendre d'être réveillé. - pthread_create(&s_threads[i], nullptr, WorkerProc, nullptr); - } - - pthread_barrier_wait(&s_barrier); // On attend que les enfants soient bien créés. - - return true; - } - - bool TaskSchedulerImpl::IsInitialized() - { - return s_workerCount > 0; - } - - void TaskSchedulerImpl::Run(AbstractFunctor** tasks, unsigned int count) - { - // On s'assure que des tâches ne sont pas déjà en cours - Wait(); - - pthread_mutex_lock(&s_mutexQueue); - s_isDone = false; - - while (count--) - s_tasks.push(*tasks++); - - pthread_cond_signal(&s_cvNotEmpty); - pthread_mutex_unlock(&s_mutexQueue); - } - - void TaskSchedulerImpl::Uninitialize() - { - #ifdef NAZARA_CORE_SAFE - if (s_workerCount == 0) - { - NazaraError("task scheduler is not initialized"); - return; - } - #endif - - // On réveille les threads pour qu'ils sortent de la boucle et terminent. - pthread_mutex_lock(&s_mutexQueue); - // On commence par vider la queue et demander qu'ils s'arrêtent. 
- std::queue emptyQueue; - std::swap(s_tasks, emptyQueue); - s_shouldFinish = true; - pthread_cond_broadcast(&s_cvNotEmpty); - pthread_mutex_unlock(&s_mutexQueue); - - // On attend que chaque thread se termine - for (unsigned int i = 0; i < s_workerCount; ++i) - pthread_join(s_threads[i], nullptr); - - // Et on libère les ressources - pthread_barrier_destroy(&s_barrier); - pthread_cond_destroy(&s_cvEmpty); - pthread_cond_destroy(&s_cvNotEmpty); - pthread_mutex_destroy(&s_mutexQueue); - - s_workerCount = 0; - } - - void TaskSchedulerImpl::WaitForTasks() - { - #ifdef NAZARA_CORE_SAFE - if (s_workerCount == 0) - { - NazaraError("task scheduler is not initialized"); - return; - } - #endif - - Wait(); - } - - AbstractFunctor* TaskSchedulerImpl::PopQueue() - { - AbstractFunctor* task = nullptr; - - pthread_mutex_lock(&s_mutexQueue); - - if (!s_tasks.empty()) - { - task = s_tasks.front(); - s_tasks.pop(); - } - - pthread_mutex_unlock(&s_mutexQueue); - - return task; - } - - void TaskSchedulerImpl::Wait() - { - if (s_isDone) - return; - - pthread_mutex_lock(&s_mutexQueue); - s_isWaiting = true; - pthread_cond_broadcast(&s_cvNotEmpty); - pthread_cond_wait(&s_cvEmpty, &s_mutexQueue); - pthread_mutex_unlock(&s_mutexQueue); - - s_isDone = true; - } - - void* TaskSchedulerImpl::WorkerProc(void* /*userdata*/) - { - // On s'assure que tous les threads soient correctement lancés. - pthread_barrier_wait(&s_barrier); - - // On quitte s'il doit terminer. - while (!s_shouldFinish) - { - AbstractFunctor* task = PopQueue(); - - if (task) - { - // On exécute la tâche avant de la supprimer - task->Run(); - delete task; - } - else - { - pthread_mutex_lock(&s_mutexQueue); - if (s_tasks.empty()) - s_isDone = true; - - while (!(!s_tasks.empty() || s_isWaiting || s_shouldFinish)) - pthread_cond_wait(&s_cvNotEmpty, &s_mutexQueue); - - if (s_tasks.empty() && s_isWaiting) - { - // On prévient le thread qui attend que les tâches soient effectuées. 
- s_isWaiting = false; - pthread_cond_signal(&s_cvEmpty); - } - - pthread_mutex_unlock(&s_mutexQueue); - } - } - - return nullptr; - } - - std::queue TaskSchedulerImpl::s_tasks; - std::unique_ptr TaskSchedulerImpl::s_threads; - std::atomic TaskSchedulerImpl::s_isDone; - std::atomic TaskSchedulerImpl::s_isWaiting; - std::atomic TaskSchedulerImpl::s_shouldFinish; - unsigned int TaskSchedulerImpl::s_workerCount; - - pthread_mutex_t TaskSchedulerImpl::s_mutexQueue; - pthread_cond_t TaskSchedulerImpl::s_cvEmpty; - pthread_cond_t TaskSchedulerImpl::s_cvNotEmpty; - pthread_barrier_t TaskSchedulerImpl::s_barrier; - -#if defined(NAZARA_PLATFORM_MACOS) - //Code from https://blog.albertarmea.com/post/47089939939/using-pthreadbarrier-on-mac-os-x - int TaskSchedulerImpl::pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count) - { - if(count == 0) - { - errno = EINVAL; - return -1; - } - if(pthread_mutex_init(&barrier->mutex, 0) < 0) - { - return -1; - } - if(pthread_cond_init(&barrier->cond, 0) < 0) - { - pthread_mutex_destroy(&barrier->mutex); - return -1; - } - barrier->tripCount = count; - barrier->count = 0; - - return 0; - } - - int TaskSchedulerImpl::pthread_barrier_destroy(pthread_barrier_t *barrier) - { - pthread_cond_destroy(&barrier->cond); - pthread_mutex_destroy(&barrier->mutex); - return 0; - } - - int TaskSchedulerImpl::pthread_barrier_wait(pthread_barrier_t *barrier) - { - pthread_mutex_lock(&barrier->mutex); - ++(barrier->count); - if(barrier->count >= barrier->tripCount) - { - barrier->count = 0; - pthread_cond_broadcast(&barrier->cond); - pthread_mutex_unlock(&barrier->mutex); - return 1; - } - else - { - pthread_cond_wait(&barrier->cond, &(barrier->mutex)); - pthread_mutex_unlock(&barrier->mutex); - return 0; - } - } -#endif -} diff --git a/src/Nazara/Core/Posix/TaskSchedulerImpl.hpp b/src/Nazara/Core/Posix/TaskSchedulerImpl.hpp deleted file mode 100644 index 5b517f5e5..000000000 --- a/src/Nazara/Core/Posix/TaskSchedulerImpl.hpp +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright (C) 2024 Jérôme "SirLynix" Leclercq (lynix680@gmail.com) -// This file is part of the "Nazara Engine - Core module" -// For conditions of distribution and use, see copyright notice in Config.hpp - -#pragma once - -#ifndef NAZARA_CORE_POSIX_TASKSCHEDULERIMPL_HPP -#define NAZARA_CORE_POSIX_TASKSCHEDULERIMPL_HPP - -#include -#include -#include -#include -#include - -#if defined(NAZARA_PLATFORM_MACOS) - typedef int pthread_barrierattr_t; - typedef struct - { - pthread_mutex_t mutex; - pthread_cond_t cond; - int count; - int tripCount; - } pthread_barrier_t; -#endif - -namespace Nz -{ - struct AbstractFunctor; - - class TaskSchedulerImpl - { - public: - TaskSchedulerImpl() = delete; - ~TaskSchedulerImpl() = delete; - - static bool Initialize(unsigned int workerCount); - static bool IsInitialized(); - static void Run(AbstractFunctor** tasks, unsigned int count); - static void Uninitialize(); - static void WaitForTasks(); - - private: - static AbstractFunctor* PopQueue(); - static void Wait(); - static void* WorkerProc(void* userdata); - - static std::queue s_tasks; - static std::unique_ptr s_threads; - static std::atomic s_isDone; - static std::atomic s_isWaiting; - static std::atomic s_shouldFinish; - static unsigned int s_workerCount; - - static pthread_mutex_t s_mutexQueue; - static pthread_cond_t s_cvEmpty; - static pthread_cond_t s_cvNotEmpty; - static pthread_barrier_t s_barrier; - -#if defined(NAZARA_PLATFORM_MACOS) - static int pthread_barrier_init(pthread_barrier_t 
*barrier, const pthread_barrierattr_t *attr, unsigned int count); - static int pthread_barrier_destroy(pthread_barrier_t *barrier); - static int pthread_barrier_wait(pthread_barrier_t *barrier); -#endif - }; -} - -#endif // NAZARA_CORE_POSIX_TASKSCHEDULERIMPL_HPP diff --git a/src/Nazara/Core/TaskScheduler.cpp b/src/Nazara/Core/TaskScheduler.cpp index 927b9a881..c9e76552d 100644 --- a/src/Nazara/Core/TaskScheduler.cpp +++ b/src/Nazara/Core/TaskScheduler.cpp @@ -4,140 +4,220 @@ #include #include -#include - -#if defined(NAZARA_PLATFORM_WINDOWS) - #include -#elif defined(NAZARA_PLATFORM_POSIX) - #include -#else - #error Lack of implementation: Task Scheduler -#endif - +#include +#include +#include +#include +#include +#include +#include #include namespace Nz { - namespace + NAZARA_WARNING_PUSH() + NAZARA_WARNING_MSVC_DISABLE(4324) + + class alignas(std::hardware_destructive_interference_size) TaskScheduler::Worker { - std::vector s_pendingWorks; - unsigned int s_workerCount = 0; + public: + Worker(TaskScheduler& owner, unsigned int workerIndex) : + m_owner(owner), + m_workerIndex(workerIndex) + { + m_thread = std::jthread([this](std::stop_token stopToken) + { + SetCurrentThreadName(fmt::format("NzWorker #{0}", m_workerIndex).c_str()); + Run(stopToken); + }); + } + + Worker(const Worker&) = delete; + + Worker(Worker&& worker) : + m_owner(worker.m_owner) + { + NAZARA_UNREACHABLE(); + } + + bool AddTask(Task&& task) + { + std::unique_lock lock(m_mutex, std::defer_lock); + if (!lock.try_lock()) + return false; + + m_tasks.push_back(std::move(task)); + lock.unlock(); + + m_conditionVariable.notify_one(); + return true; + } + + void Run(std::stop_token& stopToken) + { + StackArray randomWorkerIndices = NazaraStackArrayNoInit(unsigned int, m_owner.GetWorkerCount() - 1); + { + unsigned int* indexPtr = randomWorkerIndices.data(); + for (unsigned int i = 0; i < randomWorkerIndices.size(); ++i) + { + if (i != m_workerIndex) + *indexPtr++ = i; + } + + std::minstd_rand gen(std::random_device{}()); + std::shuffle(randomWorkerIndices.begin(), randomWorkerIndices.end(), gen); + } + + bool idle = true; + for (;;) + { + std::unique_lock lock(m_mutex); + + // Wait for tasks if we don't have any right now + if (m_tasks.empty()) + { + if (!idle) + { + m_owner.NotifyWorkerIdle(); + idle = true; + } + + m_conditionVariable.wait(m_mutex, stopToken, [this] { return !m_tasks.empty(); }); + } + + if (stopToken.stop_requested()) + break; + + auto ExecuteTask = [&](TaskScheduler::Task& task) + { + if (idle) + { + m_owner.NotifyWorkerActive(); + idle = false; + } + + task(); + }; + + if (!m_tasks.empty()) + { + TaskScheduler::Task task = std::move(m_tasks.front()); + m_tasks.erase(m_tasks.begin()); + + lock.unlock(); + + ExecuteTask(task); + } + else + { + lock.unlock(); + + // Try to steal a task from another worker in a random order to avoid lock contention + TaskScheduler::Task task; + for (unsigned int workerIndex : randomWorkerIndices) + { + if (m_owner.GetWorker(workerIndex).StealTask(&task)) + { + ExecuteTask(task); + break; + } + } + } + + // Note: it's possible for a thread to reach this point without executing a task (for example if another worker stole its only remaining task) + } + } + + bool StealTask(TaskScheduler::Task* task) + { + std::unique_lock lock(m_mutex, std::defer_lock); + if (!lock.try_lock()) + return false; + + if (m_tasks.empty()) + return false; + + *task = std::move(m_tasks.front()); + m_tasks.erase(m_tasks.begin()); + return true; + } + + Worker& operator=(const Worker& worker) = delete; + + 
Worker& operator=(Worker&&) + { + NAZARA_UNREACHABLE(); + } + + private: + std::condition_variable_any m_conditionVariable; + std::mutex m_mutex; + std::jthread m_thread; + std::vector m_tasks; + TaskScheduler& m_owner; + unsigned int m_workerIndex; + }; + + NAZARA_WARNING_POP() + + TaskScheduler::TaskScheduler(unsigned int workerCount) : + m_idle(true), + m_randomGenerator(std::random_device{}()) + { + if (workerCount == 0) + workerCount = std::max(Core::Instance()->GetHardwareInfo().GetCpuThreadCount(), 1u); + + m_idleWorkerCount = workerCount; + + m_workers.reserve(workerCount); + for (unsigned int i = 0; i < workerCount; ++i) + m_workers.emplace_back(*this, i); } - /*! - * \ingroup core - * \class Nz::TaskScheduler - * \brief Core class that represents a pool of threads - * - * \remark Initialized should be called first - */ - - /*! - * \brief Gets the number of threads - * \return Number of threads, if none, the number of logical threads on the processor is returned - */ - - unsigned int TaskScheduler::GetWorkerCount() + TaskScheduler::~TaskScheduler() { - return (s_workerCount > 0) ? s_workerCount : Core::Instance()->GetHardwareInfo().GetCpuThreadCount(); + m_workers.clear(); } - /*! - * \brief Initializes the TaskScheduler class - * \return true if everything is ok - */ - - bool TaskScheduler::Initialize() + void TaskScheduler::AddTask(Task&& task) { - return TaskSchedulerImpl::Initialize(GetWorkerCount()); - } + m_idle = false; - /*! - * \brief Runs the pending works - * - * \remark Produce a NazaraError if the class is not initialized - */ - - void TaskScheduler::Run() - { - if (!Initialize()) + std::uniform_int_distribution workerDis(0, static_cast(m_workers.size() - 1)); + for (;;) { - NazaraError("failed to initialize Task Scheduler"); - return; - } - - if (!s_pendingWorks.empty()) - { - TaskSchedulerImpl::Run(&s_pendingWorks[0], s_pendingWorks.size()); - s_pendingWorks.clear(); + Worker& randomWorker = m_workers[workerDis(m_randomGenerator)]; + if (randomWorker.AddTask(std::move(task))) + break; } } - /*! - * \brief Sets the number of workers - * - * \param workerCount Number of simulatnous threads handling the tasks - * - * \remark Produce a NazaraError if the class is not initialized and NAZARA_CORE_SAFE is defined - */ - - void TaskScheduler::SetWorkerCount(unsigned int workerCount) + unsigned int TaskScheduler::GetWorkerCount() const { - #ifdef NAZARA_CORE_SAFE - if (TaskSchedulerImpl::IsInitialized()) - { - NazaraError("worker count cannot be set while initialized"); - return; - } - #endif - - s_workerCount = workerCount; + return static_cast(m_workers.size()); } - /*! - * \brief Uninitializes the TaskScheduler class - */ - - void TaskScheduler::Uninitialize() - { - if (TaskSchedulerImpl::IsInitialized()) - TaskSchedulerImpl::Uninitialize(); - } - - /*! - * \brief Waits for tasks to be done - * - * \remark Produce a NazaraError if the class is not initialized - */ - void TaskScheduler::WaitForTasks() { - if (!Initialize()) - { - NazaraError("failed to initialize Task Scheduler"); - return; - } - - TaskSchedulerImpl::WaitForTasks(); + m_idle.wait(false); } - /*! 
- * \brief Adds a task on the pending list - * - * \param taskFunctor Functor represeting a task to be done - * - * \remark Produce a NazaraError if the class is not initialized - * \remark A task containing a call on this class is undefined behaviour - */ - - void TaskScheduler::AddTaskFunctor(AbstractFunctor* taskFunctor) + auto TaskScheduler::GetWorker(unsigned int workerIndex) -> Worker& { - if (!Initialize()) - { - NazaraError("failed to initialize Task Scheduler"); - return; - } + return m_workers[workerIndex]; + } - s_pendingWorks.push_back(taskFunctor); + void TaskScheduler::NotifyWorkerActive() + { + m_idleWorkerCount--; + } + + void TaskScheduler::NotifyWorkerIdle() + { + if (++m_idleWorkerCount == m_workers.size()) + { + m_idle = true; + m_idle.notify_one(); + } } } diff --git a/src/Nazara/Core/Win32/TaskSchedulerImpl.cpp b/src/Nazara/Core/Win32/TaskSchedulerImpl.cpp deleted file mode 100644 index f759e2085..000000000 --- a/src/Nazara/Core/Win32/TaskSchedulerImpl.cpp +++ /dev/null @@ -1,250 +0,0 @@ -// Copyright (C) 2024 Jérôme "SirLynix" Leclercq (lynix680@gmail.com) -// This file is part of the "Nazara Engine - Core module" -// For conditions of distribution and use, see copyright notice in Config.hpp - -#include -#include -#include -#include // std::ldiv -#include -#include - -namespace Nz -{ - bool TaskSchedulerImpl::Initialize(std::size_t workerCount) - { - if (IsInitialized()) - return true; // Déjà initialisé - - #if NAZARA_CORE_SAFE - if (workerCount == 0) - { - NazaraError("invalid worker count ! (0)"); - return false; - } - #endif - - s_workerCount = static_cast(workerCount); - s_doneEvents.reset(new HANDLE[workerCount]); - s_workers.reset(new Worker[workerCount]); - s_workerThreads.reset(new HANDLE[workerCount]); - - // L'identifiant de chaque worker doit rester en vie jusqu'à ce que chaque thread soit correctement lancé - std::unique_ptr workerIDs(new std::size_t[workerCount]); - - for (std::size_t i = 0; i < workerCount; ++i) - { - // On initialise les évènements, mutex et threads de chaque worker - Worker& worker = s_workers[i]; - InitializeCriticalSection(&worker.queueMutex); - worker.wakeEvent = CreateEventW(nullptr, false, false, nullptr); - worker.running = true; - worker.workCount = 0; - - s_doneEvents[i] = CreateEventW(nullptr, true, false, nullptr); - - // Le thread va se lancer, signaler qu'il est prêt à travailler (s_doneEvents) et attendre d'être réveillé - workerIDs[i] = i; - s_workerThreads[i] = reinterpret_cast(_beginthreadex(nullptr, 0, &WorkerProc, &workerIDs[i], 0, nullptr)); - } - - // On attend que les workers se mettent en attente - WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE); - - return true; - } - - bool TaskSchedulerImpl::IsInitialized() - { - return s_workerCount > 0; - } - - void TaskSchedulerImpl::Run(AbstractFunctor** tasks, std::size_t count) - { - // On s'assure que des tâches ne sont pas déjà en cours - WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE); - - std::lldiv_t div = std::lldiv(count, s_workerCount); // Division et modulo en une opération, y'a pas de petit profit - for (std::size_t i = 0; i < s_workerCount; ++i) - { - // On va maintenant répartir les tâches entre chaque worker et les envoyer dans la queue de chacun - Worker& worker = s_workers[i]; - std::size_t taskCount = (i == 0) ? 
div.quot + div.rem : div.quot; - for (std::size_t j = 0; j < taskCount; ++j) - worker.queue.push(*tasks++); - - // On stocke le nombre de tâches à côté dans un entier atomique pour éviter d'entrer inutilement dans une section critique - worker.workCount = taskCount; - } - - // On les lance une fois qu'ils sont tous initialisés (pour éviter qu'un worker ne passe en pause détectant une absence de travaux) - for (std::size_t i = 0; i < s_workerCount; ++i) - { - ResetEvent(s_doneEvents[i]); - SetEvent(s_workers[i].wakeEvent); - } - } - - void TaskSchedulerImpl::Uninitialize() - { - #ifdef NAZARA_CORE_SAFE - if (s_workerCount == 0) - { - NazaraError("task scheduler is not initialized"); - return; - } - #endif - - // On commence par vider la queue de chaque worker pour s'assurer qu'ils s'arrêtent - for (unsigned int i = 0; i < s_workerCount; ++i) - { - Worker& worker = s_workers[i]; - worker.running = false; - worker.workCount = 0; - - EnterCriticalSection(&worker.queueMutex); - - std::queue emptyQueue; - std::swap(worker.queue, emptyQueue); // Et on vide la queue (merci std::swap) - - LeaveCriticalSection(&worker.queueMutex); - - // On réveille le worker pour qu'il sorte de la boucle et termine le thread - SetEvent(worker.wakeEvent); - } - - // On attend que chaque thread se termine - WaitForMultipleObjects(s_workerCount, &s_workerThreads[0], true, INFINITE); - - // Et on libère les ressources - for (unsigned int i = 0; i < s_workerCount; ++i) - { - Worker& worker = s_workers[i]; - CloseHandle(s_doneEvents[i]); - CloseHandle(s_workerThreads[i]); - CloseHandle(worker.wakeEvent); - DeleteCriticalSection(&worker.queueMutex); - } - - s_doneEvents.reset(); - s_workers.reset(); - s_workerThreads.reset(); - s_workerCount = 0; - } - - void TaskSchedulerImpl::WaitForTasks() - { - #ifdef NAZARA_CORE_SAFE - if (s_workerCount == 0) - { - NazaraError("task scheduler is not initialized"); - return; - } - #endif - - WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE); - } - - AbstractFunctor* TaskSchedulerImpl::StealTask(std::size_t workerID) - { - bool shouldRetry; - do - { - shouldRetry = false; - for (std::size_t i = 0; i < s_workerCount; ++i) - { - // On ne vole pas la famille, ni soi-même. - if (i == workerID) - continue; - - Worker& worker = s_workers[i]; - - // Ce worker a-t-il encore des tâches dans sa file d'attente ? - if (worker.workCount > 0) - { - AbstractFunctor* task = nullptr; - - // Est-ce qu'il utilise la queue maintenant ? - if (TryEnterCriticalSection(&worker.queueMutex)) - { - // Non, super ! Profitons-en pour essayer de lui voler un job - if (!worker.queue.empty()) // On vérifie que la queue n'est pas vide (peut avoir changé avant le verrouillage) - { - // Et hop, on vole la tâche - task = worker.queue.front(); - worker.queue.pop(); - worker.workCount = worker.queue.size(); - } - - LeaveCriticalSection(&worker.queueMutex); - } - else - shouldRetry = true; // Il est encore possible d'avoir un job - - // Avons-nous notre tâche ? - if (task) - return task; // Parfait, sortons de là ! 
- } - } - } - while (shouldRetry); - - // Bon à priori plus aucun worker n'a de tâche - return nullptr; - } - - unsigned int __stdcall TaskSchedulerImpl::WorkerProc(void* userdata) - { - unsigned int workerID = *static_cast(userdata); - SetEvent(s_doneEvents[workerID]); - - Worker& worker = s_workers[workerID]; - WaitForSingleObject(worker.wakeEvent, INFINITE); - - while (worker.running) - { - AbstractFunctor* task = nullptr; - - if (worker.workCount > 0) // Permet d'éviter d'entrer inutilement dans une section critique - { - EnterCriticalSection(&worker.queueMutex); - if (!worker.queue.empty()) // Nécessaire car le workCount peut être tombé à zéro juste avant l'entrée dans la section critique - { - task = worker.queue.front(); - worker.queue.pop(); - worker.workCount = worker.queue.size(); - } - LeaveCriticalSection(&worker.queueMutex); - } - - // Que faire quand vous n'avez plus de travail ? - if (!task) - task = StealTask(workerID); // Voler le travail des autres ! - - if (task) - { - // On exécute la tâche avant de la supprimer - task->Run(); - delete task; - } - else - { - SetEvent(s_doneEvents[workerID]); - WaitForSingleObject(worker.wakeEvent, INFINITE); - } - } - - // Au cas où un thread attendrait sur WaitForTasks() pendant qu'un autre appellerait Uninitialize() - // Ça ne devrait pas arriver, mais comme ça ne coûte pas grand chose.. - SetEvent(s_doneEvents[workerID]); - - return 0; - } - - std::unique_ptr TaskSchedulerImpl::s_doneEvents; // Doivent être contigus - std::unique_ptr TaskSchedulerImpl::s_workers; - std::unique_ptr TaskSchedulerImpl::s_workerThreads; // Doivent être contigus - DWORD TaskSchedulerImpl::s_workerCount; -} - -#include diff --git a/src/Nazara/Core/Win32/TaskSchedulerImpl.hpp b/src/Nazara/Core/Win32/TaskSchedulerImpl.hpp deleted file mode 100644 index a0d0017af..000000000 --- a/src/Nazara/Core/Win32/TaskSchedulerImpl.hpp +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (C) 2024 Jérôme "SirLynix" Leclercq (lynix680@gmail.com) -// This file is part of the "Nazara Engine - Core module" -// For conditions of distribution and use, see copyright notice in Config.hpp - -#pragma once - -#ifndef NAZARA_CORE_WIN32_TASKSCHEDULERIMPL_HPP -#define NAZARA_CORE_WIN32_TASKSCHEDULERIMPL_HPP - -#include -#include -#include -#include -#include -#include - -namespace Nz -{ - class TaskSchedulerImpl - { - public: - TaskSchedulerImpl() = delete; - ~TaskSchedulerImpl() = delete; - - static bool Initialize(std::size_t workerCount); - static bool IsInitialized(); - static void Run(AbstractFunctor** tasks, std::size_t count); - static void Uninitialize(); - static void WaitForTasks(); - - private: - static AbstractFunctor* StealTask(std::size_t workerID); - static unsigned int __stdcall WorkerProc(void* userdata); - - struct Worker - { - std::atomic_size_t workCount; - std::queue queue; - CRITICAL_SECTION queueMutex; - HANDLE wakeEvent; - volatile bool running; - }; - - static std::unique_ptr s_doneEvents; // Doivent être contigus - static std::unique_ptr s_workers; - static std::unique_ptr s_workerThreads; // Doivent être contigus - static DWORD s_workerCount; -}; -} - -#endif // NAZARA_CORE_WIN32_TASKSCHEDULERIMPL_HPP diff --git a/tests/ComputeParticlesTest/main.cpp b/tests/ComputeParticlesTest/main.cpp index 01b77bd94..8085577b4 100644 --- a/tests/ComputeParticlesTest/main.cpp +++ b/tests/ComputeParticlesTest/main.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include #include #include @@ -239,6 +238,8 @@ int main() Nz::Time mouseSampleTimer = Nz::Time::Zero(); constexpr 
Nz::Time mouseSampleRate = Nz::Time::TickDuration(60); + Nz::TaskScheduler taskScheduler; + auto& eventHandler = window.GetEventHandler(); eventHandler.OnKeyReleased.Connect([&](const Nz::WindowEventHandler*, const Nz::WindowEvent::KeyEvent& key) { @@ -259,19 +260,25 @@ int main() Nz::SparsePtr particlePosPtr(particleBasePtr + particlePosOffset, particleSize); Nz::SparsePtr particleVelPtr(particleBasePtr + particleVelOffset, particleSize); -#ifndef NAZARA_PLATFORM_MACOS - std::for_each_n(std::execution::par_unseq, particleBasePtr, particleCount, [&](Nz::UInt8& hax) -#else - std::for_each_n(particleBasePtr, particleCount, [&](Nz::UInt8& hax) -#endif + unsigned int workerCount = taskScheduler.GetWorkerCount(); + std::size_t particlePerWorker = particleCount / workerCount; + std::size_t leftover = particleCount - particlePerWorker * workerCount; + + for (unsigned int i = 0; i < workerCount; ++i) { - static thread_local std::mt19937 rand_mt(std::random_device{}()); + taskScheduler.AddTask([&, offset = i * particlePerWorker, count = (i != workerCount - 1) ? particlePerWorker : particlePerWorker + leftover] + { + static thread_local std::mt19937 rand_mt(std::random_device{}()); - std::size_t index = &hax - particleBasePtr; //< HAAX - - particleVelPtr[index] += (particlePosPtr[index] - newMousePos).GetNormal() * 500.f; - particleVelPtr[index] += Nz::Vector2f(velDis(rand_mt), velDis(rand_mt)); - }); + for (std::size_t i = 0; i < count; ++i) + { + std::size_t index = offset + i; + particleVelPtr[index] += (particlePosPtr[index] - newMousePos).GetNormal() * 500.f; + particleVelPtr[index] += Nz::Vector2f(velDis(rand_mt), velDis(rand_mt)); + } + }); + } + taskScheduler.WaitForTasks(); particleBuffer->Unmap(); });
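
For context, this rework replaces the static Initialize/Run/Uninitialize API with an instance-owned worker pool: tasks are plain std::function<void()> objects handed to AddTask, each worker keeps its own queue behind a try-locked mutex so AddTask retries on a random worker instead of blocking, idle workers steal from their peers in shuffled order to limit contention, and WaitForTasks blocks until every worker reports idle. A minimal usage sketch (assuming the Core module is initialized, since the default worker count falls back to the CPU thread count; the function name is illustrative, not part of the engine):

    #include <Nazara/Core/TaskScheduler.hpp>

    void ExampleUsage()
    {
        Nz::TaskScheduler scheduler; // workerCount = 0 requests one worker per hardware thread

        for (int chunk = 0; chunk < 8; ++chunk)
        {
            scheduler.AddTask([chunk]
            {
                // independent work for this chunk goes here
            });
        }

        scheduler.WaitForTasks(); // blocks until every worker is idle again
    }

As in the ComputeParticlesTest change above, callers now split their work into tasks themselves and call WaitForTasks() before touching the results; the scheduler no longer accumulates pending work behind a separate Run() call.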