Core: Rework TaskScheduler (WIP)
This commit is contained in:
parent
2b88f50c21
commit
9d669f722e
|
|
@ -8,28 +8,45 @@
|
|||
#define NAZARA_CORE_TASKSCHEDULER_HPP
|
||||
|
||||
#include <NazaraUtils/Prerequisites.hpp>
|
||||
#include <Nazara/Core/Functor.hpp>
|
||||
#include <Nazara/Core/Config.hpp>
|
||||
#include <atomic>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <random>
|
||||
|
||||
namespace Nz
|
||||
{
|
||||
class NAZARA_CORE_API TaskScheduler
|
||||
{
|
||||
public:
|
||||
TaskScheduler() = delete;
|
||||
~TaskScheduler() = delete;
|
||||
using Task = std::function<void()>;
|
||||
|
||||
template<typename F> static void AddTask(F function);
|
||||
template<typename F, typename... Args> static void AddTask(F function, Args&&... args);
|
||||
template<typename C> static void AddTask(void (C::*function)(), C* object);
|
||||
static unsigned int GetWorkerCount();
|
||||
static bool Initialize();
|
||||
static void Run();
|
||||
static void SetWorkerCount(unsigned int workerCount);
|
||||
static void Uninitialize();
|
||||
static void WaitForTasks();
|
||||
TaskScheduler(unsigned int workerCount = 0);
|
||||
TaskScheduler(const TaskScheduler&) = delete;
|
||||
TaskScheduler(TaskScheduler&&) = default;
|
||||
~TaskScheduler();
|
||||
|
||||
void AddTask(Task&& task);
|
||||
|
||||
unsigned int GetWorkerCount() const;
|
||||
|
||||
void WaitForTasks();
|
||||
|
||||
TaskScheduler& operator=(const TaskScheduler&) = delete;
|
||||
TaskScheduler& operator=(TaskScheduler&&) = default;
|
||||
|
||||
private:
|
||||
static void AddTaskFunctor(AbstractFunctor* taskFunctor);
|
||||
class Worker;
|
||||
friend Worker;
|
||||
|
||||
Worker& GetWorker(unsigned int workerIndex);
|
||||
void NotifyWorkerActive();
|
||||
void NotifyWorkerIdle();
|
||||
|
||||
std::atomic_bool m_idle;
|
||||
std::atomic_uint m_idleWorkerCount;
|
||||
std::minstd_rand m_randomGenerator;
|
||||
std::vector<Worker> m_workers;
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,49 +6,6 @@
|
|||
|
||||
namespace Nz
|
||||
{
|
||||
/*!
|
||||
* \ingroup core
|
||||
* \class Nz::TaskScheduler
|
||||
* \brief Core class that represents a thread pool
|
||||
*/
|
||||
|
||||
/*!
|
||||
* \brief Adds a task to the pending list
|
||||
*
|
||||
* \param function Task that the pool will execute
|
||||
*/
|
||||
|
||||
template<typename F>
|
||||
void TaskScheduler::AddTask(F function)
|
||||
{
|
||||
AddTaskFunctor(new FunctorWithoutArgs<F>(function));
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief Adds a task to the pending list
|
||||
*
|
||||
* \param function Task that the pool will execute
|
||||
* \param args Arguments of the function
|
||||
*/
|
||||
|
||||
template<typename F, typename... Args>
|
||||
void TaskScheduler::AddTask(F function, Args&&... args)
|
||||
{
|
||||
AddTaskFunctor(new FunctorWithArgs<F, Args...>(function, std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief Adds a task to the pending list
|
||||
*
|
||||
* \param function Task that the pool will execute
|
||||
* \param object Object on which the method will be called
|
||||
*/
|
||||
|
||||
template<typename C>
|
||||
void TaskScheduler::AddTask(void (C::*function)(), C* object)
|
||||
{
|
||||
AddTaskFunctor(new MemberWithoutArgs<C>(function, object));
|
||||
}
|
||||
}
|
||||
|
||||
#include <Nazara/Core/DebugOff.hpp>
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@ namespace Nz
|
|||
{
|
||||
m_hardwareInfo.reset();
|
||||
|
||||
TaskScheduler::Uninitialize();
|
||||
LogUninit();
|
||||
Log::Uninitialize();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,249 +0,0 @@
|
|||
// Copyright (C) 2024 Jérôme "SirLynix" Leclercq (lynix680@gmail.com)
|
||||
// This file is part of the "Nazara Engine - Core module"
|
||||
// For conditions of distribution and use, see copyright notice in Config.hpp
|
||||
|
||||
#include <Nazara/Core/Posix/TaskSchedulerImpl.hpp>
|
||||
#include <Nazara/Core/Functor.hpp>
|
||||
#include <Nazara/Core/Debug.hpp>
|
||||
|
||||
#if defined(NAZARA_PLATFORM_MACOS)
|
||||
#include <errno.h>
|
||||
#endif
|
||||
|
||||
namespace Nz
|
||||
{
|
||||
bool TaskSchedulerImpl::Initialize(unsigned int workerCount)
|
||||
{
|
||||
if (IsInitialized())
|
||||
return true; // Déjà initialisé
|
||||
|
||||
#if NAZARA_CORE_SAFE
|
||||
if (workerCount == 0)
|
||||
{
|
||||
NazaraError("invalid worker count ! (0)");
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
|
||||
s_workerCount = workerCount;
|
||||
s_isDone = false;
|
||||
s_isWaiting = false;
|
||||
s_shouldFinish = false;
|
||||
|
||||
s_threads.reset(new pthread_t[workerCount]);
|
||||
|
||||
// On initialise les conditions variables, mutex et barrière.
|
||||
pthread_cond_init(&s_cvEmpty, nullptr);
|
||||
pthread_cond_init(&s_cvNotEmpty, nullptr);
|
||||
pthread_mutex_init(&s_mutexQueue, nullptr);
|
||||
pthread_barrier_init(&s_barrier, nullptr, workerCount + 1);
|
||||
|
||||
for (unsigned int i = 0; i < s_workerCount; ++i)
|
||||
{
|
||||
// Le thread va se lancer, attendre que tous se créent et attendre d'être réveillé.
|
||||
pthread_create(&s_threads[i], nullptr, WorkerProc, nullptr);
|
||||
}
|
||||
|
||||
pthread_barrier_wait(&s_barrier); // On attend que les enfants soient bien créés.
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool TaskSchedulerImpl::IsInitialized()
|
||||
{
|
||||
return s_workerCount > 0;
|
||||
}
|
||||
|
||||
void TaskSchedulerImpl::Run(AbstractFunctor** tasks, unsigned int count)
|
||||
{
|
||||
// On s'assure que des tâches ne sont pas déjà en cours
|
||||
Wait();
|
||||
|
||||
pthread_mutex_lock(&s_mutexQueue);
|
||||
s_isDone = false;
|
||||
|
||||
while (count--)
|
||||
s_tasks.push(*tasks++);
|
||||
|
||||
pthread_cond_signal(&s_cvNotEmpty);
|
||||
pthread_mutex_unlock(&s_mutexQueue);
|
||||
}
|
||||
|
||||
void TaskSchedulerImpl::Uninitialize()
|
||||
{
|
||||
#ifdef NAZARA_CORE_SAFE
|
||||
if (s_workerCount == 0)
|
||||
{
|
||||
NazaraError("task scheduler is not initialized");
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
// On réveille les threads pour qu'ils sortent de la boucle et terminent.
|
||||
pthread_mutex_lock(&s_mutexQueue);
|
||||
// On commence par vider la queue et demander qu'ils s'arrêtent.
|
||||
std::queue<AbstractFunctor*> emptyQueue;
|
||||
std::swap(s_tasks, emptyQueue);
|
||||
s_shouldFinish = true;
|
||||
pthread_cond_broadcast(&s_cvNotEmpty);
|
||||
pthread_mutex_unlock(&s_mutexQueue);
|
||||
|
||||
// On attend que chaque thread se termine
|
||||
for (unsigned int i = 0; i < s_workerCount; ++i)
|
||||
pthread_join(s_threads[i], nullptr);
|
||||
|
||||
// Et on libère les ressources
|
||||
pthread_barrier_destroy(&s_barrier);
|
||||
pthread_cond_destroy(&s_cvEmpty);
|
||||
pthread_cond_destroy(&s_cvNotEmpty);
|
||||
pthread_mutex_destroy(&s_mutexQueue);
|
||||
|
||||
s_workerCount = 0;
|
||||
}
|
||||
|
||||
void TaskSchedulerImpl::WaitForTasks()
|
||||
{
|
||||
#ifdef NAZARA_CORE_SAFE
|
||||
if (s_workerCount == 0)
|
||||
{
|
||||
NazaraError("task scheduler is not initialized");
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
Wait();
|
||||
}
|
||||
|
||||
AbstractFunctor* TaskSchedulerImpl::PopQueue()
|
||||
{
|
||||
AbstractFunctor* task = nullptr;
|
||||
|
||||
pthread_mutex_lock(&s_mutexQueue);
|
||||
|
||||
if (!s_tasks.empty())
|
||||
{
|
||||
task = s_tasks.front();
|
||||
s_tasks.pop();
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&s_mutexQueue);
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
void TaskSchedulerImpl::Wait()
|
||||
{
|
||||
if (s_isDone)
|
||||
return;
|
||||
|
||||
pthread_mutex_lock(&s_mutexQueue);
|
||||
s_isWaiting = true;
|
||||
pthread_cond_broadcast(&s_cvNotEmpty);
|
||||
pthread_cond_wait(&s_cvEmpty, &s_mutexQueue);
|
||||
pthread_mutex_unlock(&s_mutexQueue);
|
||||
|
||||
s_isDone = true;
|
||||
}
|
||||
|
||||
void* TaskSchedulerImpl::WorkerProc(void* /*userdata*/)
|
||||
{
|
||||
// On s'assure que tous les threads soient correctement lancés.
|
||||
pthread_barrier_wait(&s_barrier);
|
||||
|
||||
// On quitte s'il doit terminer.
|
||||
while (!s_shouldFinish)
|
||||
{
|
||||
AbstractFunctor* task = PopQueue();
|
||||
|
||||
if (task)
|
||||
{
|
||||
// On exécute la tâche avant de la supprimer
|
||||
task->Run();
|
||||
delete task;
|
||||
}
|
||||
else
|
||||
{
|
||||
pthread_mutex_lock(&s_mutexQueue);
|
||||
if (s_tasks.empty())
|
||||
s_isDone = true;
|
||||
|
||||
while (!(!s_tasks.empty() || s_isWaiting || s_shouldFinish))
|
||||
pthread_cond_wait(&s_cvNotEmpty, &s_mutexQueue);
|
||||
|
||||
if (s_tasks.empty() && s_isWaiting)
|
||||
{
|
||||
// On prévient le thread qui attend que les tâches soient effectuées.
|
||||
s_isWaiting = false;
|
||||
pthread_cond_signal(&s_cvEmpty);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&s_mutexQueue);
|
||||
}
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::queue<AbstractFunctor*> TaskSchedulerImpl::s_tasks;
|
||||
std::unique_ptr<pthread_t[]> TaskSchedulerImpl::s_threads;
|
||||
std::atomic<bool> TaskSchedulerImpl::s_isDone;
|
||||
std::atomic<bool> TaskSchedulerImpl::s_isWaiting;
|
||||
std::atomic<bool> TaskSchedulerImpl::s_shouldFinish;
|
||||
unsigned int TaskSchedulerImpl::s_workerCount;
|
||||
|
||||
pthread_mutex_t TaskSchedulerImpl::s_mutexQueue;
|
||||
pthread_cond_t TaskSchedulerImpl::s_cvEmpty;
|
||||
pthread_cond_t TaskSchedulerImpl::s_cvNotEmpty;
|
||||
pthread_barrier_t TaskSchedulerImpl::s_barrier;
|
||||
|
||||
#if defined(NAZARA_PLATFORM_MACOS)
|
||||
//Code from https://blog.albertarmea.com/post/47089939939/using-pthreadbarrier-on-mac-os-x
|
||||
int TaskSchedulerImpl::pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count)
|
||||
{
|
||||
if(count == 0)
|
||||
{
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
if(pthread_mutex_init(&barrier->mutex, 0) < 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
if(pthread_cond_init(&barrier->cond, 0) < 0)
|
||||
{
|
||||
pthread_mutex_destroy(&barrier->mutex);
|
||||
return -1;
|
||||
}
|
||||
barrier->tripCount = count;
|
||||
barrier->count = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int TaskSchedulerImpl::pthread_barrier_destroy(pthread_barrier_t *barrier)
|
||||
{
|
||||
pthread_cond_destroy(&barrier->cond);
|
||||
pthread_mutex_destroy(&barrier->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int TaskSchedulerImpl::pthread_barrier_wait(pthread_barrier_t *barrier)
|
||||
{
|
||||
pthread_mutex_lock(&barrier->mutex);
|
||||
++(barrier->count);
|
||||
if(barrier->count >= barrier->tripCount)
|
||||
{
|
||||
barrier->count = 0;
|
||||
pthread_cond_broadcast(&barrier->cond);
|
||||
pthread_mutex_unlock(&barrier->mutex);
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
pthread_cond_wait(&barrier->cond, &(barrier->mutex));
|
||||
pthread_mutex_unlock(&barrier->mutex);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
|
@ -1,68 +0,0 @@
|
|||
// Copyright (C) 2024 Jérôme "SirLynix" Leclercq (lynix680@gmail.com)
|
||||
// This file is part of the "Nazara Engine - Core module"
|
||||
// For conditions of distribution and use, see copyright notice in Config.hpp
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifndef NAZARA_CORE_POSIX_TASKSCHEDULERIMPL_HPP
|
||||
#define NAZARA_CORE_POSIX_TASKSCHEDULERIMPL_HPP
|
||||
|
||||
#include <NazaraUtils/Prerequisites.hpp>
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <queue>
|
||||
#include <pthread.h>
|
||||
|
||||
#if defined(NAZARA_PLATFORM_MACOS)
|
||||
typedef int pthread_barrierattr_t;
|
||||
typedef struct
|
||||
{
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cond;
|
||||
int count;
|
||||
int tripCount;
|
||||
} pthread_barrier_t;
|
||||
#endif
|
||||
|
||||
namespace Nz
|
||||
{
|
||||
struct AbstractFunctor;
|
||||
|
||||
class TaskSchedulerImpl
|
||||
{
|
||||
public:
|
||||
TaskSchedulerImpl() = delete;
|
||||
~TaskSchedulerImpl() = delete;
|
||||
|
||||
static bool Initialize(unsigned int workerCount);
|
||||
static bool IsInitialized();
|
||||
static void Run(AbstractFunctor** tasks, unsigned int count);
|
||||
static void Uninitialize();
|
||||
static void WaitForTasks();
|
||||
|
||||
private:
|
||||
static AbstractFunctor* PopQueue();
|
||||
static void Wait();
|
||||
static void* WorkerProc(void* userdata);
|
||||
|
||||
static std::queue<AbstractFunctor*> s_tasks;
|
||||
static std::unique_ptr<pthread_t[]> s_threads;
|
||||
static std::atomic<bool> s_isDone;
|
||||
static std::atomic<bool> s_isWaiting;
|
||||
static std::atomic<bool> s_shouldFinish;
|
||||
static unsigned int s_workerCount;
|
||||
|
||||
static pthread_mutex_t s_mutexQueue;
|
||||
static pthread_cond_t s_cvEmpty;
|
||||
static pthread_cond_t s_cvNotEmpty;
|
||||
static pthread_barrier_t s_barrier;
|
||||
|
||||
#if defined(NAZARA_PLATFORM_MACOS)
|
||||
static int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count);
|
||||
static int pthread_barrier_destroy(pthread_barrier_t *barrier);
|
||||
static int pthread_barrier_wait(pthread_barrier_t *barrier);
|
||||
#endif
|
||||
};
|
||||
}
|
||||
|
||||
#endif // NAZARA_CORE_POSIX_TASKSCHEDULERIMPL_HPP
|
||||
|
|
@ -4,140 +4,220 @@
|
|||
|
||||
#include <Nazara/Core/TaskScheduler.hpp>
|
||||
#include <Nazara/Core/Core.hpp>
|
||||
#include <Nazara/Core/Error.hpp>
|
||||
|
||||
#if defined(NAZARA_PLATFORM_WINDOWS)
|
||||
#include <Nazara/Core/Win32/TaskSchedulerImpl.hpp>
|
||||
#elif defined(NAZARA_PLATFORM_POSIX)
|
||||
#include <Nazara/Core/Posix/TaskSchedulerImpl.hpp>
|
||||
#else
|
||||
#error Lack of implementation: Task Scheduler
|
||||
#endif
|
||||
|
||||
#include <Nazara/Core/ThreadExt.hpp>
|
||||
#include <NazaraUtils/StackArray.hpp>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <random>
|
||||
#include <stop_token>
|
||||
#include <thread>
|
||||
#include <Nazara/Core/Debug.hpp>
|
||||
|
||||
namespace Nz
|
||||
{
|
||||
namespace
|
||||
NAZARA_WARNING_PUSH()
|
||||
NAZARA_WARNING_MSVC_DISABLE(4324)
|
||||
|
||||
class alignas(std::hardware_destructive_interference_size) TaskScheduler::Worker
|
||||
{
|
||||
std::vector<AbstractFunctor*> s_pendingWorks;
|
||||
unsigned int s_workerCount = 0;
|
||||
public:
|
||||
Worker(TaskScheduler& owner, unsigned int workerIndex) :
|
||||
m_owner(owner),
|
||||
m_workerIndex(workerIndex)
|
||||
{
|
||||
m_thread = std::jthread([this](std::stop_token stopToken)
|
||||
{
|
||||
SetCurrentThreadName(fmt::format("NzWorker #{0}", m_workerIndex).c_str());
|
||||
Run(stopToken);
|
||||
});
|
||||
}
|
||||
|
||||
Worker(const Worker&) = delete;
|
||||
|
||||
Worker(Worker&& worker) :
|
||||
m_owner(worker.m_owner)
|
||||
{
|
||||
NAZARA_UNREACHABLE();
|
||||
}
|
||||
|
||||
bool AddTask(Task&& task)
|
||||
{
|
||||
std::unique_lock lock(m_mutex, std::defer_lock);
|
||||
if (!lock.try_lock())
|
||||
return false;
|
||||
|
||||
m_tasks.push_back(std::move(task));
|
||||
lock.unlock();
|
||||
|
||||
m_conditionVariable.notify_one();
|
||||
return true;
|
||||
}
|
||||
|
||||
void Run(std::stop_token& stopToken)
|
||||
{
|
||||
StackArray<unsigned int> randomWorkerIndices = NazaraStackArrayNoInit(unsigned int, m_owner.GetWorkerCount() - 1);
|
||||
{
|
||||
unsigned int* indexPtr = randomWorkerIndices.data();
|
||||
for (unsigned int i = 0; i < randomWorkerIndices.size(); ++i)
|
||||
{
|
||||
if (i != m_workerIndex)
|
||||
*indexPtr++ = i;
|
||||
}
|
||||
|
||||
std::minstd_rand gen(std::random_device{}());
|
||||
std::shuffle(randomWorkerIndices.begin(), randomWorkerIndices.end(), gen);
|
||||
}
|
||||
|
||||
bool idle = true;
|
||||
for (;;)
|
||||
{
|
||||
std::unique_lock lock(m_mutex);
|
||||
|
||||
// Wait for tasks if we don't have any right now
|
||||
if (m_tasks.empty())
|
||||
{
|
||||
if (!idle)
|
||||
{
|
||||
m_owner.NotifyWorkerIdle();
|
||||
idle = true;
|
||||
}
|
||||
|
||||
m_conditionVariable.wait(m_mutex, stopToken, [this] { return !m_tasks.empty(); });
|
||||
}
|
||||
|
||||
if (stopToken.stop_requested())
|
||||
break;
|
||||
|
||||
auto ExecuteTask = [&](TaskScheduler::Task& task)
|
||||
{
|
||||
if (idle)
|
||||
{
|
||||
m_owner.NotifyWorkerActive();
|
||||
idle = false;
|
||||
}
|
||||
|
||||
task();
|
||||
};
|
||||
|
||||
if (!m_tasks.empty())
|
||||
{
|
||||
TaskScheduler::Task task = std::move(m_tasks.front());
|
||||
m_tasks.erase(m_tasks.begin());
|
||||
|
||||
lock.unlock();
|
||||
|
||||
ExecuteTask(task);
|
||||
}
|
||||
else
|
||||
{
|
||||
lock.unlock();
|
||||
|
||||
// Try to steal a task from another worker in a random order to avoid lock contention
|
||||
TaskScheduler::Task task;
|
||||
for (unsigned int workerIndex : randomWorkerIndices)
|
||||
{
|
||||
if (m_owner.GetWorker(workerIndex).StealTask(&task))
|
||||
{
|
||||
ExecuteTask(task);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Note: it's possible for a thread to reach this point without executing a task (for example if another worker stole its only remaining task)
|
||||
}
|
||||
}
|
||||
|
||||
bool StealTask(TaskScheduler::Task* task)
|
||||
{
|
||||
std::unique_lock lock(m_mutex, std::defer_lock);
|
||||
if (!lock.try_lock())
|
||||
return false;
|
||||
|
||||
if (m_tasks.empty())
|
||||
return false;
|
||||
|
||||
*task = std::move(m_tasks.front());
|
||||
m_tasks.erase(m_tasks.begin());
|
||||
return true;
|
||||
}
|
||||
|
||||
Worker& operator=(const Worker& worker) = delete;
|
||||
|
||||
Worker& operator=(Worker&&)
|
||||
{
|
||||
NAZARA_UNREACHABLE();
|
||||
}
|
||||
|
||||
private:
|
||||
std::condition_variable_any m_conditionVariable;
|
||||
std::mutex m_mutex;
|
||||
std::jthread m_thread;
|
||||
std::vector<TaskScheduler::Task> m_tasks;
|
||||
TaskScheduler& m_owner;
|
||||
unsigned int m_workerIndex;
|
||||
};
|
||||
|
||||
NAZARA_WARNING_POP()
|
||||
|
||||
TaskScheduler::TaskScheduler(unsigned int workerCount) :
|
||||
m_idle(true),
|
||||
m_randomGenerator(std::random_device{}())
|
||||
{
|
||||
if (workerCount == 0)
|
||||
workerCount = std::max(Core::Instance()->GetHardwareInfo().GetCpuThreadCount(), 1u);
|
||||
|
||||
m_idleWorkerCount = workerCount;
|
||||
|
||||
m_workers.reserve(workerCount);
|
||||
for (unsigned int i = 0; i < workerCount; ++i)
|
||||
m_workers.emplace_back(*this, i);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \ingroup core
|
||||
* \class Nz::TaskScheduler
|
||||
* \brief Core class that represents a pool of threads
|
||||
*
|
||||
* \remark Initialized should be called first
|
||||
*/
|
||||
|
||||
/*!
|
||||
* \brief Gets the number of threads
|
||||
* \return Number of threads, if none, the number of logical threads on the processor is returned
|
||||
*/
|
||||
|
||||
unsigned int TaskScheduler::GetWorkerCount()
|
||||
TaskScheduler::~TaskScheduler()
|
||||
{
|
||||
return (s_workerCount > 0) ? s_workerCount : Core::Instance()->GetHardwareInfo().GetCpuThreadCount();
|
||||
m_workers.clear();
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief Initializes the TaskScheduler class
|
||||
* \return true if everything is ok
|
||||
*/
|
||||
|
||||
bool TaskScheduler::Initialize()
|
||||
void TaskScheduler::AddTask(Task&& task)
|
||||
{
|
||||
return TaskSchedulerImpl::Initialize(GetWorkerCount());
|
||||
}
|
||||
m_idle = false;
|
||||
|
||||
/*!
|
||||
* \brief Runs the pending works
|
||||
*
|
||||
* \remark Produce a NazaraError if the class is not initialized
|
||||
*/
|
||||
|
||||
void TaskScheduler::Run()
|
||||
{
|
||||
if (!Initialize())
|
||||
std::uniform_int_distribution<unsigned int> workerDis(0, static_cast<unsigned int>(m_workers.size() - 1));
|
||||
for (;;)
|
||||
{
|
||||
NazaraError("failed to initialize Task Scheduler");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!s_pendingWorks.empty())
|
||||
{
|
||||
TaskSchedulerImpl::Run(&s_pendingWorks[0], s_pendingWorks.size());
|
||||
s_pendingWorks.clear();
|
||||
Worker& randomWorker = m_workers[workerDis(m_randomGenerator)];
|
||||
if (randomWorker.AddTask(std::move(task)))
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief Sets the number of workers
|
||||
*
|
||||
* \param workerCount Number of simulatnous threads handling the tasks
|
||||
*
|
||||
* \remark Produce a NazaraError if the class is not initialized and NAZARA_CORE_SAFE is defined
|
||||
*/
|
||||
|
||||
void TaskScheduler::SetWorkerCount(unsigned int workerCount)
|
||||
unsigned int TaskScheduler::GetWorkerCount() const
|
||||
{
|
||||
#ifdef NAZARA_CORE_SAFE
|
||||
if (TaskSchedulerImpl::IsInitialized())
|
||||
{
|
||||
NazaraError("worker count cannot be set while initialized");
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
s_workerCount = workerCount;
|
||||
return static_cast<unsigned int>(m_workers.size());
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief Uninitializes the TaskScheduler class
|
||||
*/
|
||||
|
||||
void TaskScheduler::Uninitialize()
|
||||
{
|
||||
if (TaskSchedulerImpl::IsInitialized())
|
||||
TaskSchedulerImpl::Uninitialize();
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief Waits for tasks to be done
|
||||
*
|
||||
* \remark Produce a NazaraError if the class is not initialized
|
||||
*/
|
||||
|
||||
void TaskScheduler::WaitForTasks()
|
||||
{
|
||||
if (!Initialize())
|
||||
{
|
||||
NazaraError("failed to initialize Task Scheduler");
|
||||
return;
|
||||
}
|
||||
|
||||
TaskSchedulerImpl::WaitForTasks();
|
||||
m_idle.wait(false);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief Adds a task on the pending list
|
||||
*
|
||||
* \param taskFunctor Functor represeting a task to be done
|
||||
*
|
||||
* \remark Produce a NazaraError if the class is not initialized
|
||||
* \remark A task containing a call on this class is undefined behaviour
|
||||
*/
|
||||
|
||||
void TaskScheduler::AddTaskFunctor(AbstractFunctor* taskFunctor)
|
||||
auto TaskScheduler::GetWorker(unsigned int workerIndex) -> Worker&
|
||||
{
|
||||
if (!Initialize())
|
||||
{
|
||||
NazaraError("failed to initialize Task Scheduler");
|
||||
return;
|
||||
}
|
||||
return m_workers[workerIndex];
|
||||
}
|
||||
|
||||
s_pendingWorks.push_back(taskFunctor);
|
||||
void TaskScheduler::NotifyWorkerActive()
|
||||
{
|
||||
m_idleWorkerCount--;
|
||||
}
|
||||
|
||||
void TaskScheduler::NotifyWorkerIdle()
|
||||
{
|
||||
if (++m_idleWorkerCount == m_workers.size())
|
||||
{
|
||||
m_idle = true;
|
||||
m_idle.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,250 +0,0 @@
|
|||
// Copyright (C) 2024 Jérôme "SirLynix" Leclercq (lynix680@gmail.com)
|
||||
// This file is part of the "Nazara Engine - Core module"
|
||||
// For conditions of distribution and use, see copyright notice in Config.hpp
|
||||
|
||||
#include <Nazara/Core/Win32/TaskSchedulerImpl.hpp>
|
||||
#include <Nazara/Core/Config.hpp>
|
||||
#include <Nazara/Core/Error.hpp>
|
||||
#include <cstdlib> // std::ldiv
|
||||
#include <process.h>
|
||||
#include <Nazara/Core/Debug.hpp>
|
||||
|
||||
namespace Nz
|
||||
{
|
||||
bool TaskSchedulerImpl::Initialize(std::size_t workerCount)
|
||||
{
|
||||
if (IsInitialized())
|
||||
return true; // Déjà initialisé
|
||||
|
||||
#if NAZARA_CORE_SAFE
|
||||
if (workerCount == 0)
|
||||
{
|
||||
NazaraError("invalid worker count ! (0)");
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
|
||||
s_workerCount = static_cast<DWORD>(workerCount);
|
||||
s_doneEvents.reset(new HANDLE[workerCount]);
|
||||
s_workers.reset(new Worker[workerCount]);
|
||||
s_workerThreads.reset(new HANDLE[workerCount]);
|
||||
|
||||
// L'identifiant de chaque worker doit rester en vie jusqu'à ce que chaque thread soit correctement lancé
|
||||
std::unique_ptr<std::size_t[]> workerIDs(new std::size_t[workerCount]);
|
||||
|
||||
for (std::size_t i = 0; i < workerCount; ++i)
|
||||
{
|
||||
// On initialise les évènements, mutex et threads de chaque worker
|
||||
Worker& worker = s_workers[i];
|
||||
InitializeCriticalSection(&worker.queueMutex);
|
||||
worker.wakeEvent = CreateEventW(nullptr, false, false, nullptr);
|
||||
worker.running = true;
|
||||
worker.workCount = 0;
|
||||
|
||||
s_doneEvents[i] = CreateEventW(nullptr, true, false, nullptr);
|
||||
|
||||
// Le thread va se lancer, signaler qu'il est prêt à travailler (s_doneEvents) et attendre d'être réveillé
|
||||
workerIDs[i] = i;
|
||||
s_workerThreads[i] = reinterpret_cast<HANDLE>(_beginthreadex(nullptr, 0, &WorkerProc, &workerIDs[i], 0, nullptr));
|
||||
}
|
||||
|
||||
// On attend que les workers se mettent en attente
|
||||
WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool TaskSchedulerImpl::IsInitialized()
|
||||
{
|
||||
return s_workerCount > 0;
|
||||
}
|
||||
|
||||
void TaskSchedulerImpl::Run(AbstractFunctor** tasks, std::size_t count)
|
||||
{
|
||||
// On s'assure que des tâches ne sont pas déjà en cours
|
||||
WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE);
|
||||
|
||||
std::lldiv_t div = std::lldiv(count, s_workerCount); // Division et modulo en une opération, y'a pas de petit profit
|
||||
for (std::size_t i = 0; i < s_workerCount; ++i)
|
||||
{
|
||||
// On va maintenant répartir les tâches entre chaque worker et les envoyer dans la queue de chacun
|
||||
Worker& worker = s_workers[i];
|
||||
std::size_t taskCount = (i == 0) ? div.quot + div.rem : div.quot;
|
||||
for (std::size_t j = 0; j < taskCount; ++j)
|
||||
worker.queue.push(*tasks++);
|
||||
|
||||
// On stocke le nombre de tâches à côté dans un entier atomique pour éviter d'entrer inutilement dans une section critique
|
||||
worker.workCount = taskCount;
|
||||
}
|
||||
|
||||
// On les lance une fois qu'ils sont tous initialisés (pour éviter qu'un worker ne passe en pause détectant une absence de travaux)
|
||||
for (std::size_t i = 0; i < s_workerCount; ++i)
|
||||
{
|
||||
ResetEvent(s_doneEvents[i]);
|
||||
SetEvent(s_workers[i].wakeEvent);
|
||||
}
|
||||
}
|
||||
|
||||
void TaskSchedulerImpl::Uninitialize()
|
||||
{
|
||||
#ifdef NAZARA_CORE_SAFE
|
||||
if (s_workerCount == 0)
|
||||
{
|
||||
NazaraError("task scheduler is not initialized");
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
// On commence par vider la queue de chaque worker pour s'assurer qu'ils s'arrêtent
|
||||
for (unsigned int i = 0; i < s_workerCount; ++i)
|
||||
{
|
||||
Worker& worker = s_workers[i];
|
||||
worker.running = false;
|
||||
worker.workCount = 0;
|
||||
|
||||
EnterCriticalSection(&worker.queueMutex);
|
||||
|
||||
std::queue<AbstractFunctor*> emptyQueue;
|
||||
std::swap(worker.queue, emptyQueue); // Et on vide la queue (merci std::swap)
|
||||
|
||||
LeaveCriticalSection(&worker.queueMutex);
|
||||
|
||||
// On réveille le worker pour qu'il sorte de la boucle et termine le thread
|
||||
SetEvent(worker.wakeEvent);
|
||||
}
|
||||
|
||||
// On attend que chaque thread se termine
|
||||
WaitForMultipleObjects(s_workerCount, &s_workerThreads[0], true, INFINITE);
|
||||
|
||||
// Et on libère les ressources
|
||||
for (unsigned int i = 0; i < s_workerCount; ++i)
|
||||
{
|
||||
Worker& worker = s_workers[i];
|
||||
CloseHandle(s_doneEvents[i]);
|
||||
CloseHandle(s_workerThreads[i]);
|
||||
CloseHandle(worker.wakeEvent);
|
||||
DeleteCriticalSection(&worker.queueMutex);
|
||||
}
|
||||
|
||||
s_doneEvents.reset();
|
||||
s_workers.reset();
|
||||
s_workerThreads.reset();
|
||||
s_workerCount = 0;
|
||||
}
|
||||
|
||||
void TaskSchedulerImpl::WaitForTasks()
|
||||
{
|
||||
#ifdef NAZARA_CORE_SAFE
|
||||
if (s_workerCount == 0)
|
||||
{
|
||||
NazaraError("task scheduler is not initialized");
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
WaitForMultipleObjects(s_workerCount, &s_doneEvents[0], true, INFINITE);
|
||||
}
|
||||
|
||||
AbstractFunctor* TaskSchedulerImpl::StealTask(std::size_t workerID)
|
||||
{
|
||||
bool shouldRetry;
|
||||
do
|
||||
{
|
||||
shouldRetry = false;
|
||||
for (std::size_t i = 0; i < s_workerCount; ++i)
|
||||
{
|
||||
// On ne vole pas la famille, ni soi-même.
|
||||
if (i == workerID)
|
||||
continue;
|
||||
|
||||
Worker& worker = s_workers[i];
|
||||
|
||||
// Ce worker a-t-il encore des tâches dans sa file d'attente ?
|
||||
if (worker.workCount > 0)
|
||||
{
|
||||
AbstractFunctor* task = nullptr;
|
||||
|
||||
// Est-ce qu'il utilise la queue maintenant ?
|
||||
if (TryEnterCriticalSection(&worker.queueMutex))
|
||||
{
|
||||
// Non, super ! Profitons-en pour essayer de lui voler un job
|
||||
if (!worker.queue.empty()) // On vérifie que la queue n'est pas vide (peut avoir changé avant le verrouillage)
|
||||
{
|
||||
// Et hop, on vole la tâche
|
||||
task = worker.queue.front();
|
||||
worker.queue.pop();
|
||||
worker.workCount = worker.queue.size();
|
||||
}
|
||||
|
||||
LeaveCriticalSection(&worker.queueMutex);
|
||||
}
|
||||
else
|
||||
shouldRetry = true; // Il est encore possible d'avoir un job
|
||||
|
||||
// Avons-nous notre tâche ?
|
||||
if (task)
|
||||
return task; // Parfait, sortons de là !
|
||||
}
|
||||
}
|
||||
}
|
||||
while (shouldRetry);
|
||||
|
||||
// Bon à priori plus aucun worker n'a de tâche
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
unsigned int __stdcall TaskSchedulerImpl::WorkerProc(void* userdata)
|
||||
{
|
||||
unsigned int workerID = *static_cast<unsigned int*>(userdata);
|
||||
SetEvent(s_doneEvents[workerID]);
|
||||
|
||||
Worker& worker = s_workers[workerID];
|
||||
WaitForSingleObject(worker.wakeEvent, INFINITE);
|
||||
|
||||
while (worker.running)
|
||||
{
|
||||
AbstractFunctor* task = nullptr;
|
||||
|
||||
if (worker.workCount > 0) // Permet d'éviter d'entrer inutilement dans une section critique
|
||||
{
|
||||
EnterCriticalSection(&worker.queueMutex);
|
||||
if (!worker.queue.empty()) // Nécessaire car le workCount peut être tombé à zéro juste avant l'entrée dans la section critique
|
||||
{
|
||||
task = worker.queue.front();
|
||||
worker.queue.pop();
|
||||
worker.workCount = worker.queue.size();
|
||||
}
|
||||
LeaveCriticalSection(&worker.queueMutex);
|
||||
}
|
||||
|
||||
// Que faire quand vous n'avez plus de travail ?
|
||||
if (!task)
|
||||
task = StealTask(workerID); // Voler le travail des autres !
|
||||
|
||||
if (task)
|
||||
{
|
||||
// On exécute la tâche avant de la supprimer
|
||||
task->Run();
|
||||
delete task;
|
||||
}
|
||||
else
|
||||
{
|
||||
SetEvent(s_doneEvents[workerID]);
|
||||
WaitForSingleObject(worker.wakeEvent, INFINITE);
|
||||
}
|
||||
}
|
||||
|
||||
// Au cas où un thread attendrait sur WaitForTasks() pendant qu'un autre appellerait Uninitialize()
|
||||
// Ça ne devrait pas arriver, mais comme ça ne coûte pas grand chose..
|
||||
SetEvent(s_doneEvents[workerID]);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::unique_ptr<HANDLE[]> TaskSchedulerImpl::s_doneEvents; // Doivent être contigus
|
||||
std::unique_ptr<TaskSchedulerImpl::Worker[]> TaskSchedulerImpl::s_workers;
|
||||
std::unique_ptr<HANDLE[]> TaskSchedulerImpl::s_workerThreads; // Doivent être contigus
|
||||
DWORD TaskSchedulerImpl::s_workerCount;
|
||||
}
|
||||
|
||||
#include <Nazara/Core/AntiWindows.hpp>
|
||||
|
|
@ -1,51 +0,0 @@
|
|||
// Copyright (C) 2024 Jérôme "SirLynix" Leclercq (lynix680@gmail.com)
|
||||
// This file is part of the "Nazara Engine - Core module"
|
||||
// For conditions of distribution and use, see copyright notice in Config.hpp
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifndef NAZARA_CORE_WIN32_TASKSCHEDULERIMPL_HPP
|
||||
#define NAZARA_CORE_WIN32_TASKSCHEDULERIMPL_HPP
|
||||
|
||||
#include <NazaraUtils/Prerequisites.hpp>
|
||||
#include <Nazara/Core/Functor.hpp>
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <queue>
|
||||
#include <Windows.h>
|
||||
|
||||
namespace Nz
|
||||
{
|
||||
class TaskSchedulerImpl
|
||||
{
|
||||
public:
|
||||
TaskSchedulerImpl() = delete;
|
||||
~TaskSchedulerImpl() = delete;
|
||||
|
||||
static bool Initialize(std::size_t workerCount);
|
||||
static bool IsInitialized();
|
||||
static void Run(AbstractFunctor** tasks, std::size_t count);
|
||||
static void Uninitialize();
|
||||
static void WaitForTasks();
|
||||
|
||||
private:
|
||||
static AbstractFunctor* StealTask(std::size_t workerID);
|
||||
static unsigned int __stdcall WorkerProc(void* userdata);
|
||||
|
||||
struct Worker
|
||||
{
|
||||
std::atomic_size_t workCount;
|
||||
std::queue<AbstractFunctor*> queue;
|
||||
CRITICAL_SECTION queueMutex;
|
||||
HANDLE wakeEvent;
|
||||
volatile bool running;
|
||||
};
|
||||
|
||||
static std::unique_ptr<HANDLE[]> s_doneEvents; // Doivent être contigus
|
||||
static std::unique_ptr<Worker[]> s_workers;
|
||||
static std::unique_ptr<HANDLE[]> s_workerThreads; // Doivent être contigus
|
||||
static DWORD s_workerCount;
|
||||
};
|
||||
}
|
||||
|
||||
#endif // NAZARA_CORE_WIN32_TASKSCHEDULERIMPL_HPP
|
||||
|
|
@ -9,7 +9,6 @@
|
|||
#include <Nazara/Utility.hpp>
|
||||
#include <array>
|
||||
#include <chrono>
|
||||
#include <execution>
|
||||
#include <iostream>
|
||||
#include <random>
|
||||
#include <thread>
|
||||
|
|
@ -239,6 +238,8 @@ int main()
|
|||
Nz::Time mouseSampleTimer = Nz::Time::Zero();
|
||||
constexpr Nz::Time mouseSampleRate = Nz::Time::TickDuration(60);
|
||||
|
||||
Nz::TaskScheduler taskScheduler;
|
||||
|
||||
auto& eventHandler = window.GetEventHandler();
|
||||
eventHandler.OnKeyReleased.Connect([&](const Nz::WindowEventHandler*, const Nz::WindowEvent::KeyEvent& key)
|
||||
{
|
||||
|
|
@ -259,19 +260,25 @@ int main()
|
|||
Nz::SparsePtr<Nz::Vector2f> particlePosPtr(particleBasePtr + particlePosOffset, particleSize);
|
||||
Nz::SparsePtr<Nz::Vector2f> particleVelPtr(particleBasePtr + particleVelOffset, particleSize);
|
||||
|
||||
#ifndef NAZARA_PLATFORM_MACOS
|
||||
std::for_each_n(std::execution::par_unseq, particleBasePtr, particleCount, [&](Nz::UInt8& hax)
|
||||
#else
|
||||
std::for_each_n(particleBasePtr, particleCount, [&](Nz::UInt8& hax)
|
||||
#endif
|
||||
unsigned int workerCount = taskScheduler.GetWorkerCount();
|
||||
std::size_t particlePerWorker = particleCount / workerCount;
|
||||
std::size_t leftover = particleCount - particlePerWorker * workerCount;
|
||||
|
||||
for (unsigned int i = 0; i < workerCount; ++i)
|
||||
{
|
||||
static thread_local std::mt19937 rand_mt(std::random_device{}());
|
||||
taskScheduler.AddTask([&, offset = i * particlePerWorker, count = (i != workerCount - 1) ? particlePerWorker : particlePerWorker + leftover]
|
||||
{
|
||||
static thread_local std::mt19937 rand_mt(std::random_device{}());
|
||||
|
||||
std::size_t index = &hax - particleBasePtr; //< HAAX
|
||||
|
||||
particleVelPtr[index] += (particlePosPtr[index] - newMousePos).GetNormal() * 500.f;
|
||||
particleVelPtr[index] += Nz::Vector2f(velDis(rand_mt), velDis(rand_mt));
|
||||
});
|
||||
for (std::size_t i = 0; i < count; ++i)
|
||||
{
|
||||
std::size_t index = offset + i;
|
||||
particleVelPtr[index] += (particlePosPtr[index] - newMousePos).GetNormal() * 500.f;
|
||||
particleVelPtr[index] += Nz::Vector2f(velDis(rand_mt), velDis(rand_mt));
|
||||
}
|
||||
});
|
||||
}
|
||||
taskScheduler.WaitForTasks();
|
||||
|
||||
particleBuffer->Unmap();
|
||||
});
|
||||
|
|
|
|||
Loading…
Reference in New Issue