From a4827a99a0f6ed278a1d7357484783cb447e4b20 Mon Sep 17 00:00:00 2001 From: SirLynix Date: Mon, 5 Feb 2024 15:59:45 +0100 Subject: [PATCH] Core/TaskScheduler: Make implementation private --- include/Nazara/Core/TaskScheduler.hpp | 16 +--- include/Nazara/Core/TaskScheduler.inl | 4 - src/Nazara/Core/TaskScheduler.cpp | 107 +++++++++++++------------- 3 files changed, 55 insertions(+), 72 deletions(-) diff --git a/include/Nazara/Core/TaskScheduler.hpp b/include/Nazara/Core/TaskScheduler.hpp index 3b732f060..9f63bd6be 100644 --- a/include/Nazara/Core/TaskScheduler.hpp +++ b/include/Nazara/Core/TaskScheduler.hpp @@ -8,9 +8,7 @@ #define NAZARA_CORE_TASKSCHEDULER_HPP #include -#include #include -#include #include #include @@ -28,7 +26,7 @@ namespace Nz void AddTask(Task&& task); - inline unsigned int GetWorkerCount() const; + unsigned int GetWorkerCount() const; void WaitForTasks(); @@ -36,17 +34,9 @@ namespace Nz TaskScheduler& operator=(TaskScheduler&&) = delete; private: + struct Data; class Worker; - friend Worker; - - Worker& GetWorker(unsigned int workerIndex); - void NotifyTaskCompletion(); - - std::atomic_uint m_remainingTasks; - std::size_t m_nextWorkerIndex; - std::vector m_workers; - MemoryPool m_tasks; - unsigned int m_workerCount; + std::unique_ptr m_data; }; } diff --git a/include/Nazara/Core/TaskScheduler.inl b/include/Nazara/Core/TaskScheduler.inl index 7244af508..a8eacde68 100644 --- a/include/Nazara/Core/TaskScheduler.inl +++ b/include/Nazara/Core/TaskScheduler.inl @@ -6,10 +6,6 @@ namespace Nz { - inline unsigned int TaskScheduler::GetWorkerCount() const - { - return m_workerCount; - } } #include diff --git a/src/Nazara/Core/TaskScheduler.cpp b/src/Nazara/Core/TaskScheduler.cpp index dd2e6348d..1034cec6c 100644 --- a/src/Nazara/Core/TaskScheduler.cpp +++ b/src/Nazara/Core/TaskScheduler.cpp @@ -33,12 +33,20 @@ namespace Nz #endif } + struct TaskScheduler::Data + { + std::atomic_uint remainingTasks = 0; + std::size_t nextWorkerIndex = 0; + std::vector workers; + unsigned int workerCount; + }; + class alignas(NAZARA_ANONYMOUS_NAMESPACE_PREFIX(hardware_destructive_interference_size * 2)) TaskScheduler::Worker { public: - Worker(TaskScheduler& owner, unsigned int workerIndex) : + Worker(TaskScheduler::Data& data, unsigned int workerIndex) : m_running(true), - m_owner(owner), + m_data(data), m_workerIndex(workerIndex) { m_thread = std::thread([this] @@ -52,7 +60,7 @@ namespace Nz // "Implement" movement to make the compiler happy Worker(Worker&& worker) : - m_owner(worker.m_owner) + m_data(worker.m_data) { NAZARA_UNREACHABLE(); } @@ -62,22 +70,28 @@ namespace Nz m_thread.join(); } - void AddTask(TaskScheduler::Task* task) + void AddTask(TaskScheduler::Task&& task) { - m_tasks.enqueue(task); + m_tasks.enqueue(std::move(task)); WakeUp(); } + void NotifyTaskCompletion() + { + if (--m_data.remainingTasks == 0) + m_data.remainingTasks.notify_one(); + } + void Run() { // Wait until task scheduler started m_notifier.wait(false); m_notifier.clear(); - StackArray randomWorkerIndices = NazaraStackArrayNoInit(unsigned int, m_owner.GetWorkerCount() - 1); + StackArray randomWorkerIndices = NazaraStackArrayNoInit(unsigned int, m_data.workerCount - 1); { unsigned int* indexPtr = randomWorkerIndices.data(); - for (unsigned int i = 0; i < m_owner.GetWorkerCount(); ++i) + for (unsigned int i = 0; i < m_data.workerCount; ++i) { if (i != m_workerIndex) *indexPtr++ = i; @@ -90,12 +104,12 @@ namespace Nz while (m_running.load(std::memory_order_relaxed)) { // Get a task - TaskScheduler::Task* task = nullptr; + TaskScheduler::Task task; if (!m_tasks.try_dequeue(task)) { for (unsigned int workerIndex : randomWorkerIndices) { - task = m_owner.GetWorker(workerIndex).StealTask(); + task = m_data.workers[workerIndex].StealTask(); if (task) break; } @@ -108,9 +122,9 @@ namespace Nz __tsan_acquire(task); #endif - (*task)(); + task(); - m_owner.NotifyTaskCompletion(); + NotifyTaskCompletion(); } else { @@ -128,9 +142,9 @@ namespace Nz m_notifier.notify_one(); } - TaskScheduler::Task* StealTask() + TaskScheduler::Task StealTask() { - TaskScheduler::Task* task = nullptr; + TaskScheduler::Task task; m_tasks.try_dequeue(task); return task; } @@ -153,57 +167,53 @@ namespace Nz std::atomic_bool m_running; std::atomic_flag m_notifier; std::thread m_thread; //< std::jthread is not yet widely implemented - moodycamel::ConcurrentQueue m_tasks; - TaskScheduler& m_owner; + moodycamel::ConcurrentQueue m_tasks; + TaskScheduler::Data& m_data; unsigned int m_workerIndex; }; NAZARA_WARNING_POP() - TaskScheduler::TaskScheduler(unsigned int workerCount) : - m_remainingTasks(0), - m_nextWorkerIndex(0), - m_tasks(256 * sizeof(Task)), - m_workerCount(workerCount) + TaskScheduler::TaskScheduler(unsigned int workerCount) { - if (m_workerCount == 0) - m_workerCount = std::max(Core::Instance()->GetHardwareInfo().GetCpuThreadCount(), 1u); + if (workerCount == 0) + workerCount = std::max(Core::Instance()->GetHardwareInfo().GetCpuThreadCount(), 1u); - m_workers.reserve(m_workerCount); - for (unsigned int i = 0; i < m_workerCount; ++i) - m_workers.emplace_back(*this, i); + m_data = std::make_unique(); + m_data->workerCount = workerCount; - for (Worker& worker : m_workers) + m_data->workers.reserve(workerCount); + for (unsigned int i = 0; i < workerCount; ++i) + m_data->workers.emplace_back(*m_data, i); + + for (Worker& worker : m_data->workers) worker.WakeUp(); } TaskScheduler::~TaskScheduler() { // Wake up workers and tell them to exit - for (Worker& worker : m_workers) + for (Worker& worker : m_data->workers) worker.Shutdown(); // wait for them to have exited - m_workers.clear(); + m_data->workers.clear(); } void TaskScheduler::AddTask(Task&& task) { - std::size_t taskIndex; //< not used - Task* taskPtr = m_tasks.Allocate(taskIndex, std::move(task)); + m_data->remainingTasks++; -#ifdef NAZARA_WITH_TSAN - // Workaround for TSan false-positive - __tsan_release(taskPtr); -#endif + Worker& worker = m_data->workers[m_data->nextWorkerIndex++]; + worker.AddTask(std::move(task)); - m_remainingTasks++; + if (m_data->nextWorkerIndex >= m_data->workers.size()) + m_data->nextWorkerIndex = 0; + } - Worker& worker = m_workers[m_nextWorkerIndex++]; - worker.AddTask(taskPtr); - - if (m_nextWorkerIndex >= m_workers.size()) - m_nextWorkerIndex = 0; + unsigned int TaskScheduler::GetWorkerCount() const + { + return m_data->workerCount; } void TaskScheduler::WaitForTasks() @@ -212,26 +222,13 @@ namespace Nz for (;;) { // Load and test current value - unsigned int remainingTasks = m_remainingTasks.load(); + unsigned int remainingTasks = m_data->remainingTasks.load(); if (remainingTasks == 0) break; // If task count isn't 0, wait until it's signaled // (we need to retest remainingTasks because a worker can signal m_remainingTasks while we're still adding tasks) - m_remainingTasks.wait(remainingTasks); + m_data->remainingTasks.wait(remainingTasks); } - - m_tasks.Clear(); - } - - auto TaskScheduler::GetWorker(unsigned int workerIndex) -> Worker& - { - return m_workers[workerIndex]; - } - - void TaskScheduler::NotifyTaskCompletion() - { - if (--m_remainingTasks == 0) - m_remainingTasks.notify_one(); } }