From 3eae055d3a24a2969ecf90e99d1b943d78fded00 Mon Sep 17 00:00:00 2001 From: SirLynix Date: Mon, 5 Feb 2024 15:08:36 +0100 Subject: [PATCH] Core/TaskScheduler: Replace atomic queue from wsq to concurrentqueue This is because wsq makes stealing work by stealing from the opposite end of the queue which will be a problem in the long term --- src/Nazara/Core/TaskScheduler.cpp | 60 +------ thirdparty/include/wsq.hpp | 254 ------------------------------ xmake.lua | 11 +- 3 files changed, 17 insertions(+), 308 deletions(-) delete mode 100644 thirdparty/include/wsq.hpp diff --git a/src/Nazara/Core/TaskScheduler.cpp b/src/Nazara/Core/TaskScheduler.cpp index 6f1fcacf9..dd2e6348d 100644 --- a/src/Nazara/Core/TaskScheduler.cpp +++ b/src/Nazara/Core/TaskScheduler.cpp @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include #include @@ -31,43 +31,6 @@ namespace Nz #else constexpr std::size_t hardware_destructive_interference_size = 64; #endif - - class Spinlock - { - public: - Spinlock() = default; - Spinlock(const Spinlock&) = delete; - Spinlock(Spinlock&&) = delete; - ~Spinlock() = default; - - void lock() - { - while (m_flag.test_and_set()); - } - - bool try_lock(unsigned int maxLockCount = 1) - { - unsigned int lockCount = 0; - while (m_flag.test_and_set()) - { - if (++lockCount >= maxLockCount) - return false; - } - - return true; - } - - void unlock() - { - m_flag.clear(); - } - - Spinlock& operator=(const Spinlock&) = delete; - Spinlock& operator=(Spinlock&&) = delete; - - private: - std::atomic_flag m_flag; - }; } class alignas(NAZARA_ANONYMOUS_NAMESPACE_PREFIX(hardware_destructive_interference_size * 2)) TaskScheduler::Worker @@ -101,10 +64,7 @@ namespace Nz void AddTask(TaskScheduler::Task* task) { - std::unique_lock lock(m_queueSpinlock); - { - m_tasks.push(task); - } + m_tasks.enqueue(task); WakeUp(); } @@ -130,13 +90,8 @@ namespace Nz while (m_running.load(std::memory_order_relaxed)) { // Get a task - TaskScheduler::Task* task; - { - std::unique_lock lock(m_queueSpinlock); - task = m_tasks.pop(); - } - - if (!task) + TaskScheduler::Task* task = nullptr; + if (!m_tasks.try_dequeue(task)) { for (unsigned int workerIndex : randomWorkerIndices) { @@ -175,7 +130,9 @@ namespace Nz TaskScheduler::Task* StealTask() { - return m_tasks.steal(); + TaskScheduler::Task* task = nullptr; + m_tasks.try_dequeue(task); + return task; } void WakeUp() @@ -196,8 +153,7 @@ namespace Nz std::atomic_bool m_running; std::atomic_flag m_notifier; std::thread m_thread; //< std::jthread is not yet widely implemented - NAZARA_ANONYMOUS_NAMESPACE_PREFIX(Spinlock) m_queueSpinlock; - WorkStealingQueue m_tasks; + moodycamel::ConcurrentQueue m_tasks; TaskScheduler& m_owner; unsigned int m_workerIndex; }; diff --git a/thirdparty/include/wsq.hpp b/thirdparty/include/wsq.hpp deleted file mode 100644 index 312f51a52..000000000 --- a/thirdparty/include/wsq.hpp +++ /dev/null @@ -1,254 +0,0 @@ -#pragma once - -// This file comes from https://github.com/taskflow/work-stealing-queue -// and has been modified by Nazara author (SirLynix): -// - _top and _bottom atomics are now aligned to the double of the cacheline size -// - the queue allows to override the value type returned by pop/steal - -#include -#include -#include -#include -#include - -/** -@class: WorkStealingQueue - -@tparam T data type - -@brief Lock-free unbounded single-producer multiple-consumer queue. - -This class implements the work stealing queue described in the paper, -"Correct and Efficient Work-Stealing for Weak Memory Models," -available at https://www.di.ens.fr/~zappa/readings/ppopp13.pdf. - -Only the queue owner can perform pop and push operations, -while others can steal data from the queue. -*/ -template > -class WorkStealingQueue { - - struct Array { - - int64_t C; - int64_t M; - std::atomic* S; - - explicit Array(int64_t c) : - C {c}, - M {c-1}, - S {new std::atomic[static_cast(C)]} { - } - - ~Array() { - delete [] S; - } - - int64_t capacity() const noexcept { - return C; - } - - template - void push(int64_t i, O&& o) noexcept { - S[i & M].store(std::forward(o), std::memory_order_relaxed); - } - - T pop(int64_t i) noexcept { - return S[i & M].load(std::memory_order_relaxed); - } - - Array* resize(int64_t b, int64_t t) { - Array* ptr = new Array {2*C}; - for(int64_t i=t; i!=b; ++i) { - ptr->push(i, pop(i)); - } - return ptr; - } - - }; - - // avoids false sharing between _top and _bottom -#ifdef __cpp_lib_hardware_interference_size - alignas(std::hardware_destructive_interference_size * 2) std::atomic _top; - alignas(std::hardware_destructive_interference_size * 2) std::atomic _bottom; -#else - alignas(64 * 2) std::atomic _top; - alignas(64 * 2) std::atomic _bottom; -#endif - std::atomic _array; - std::vector _garbage; - - public: - - /** - @brief constructs the queue with a given capacity - - @param capacity the capacity of the queue (must be power of 2) - */ - explicit WorkStealingQueue(int64_t capacity = 1024); - - /** - @brief destructs the queue - */ - ~WorkStealingQueue(); - - /** - @brief queries if the queue is empty at the time of this call - */ - bool empty() const noexcept; - - /** - @brief queries the number of items at the time of this call - */ - size_t size() const noexcept; - - /** - @brief queries the capacity of the queue - */ - int64_t capacity() const noexcept; - - /** - @brief inserts an item to the queue - - Only the owner thread can insert an item to the queue. - The operation can trigger the queue to resize its capacity - if more space is required. - - @tparam O data type - - @param item the item to perfect-forward to the queue - */ - template - void push(O&& item); - - /** - @brief pops out an item from the queue - - Only the owner thread can pop out an item from the queue. - The return can be a default-constructed V if this operation failed (empty queue). - */ - V pop(); - - /** - @brief steals an item from the queue - - Any threads can try to steal an item from the queue. - The return can be a default-constructed V if this operation failed (not necessary empty). - */ - V steal(); -}; - -// Constructor -template -WorkStealingQueue::WorkStealingQueue(int64_t c) { - assert(c && (!(c & (c-1)))); - _top.store(0, std::memory_order_relaxed); - _bottom.store(0, std::memory_order_relaxed); - _array.store(new Array{c}, std::memory_order_relaxed); - _garbage.reserve(32); -} - -// Destructor -template -WorkStealingQueue::~WorkStealingQueue() { - for(auto a : _garbage) { - delete a; - } - delete _array.load(); -} - -// Function: empty -template -bool WorkStealingQueue::empty() const noexcept { - int64_t b = _bottom.load(std::memory_order_relaxed); - int64_t t = _top.load(std::memory_order_relaxed); - return b <= t; -} - -// Function: size -template -size_t WorkStealingQueue::size() const noexcept { - int64_t b = _bottom.load(std::memory_order_relaxed); - int64_t t = _top.load(std::memory_order_relaxed); - return static_cast(b >= t ? b - t : 0); -} - -// Function: push -template -template -void WorkStealingQueue::push(O&& o) { - int64_t b = _bottom.load(std::memory_order_relaxed); - int64_t t = _top.load(std::memory_order_acquire); - Array* a = _array.load(std::memory_order_relaxed); - - // queue is full - if(a->capacity() - 1 < (b - t)) { - Array* tmp = a->resize(b, t); - _garbage.push_back(a); - std::swap(a, tmp); - _array.store(a, std::memory_order_relaxed); - } - - a->push(b, std::forward(o)); - std::atomic_thread_fence(std::memory_order_release); - _bottom.store(b + 1, std::memory_order_relaxed); -} - -// Function: pop -template -V WorkStealingQueue::pop() { - int64_t b = _bottom.load(std::memory_order_relaxed) - 1; - Array* a = _array.load(std::memory_order_relaxed); - _bottom.store(b, std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_seq_cst); - int64_t t = _top.load(std::memory_order_relaxed); - - V item = {}; - - if(t <= b) { - item = a->pop(b); - if(t == b) { - // the last item just got stolen - if(!_top.compare_exchange_strong(t, t+1, - std::memory_order_seq_cst, - std::memory_order_relaxed)) { - item = V{}; - } - _bottom.store(b + 1, std::memory_order_relaxed); - } - } - else { - _bottom.store(b + 1, std::memory_order_relaxed); - } - - return item; -} - -// Function: steal -template -V WorkStealingQueue::steal() { - int64_t t = _top.load(std::memory_order_acquire); - std::atomic_thread_fence(std::memory_order_seq_cst); - int64_t b = _bottom.load(std::memory_order_acquire); - - V item = {}; - - if(t < b) { - Array* a = _array.load(std::memory_order_consume); - item = a->pop(t); - if(!_top.compare_exchange_strong(t, t+1, - std::memory_order_seq_cst, - std::memory_order_relaxed)) { - return V{}; - } - } - - return item; -} - -// Function: capacity -template -int64_t WorkStealingQueue::capacity() const noexcept { - return _array.load(std::memory_order_relaxed)->capacity(); -} - diff --git a/xmake.lua b/xmake.lua index fbf1aaf87..af1615088 100644 --- a/xmake.lua +++ b/xmake.lua @@ -113,7 +113,7 @@ local modules = { remove_files("src/Nazara/Core/Posix/TimeImpl.cpp") end end, - Packages = { "entt", "frozen", "utfcpp" }, + Packages = { "concurrentqueue", "entt", "frozen", "utfcpp" }, PublicPackages = { "nazarautils" } }, Graphics = { @@ -275,7 +275,14 @@ end add_repositories("nazara-engine-repo https://github.com/NazaraEngine/xmake-repo") -add_requires("entt 3.13.0", "fmt", "frozen", "nazarautils >=2024.01.25", "utfcpp") +add_requires( + "concurrentqueue", + "entt 3.13.0", + "fmt", + "frozen", + "nazarautils >=2024.01.25", + "utfcpp" +) -- Module dependencies if has_config("audio") then