diff --git a/include/Nazara/Core/TaskScheduler.hpp b/include/Nazara/Core/TaskScheduler.hpp
index e693eb0b8..a73e2bdf1 100644
--- a/include/Nazara/Core/TaskScheduler.hpp
+++ b/include/Nazara/Core/TaskScheduler.hpp
@@ -8,6 +8,7 @@
 #define NAZARA_CORE_TASKSCHEDULER_HPP
 
 #include
+#include
 #include
 #include
 #include
@@ -27,7 +28,7 @@ namespace Nz
 
 			void AddTask(Task&& task);
 
-			unsigned int GetWorkerCount() const;
+			inline unsigned int GetWorkerCount() const;
 
 			void WaitForTasks();
 
@@ -46,6 +47,8 @@ namespace Nz
 			std::atomic_uint m_idleWorkerCount;
 			std::size_t m_nextWorkerIndex;
 			std::vector<Worker> m_workers;
+			MemoryPool<Task> m_tasks;
+			unsigned int m_workerCount;
 	};
 }
 
diff --git a/include/Nazara/Core/TaskScheduler.inl b/include/Nazara/Core/TaskScheduler.inl
index a8eacde68..7244af508 100644
--- a/include/Nazara/Core/TaskScheduler.inl
+++ b/include/Nazara/Core/TaskScheduler.inl
@@ -6,6 +6,10 @@
 
 namespace Nz
 {
+	inline unsigned int TaskScheduler::GetWorkerCount() const
+	{
+		return m_workerCount;
+	}
 }
 
 #include
diff --git a/src/Nazara/Core/TaskScheduler.cpp b/src/Nazara/Core/TaskScheduler.cpp
index 50b9f028e..d34767793 100644
--- a/src/Nazara/Core/TaskScheduler.cpp
+++ b/src/Nazara/Core/TaskScheduler.cpp
@@ -6,10 +6,10 @@
 #include
 #include
 #include
-#include
-#include
+#include
 #include
 #include
+#include
 #include
 #include
 
@@ -53,30 +53,28 @@ namespace Nz
 			~Worker()
 			{
 				m_running = false;
-				m_conditionVariable.notify_one();
+				if (!m_notifier.test_and_set())
+					m_notifier.notify_one();
 				m_thread.join();
 			}
 
-			bool AddTask(Task&& task)
+			void AddTask(TaskScheduler::Task* task)
 			{
-				std::unique_lock lock(m_mutex, std::defer_lock);
-				if (!lock.try_lock())
-					return false;
-
-				m_tasks.push_back(std::move(task));
-				lock.unlock();
-
-				m_conditionVariable.notify_one();
-				return true;
+				m_tasks.push(task);
+				if (!m_notifier.test_and_set())
+					m_notifier.notify_one();
 			}
 
 			void Run()
 			{
+				bool idle = true;
+				m_notifier.wait(false); // wait until task scheduler finishes initializing
+
 				StackArray<unsigned int> randomWorkerIndices = NazaraStackArrayNoInit(unsigned int, m_owner.GetWorkerCount() - 1);
 				{
 					unsigned int* indexPtr = randomWorkerIndices.data();
-					for (unsigned int i = 0; i < randomWorkerIndices.size(); ++i)
+					for (unsigned int i = 0; i < m_owner.GetWorkerCount(); ++i)
 					{
 						if (i != m_workerIndex)
 							*indexPtr++ = i;
@@ -86,27 +84,9 @@ namespace Nz
 					std::shuffle(randomWorkerIndices.begin(), randomWorkerIndices.end(), gen);
 				}
 
-				bool idle = true;
-				for (;;)
+				do
 				{
-					std::unique_lock lock(m_mutex);
-
-					// Wait for tasks if we don't have any right now
-					if (m_tasks.empty())
-					{
-						if (!idle)
-						{
-							m_owner.NotifyWorkerIdle();
-							idle = true;
-						}
-
-						m_conditionVariable.wait(lock, [this] { return !m_running || !m_tasks.empty(); });
-					}
-
-					if (!m_running)
-						break;
-
-					auto ExecuteTask = [&](TaskScheduler::Task& task)
+					auto ExecuteTask = [&](TaskScheduler::Task* task)
 					{
 						if (idle)
 						{
@@ -114,50 +94,44 @@ namespace Nz
 							idle = false;
 						}
 
-						task();
+						(*task)();
 					};
 
-					if (!m_tasks.empty())
-					{
-						TaskScheduler::Task task = std::move(m_tasks.front());
-						m_tasks.erase(m_tasks.begin());
-
-						lock.unlock();
-
-						ExecuteTask(task);
-					}
+					// Wait for tasks if we don't have any right now
+					std::optional<TaskScheduler::Task*> task = m_tasks.pop();
+					if (task)
+						ExecuteTask(*task);
 					else
 					{
-						lock.unlock();
-
-						// Try to steal a task from another worker in a random order to avoid lock contention
-						TaskScheduler::Task task;
+						// Try to steal a task from another worker in a random order to avoid contention
 						for (unsigned int workerIndex : randomWorkerIndices)
 						{
-							if (m_owner.GetWorker(workerIndex).StealTask(&task))
+							if (task = m_owner.GetWorker(workerIndex).StealTask())
 							{
-								ExecuteTask(task);
+								ExecuteTask(*task);
 								break;
 							}
 						}
-					}
 
-					// Note: it's possible for a thread to reach this point without executing a task (for example if another worker stole its only remaining task)
+						if (!task)
+						{
+							if (!idle)
+							{
+								m_owner.NotifyWorkerIdle();
+								idle = true;
+							}
+
+							m_notifier.wait(false);
+							m_notifier.clear();
+						}
+					}
 				}
+				while (m_running);
 			}
 
-			bool StealTask(TaskScheduler::Task* task)
+			std::optional<TaskScheduler::Task*> StealTask()
 			{
-				std::unique_lock lock(m_mutex, std::defer_lock);
-				if (!lock.try_lock())
-					return false;
-
-				if (m_tasks.empty())
-					return false;
-
-				*task = std::move(m_tasks.front());
-				m_tasks.erase(m_tasks.begin());
-				return true;
+				return m_tasks.steal();
 			}
 
 			Worker& operator=(const Worker& worker) = delete;
@@ -169,10 +143,9 @@ namespace Nz
 
 		private:
 			std::atomic_bool m_running;
-			std::condition_variable m_conditionVariable;
-			std::mutex m_mutex;
+			std::atomic_flag m_notifier;
 			std::thread m_thread;
-			std::vector<Task> m_tasks;
+			WorkStealingQueue<TaskScheduler::Task*> m_tasks;
 			TaskScheduler& m_owner;
 			unsigned int m_workerIndex;
 	};
@@ -181,15 +154,17 @@ namespace Nz
 
 	TaskScheduler::TaskScheduler(unsigned int workerCount) :
 	m_idle(true),
-	m_nextWorkerIndex(0)
+	m_nextWorkerIndex(0),
+	m_tasks(256 * sizeof(Task)),
+	m_workerCount(workerCount)
 	{
-		if (workerCount == 0)
-			workerCount = std::max(Core::Instance()->GetHardwareInfo().GetCpuThreadCount(), 1u);
+		if (m_workerCount == 0)
+			m_workerCount = std::max(Core::Instance()->GetHardwareInfo().GetCpuThreadCount(), 1u);
 
-		m_idleWorkerCount = workerCount;
+		m_idleWorkerCount = m_workerCount;
 
-		m_workers.reserve(workerCount);
-		for (unsigned int i = 0; i < workerCount; ++i)
+		m_workers.reserve(m_workerCount);
+		for (unsigned int i = 0; i < m_workerCount; ++i)
 			m_workers.emplace_back(*this, i);
 	}
 
@@ -202,25 +177,19 @@ namespace Nz
 	{
 		m_idle = false;
 
-		for (;;)
-		{
-			Worker& randomWorker = m_workers[m_nextWorkerIndex];
-			if (randomWorker.AddTask(std::move(task)))
-				break;
+		std::size_t taskIndex; //< not used
 
-			if (++m_nextWorkerIndex >= m_workers.size())
-				m_nextWorkerIndex = 0;
-		}
-	}
+		Worker& worker = m_workers[m_nextWorkerIndex++];
+		worker.AddTask(m_tasks.Allocate(taskIndex, std::move(task)));
 
-	unsigned int TaskScheduler::GetWorkerCount() const
-	{
-		return static_cast<unsigned int>(m_workers.size());
+		if (m_nextWorkerIndex >= m_workers.size())
+			m_nextWorkerIndex = 0;
 	}
 
 	void TaskScheduler::WaitForTasks()
 	{
 		m_idle.wait(false);
+		m_tasks.Clear();
 	}
 
 	auto TaskScheduler::GetWorker(unsigned int workerIndex) -> Worker&
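
[Note, not part of the patch] The workers above replace the mutex/condition_variable pair with a C++20 std::atomic_flag used as a binary semaphore: the notifying side only calls notify_one() when test_and_set() observed the flag clear, and the worker re-arms the flag with clear() after wait(false) returns. A minimal standalone sketch of that handshake, with illustrative names, assuming a C++20 compiler:

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>

int main()
{
    std::atomic_flag notifier; // C++20: default-initialized to the clear (false) state

    std::thread worker([&]
    {
        notifier.wait(false); // blocks while the flag is still false
        notifier.clear();     // re-arm the flag for the next notification
        std::puts("worker woken up");
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    // Producer side: only notify if the flag was not already set,
    // mirroring the if (!m_notifier.test_and_set()) pattern above.
    if (!notifier.test_and_set())
        notifier.notify_one();

    worker.join();
}
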
diff --git a/thirdparty/include/wsq.hpp b/thirdparty/include/wsq.hpp
new file mode 100644
index 000000000..c73b9ffb9
--- /dev/null
+++ b/thirdparty/include/wsq.hpp
@@ -0,0 +1,249 @@
+#pragma once
+
+#include <atomic>
+#include <vector>
+#include <optional>
+#include <cassert>
+#include <cstdint>
+
+/**
+@class: WorkStealingQueue
+
+@tparam T data type
+
+@brief Lock-free unbounded single-producer multiple-consumer queue.
+
+This class implements the work stealing queue described in the paper,
+"Correct and Efficient Work-Stealing for Weak Memory Models,"
+available at https://www.di.ens.fr/~zappa/readings/ppopp13.pdf.
+
+Only the queue owner can perform pop and push operations,
+while others can steal data from the queue.
+*/
+template <typename T>
+class WorkStealingQueue {
+
+  struct Array {
+
+    int64_t C;
+    int64_t M;
+    std::atomic<T>* S;
+
+    explicit Array(int64_t c) :
+      C {c},
+      M {c-1},
+      S {new std::atomic<T>[static_cast<size_t>(C)]} {
+    }
+
+    ~Array() {
+      delete [] S;
+    }
+
+    int64_t capacity() const noexcept {
+      return C;
+    }
+
+    template <typename O>
+    void push(int64_t i, O&& o) noexcept {
+      S[i & M].store(std::forward<O>(o), std::memory_order_relaxed);
+    }
+
+    T pop(int64_t i) noexcept {
+      return S[i & M].load(std::memory_order_relaxed);
+    }
+
+    Array* resize(int64_t b, int64_t t) {
+      Array* ptr = new Array {2*C};
+      for(int64_t i=t; i!=b; ++i) {
+        ptr->push(i, pop(i));
+      }
+      return ptr;
+    }
+
+  };
+
+  // avoids false sharing between _top and _bottom
+#ifdef __cpp_lib_hardware_interference_size
+  alignas(std::hardware_destructive_interference_size) std::atomic<int64_t> _top;
+  alignas(std::hardware_destructive_interference_size) std::atomic<int64_t> _bottom;
+#else
+  alignas(64) std::atomic<int64_t> _top;
+  alignas(64) std::atomic<int64_t> _bottom;
+#endif
+  std::atomic<Array*> _array;
+  std::vector<Array*> _garbage;
+
+  public:
+
+  /**
+  @brief constructs the queue with a given capacity
+
+  @param capacity the capacity of the queue (must be power of 2)
+  */
+  explicit WorkStealingQueue(int64_t capacity = 1024);
+
+  /**
+  @brief destructs the queue
+  */
+  ~WorkStealingQueue();
+
+  /**
+  @brief queries if the queue is empty at the time of this call
+  */
+  bool empty() const noexcept;
+
+  /**
+  @brief queries the number of items at the time of this call
+  */
+  size_t size() const noexcept;
+
+  /**
+  @brief queries the capacity of the queue
+  */
+  int64_t capacity() const noexcept;
+
+  /**
+  @brief inserts an item to the queue
+
+  Only the owner thread can insert an item to the queue.
+  The operation can trigger the queue to resize its capacity
+  if more space is required.
+
+  @tparam O data type
+
+  @param item the item to perfect-forward to the queue
+  */
+  template <typename O>
+  void push(O&& item);
+
+  /**
+  @brief pops out an item from the queue
+
+  Only the owner thread can pop out an item from the queue.
+  The return can be a @std_nullopt if this operation failed (empty queue).
+  */
+  std::optional<T> pop();
+
+  /**
+  @brief steals an item from the queue
+
+  Any threads can try to steal an item from the queue.
+  The return can be a @std_nullopt if this operation failed (not necessarily empty).
+  */
+  std::optional<T> steal();
+};
+
+// Constructor
+template <typename T>
+WorkStealingQueue<T>::WorkStealingQueue(int64_t c) {
+  assert(c && (!(c & (c-1))));
+  _top.store(0, std::memory_order_relaxed);
+  _bottom.store(0, std::memory_order_relaxed);
+  _array.store(new Array{c}, std::memory_order_relaxed);
+  _garbage.reserve(32);
+}
+
+// Destructor
+template <typename T>
+WorkStealingQueue<T>::~WorkStealingQueue() {
+  for(auto a : _garbage) {
+    delete a;
+  }
+  delete _array.load();
+}
+
+// Function: empty
+template <typename T>
+bool WorkStealingQueue<T>::empty() const noexcept {
+  int64_t b = _bottom.load(std::memory_order_relaxed);
+  int64_t t = _top.load(std::memory_order_relaxed);
+  return b <= t;
+}
+
+// Function: size
+template <typename T>
+size_t WorkStealingQueue<T>::size() const noexcept {
+  int64_t b = _bottom.load(std::memory_order_relaxed);
+  int64_t t = _top.load(std::memory_order_relaxed);
+  return static_cast<size_t>(b >= t ? b - t : 0);
+}
+
+// Function: push
+template <typename T>
+template <typename O>
+void WorkStealingQueue<T>::push(O&& o) {
+  int64_t b = _bottom.load(std::memory_order_relaxed);
+  int64_t t = _top.load(std::memory_order_acquire);
+  Array* a = _array.load(std::memory_order_relaxed);
+
+  // queue is full
+  if(a->capacity() - 1 < (b - t)) {
+    Array* tmp = a->resize(b, t);
+    _garbage.push_back(a);
+    std::swap(a, tmp);
+    _array.store(a, std::memory_order_relaxed);
+  }
+
+  a->push(b, std::forward<O>(o));
+  std::atomic_thread_fence(std::memory_order_release);
+  _bottom.store(b + 1, std::memory_order_relaxed);
+}
+
+// Function: pop
+template <typename T>
+std::optional<T> WorkStealingQueue<T>::pop() {
+  int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
+  Array* a = _array.load(std::memory_order_relaxed);
+  _bottom.store(b, std::memory_order_relaxed);
+  std::atomic_thread_fence(std::memory_order_seq_cst);
+  int64_t t = _top.load(std::memory_order_relaxed);
+
+  std::optional<T> item;
+
+  if(t <= b) {
+    item = a->pop(b);
+    if(t == b) {
+      // the last item just got stolen
+      if(!_top.compare_exchange_strong(t, t+1,
+                                       std::memory_order_seq_cst,
+                                       std::memory_order_relaxed)) {
+        item = std::nullopt;
+      }
+      _bottom.store(b + 1, std::memory_order_relaxed);
+    }
+  }
+  else {
+    _bottom.store(b + 1, std::memory_order_relaxed);
+  }
+
+  return item;
+}
+
+// Function: steal
+template <typename T>
+std::optional<T> WorkStealingQueue<T>::steal() {
+  int64_t t = _top.load(std::memory_order_acquire);
+  std::atomic_thread_fence(std::memory_order_seq_cst);
+  int64_t b = _bottom.load(std::memory_order_acquire);
+
+  std::optional<T> item;
+
+  if(t < b) {
+    Array* a = _array.load(std::memory_order_consume);
+    item = a->pop(t);
+    if(!_top.compare_exchange_strong(t, t+1,
+                                     std::memory_order_seq_cst,
+                                     std::memory_order_relaxed)) {
+      return std::nullopt;
+    }
+  }
+
+  return item;
+}
+
+// Function: capacity
+template <typename T>
+int64_t WorkStealingQueue<T>::capacity() const noexcept {
+  return _array.load(std::memory_order_relaxed)->capacity();
+}
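
[Note, not part of the patch] For reference, a minimal usage sketch of the queue added above: a single owner thread calls push()/pop() while any other thread may call steal(); both pop() and steal() return std::nullopt on failure. The include path, the spin-wait loop and the item count are illustrative only (assumes thirdparty/include is on the include path and a C++17 compiler):

#include <atomic>
#include <cstdio>
#include <optional>
#include <thread>
#include "wsq.hpp"

int main()
{
    WorkStealingQueue<int> queue;  // capacity defaults to 1024 and grows on demand
    std::atomic<int> consumed{0};
    constexpr int itemCount = 1000;

    // Thief: any thread other than the owner may only call steal()
    std::thread thief([&]
    {
        while (consumed.load() < itemCount)
        {
            if (std::optional<int> item = queue.steal())
                consumed.fetch_add(1);
        }
    });

    // Owner: the single producer thread calls push() and pop()
    for (int i = 0; i < itemCount; ++i)
        queue.push(i);

    while (consumed.load() < itemCount)
    {
        if (std::optional<int> item = queue.pop())
            consumed.fetch_add(1);
    }

    thief.join();
    std::printf("processed %d items\n", consumed.load());
}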