Core/TaskScheduler: Fix TSan false-positive and remove std::optional

SirLynix 2024-02-02 21:19:17 +01:00
parent 06a42b4ecb
commit e3fa6fed89
3 changed files with 56 additions and 35 deletions


@@ -11,6 +11,11 @@
 #include <random>
 #include <semaphore>
 #include <thread>
+
+#ifdef NAZARA_WITH_TSAN
+#include <sanitizer/tsan_interface.h>
+#endif
+
 #include <Nazara/Core/Debug.hpp>

 namespace Nz
@@ -89,12 +94,13 @@ namespace Nz
 		{
 			// FIXME: We can't use pop() because push() and pop() are not thread-safe (and push is called on another thread), but steal() is
 			// is it an issue?
-			std::optional<TaskScheduler::Task*> task = m_tasks.steal();
+			TaskScheduler::Task* task = m_tasks.steal();
 			if (!task)
 			{
 				for (unsigned int workerIndex : randomWorkerIndices)
 				{
-					if (task = m_owner.GetWorker(workerIndex).StealTask())
+					task = m_owner.GetWorker(workerIndex).StealTask();
+					if (task)
 						break;
 				}
 			}
@@ -107,8 +113,12 @@ namespace Nz
 				idle = false;
 			}

-			NAZARA_ASSUME(*task != nullptr);
-			(**task)();
+#ifdef NAZARA_WITH_TSAN
+			// Workaround for TSan false-positive
+			__tsan_acquire(task);
+#endif
+			(*task)();
 		}
 		else
 		{
@@ -126,7 +136,7 @@ namespace Nz
 			while (m_running.load(std::memory_order_relaxed));
 		}

-		std::optional<TaskScheduler::Task*> StealTask()
+		TaskScheduler::Task* StealTask()
 		{
 			return m_tasks.steal();
 		}
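Note on the API change: the queue stores raw Task pointers, so nullptr is already a natural "nothing to steal" sentinel. std::optional<Task*> added a second, redundant empty state (nullopt vs. an engaged optional holding null) and forced a double dereference at the call site, which is why the NAZARA_ASSUME(*task != nullptr) and (**task)() above could be dropped. A minimal standalone sketch of the two call-site shapes (Task and the helper functions are illustrative, not Nazara's API):

#include <cassert>
#include <optional>

struct Task { int id; };

// Before: two distinct "no task" states are representable.
std::optional<Task*> steal_old(Task* t) { return t ? std::optional<Task*>(t) : std::nullopt; }

// After: the pointer itself carries the failure state.
Task* steal_new(Task* t) { return t; }

int main() {
    Task task{1};
    assert(steal_old(&task).value()->id == 1); // double indirection at the call site
    assert(steal_new(&task)->id == 1);         // single indirection
    assert(!steal_old(nullptr).has_value());   // empty as nullopt...
    assert(steal_new(nullptr) == nullptr);     // ...vs. empty as nullptr: same if (!task) test
    return 0;
}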
@@ -149,7 +159,7 @@ namespace Nz
 		std::atomic_bool m_running;
 		std::atomic_flag m_notifier;
 		std::thread m_thread; //< std::jthread is not yet widely implemented
-		WorkStealingQueue<TaskScheduler::Task*> m_tasks;
+		WorkStealingQueue<TaskScheduler::Task*, TaskScheduler::Task*> m_tasks;
 		TaskScheduler& m_owner;
 		unsigned int m_workerIndex;
 	};
@@ -189,6 +199,11 @@ namespace Nz
 		std::size_t taskIndex; //< not used
 		Task* taskPtr = m_tasks.Allocate(taskIndex, std::move(task));

+#ifdef NAZARA_WITH_TSAN
+		// Workaround for TSan false-positive
+		__tsan_release(taskPtr);
+#endif
+
 		Worker& worker = m_workers[m_nextWorkerIndex++];
 		worker.AddTask(taskPtr);
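For context, a self-contained sketch of the annotation pairing this commit introduces (the single-slot mailbox below is illustrative, not the TaskScheduler's queue). __tsan_release(p) on the publishing thread and __tsan_acquire(p) on the consuming thread tell TSan to treat the two points as synchronized on p, so plain writes to *p made before the release are no longer reported as racing with reads made after the acquire; this matters here because TSan does not model the seq_cst fences the work-stealing deque relies on. The macros compile to nothing outside sanitized builds:

#include <atomic>
#include <cstdio>
#include <thread>

#ifndef __has_feature
#define __has_feature(x) 0 // for compilers without __has_feature
#endif
#if defined(__SANITIZE_THREAD__) || __has_feature(thread_sanitizer)
#include <sanitizer/tsan_interface.h>
#define TSAN_RELEASE(p) __tsan_release(p)
#define TSAN_ACQUIRE(p) __tsan_acquire(p)
#else
#define TSAN_RELEASE(p) ((void)0)
#define TSAN_ACQUIRE(p) ((void)0)
#endif

struct Task { void (*func)(); };

std::atomic<Task*> mailbox{nullptr};

int main() {
    Task task{};
    std::thread producer([&] {
        task.func = [] { std::puts("task ran"); }; // plain write, pre-publication
        TSAN_RELEASE(&task);                       // annotate: task is published here
        mailbox.store(&task, std::memory_order_release);
    });
    Task* t = nullptr;
    while (!(t = mailbox.load(std::memory_order_acquire))) // spin until published
        std::this_thread::yield();
    TSAN_ACQUIRE(t); // annotate: matches the producer's release on the same address
    t->func();
    producer.join();
    return 0;
}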


@@ -1,5 +1,10 @@
 #pragma once

+// This file comes from https://github.com/taskflow/work-stealing-queue
+// and has been modified by the Nazara author (SirLynix):
+// - the _top and _bottom atomics are now aligned to twice the cache line size
+// - the queue allows overriding the value type returned by pop()/steal()
+
 #include <atomic>
 #include <vector>
 #include <optional>
@@ -20,7 +25,7 @@ available at https://www.di.ens.fr/~zappa/readings/ppopp13.pdf.
 Only the queue owner can perform pop and push operations,
 while others can steal data from the queue.
 */
-template <typename T>
+template <typename T, typename V = std::optional<T>>
 class WorkStealingQueue {

   struct Array {
@@ -64,11 +69,11 @@ class WorkStealingQueue {
   // avoids false sharing between _top and _bottom
 #ifdef __cpp_lib_hardware_interference_size
-  alignas(std::hardware_destructive_interference_size) std::atomic<int64_t> _top;
-  alignas(std::hardware_destructive_interference_size) std::atomic<int64_t> _bottom;
+  alignas(std::hardware_destructive_interference_size * 2) std::atomic<int64_t> _top;
+  alignas(std::hardware_destructive_interference_size * 2) std::atomic<int64_t> _bottom;
 #else
-  alignas(64) std::atomic<int64_t> _top;
-  alignas(64) std::atomic<int64_t> _bottom;
+  alignas(64 * 2) std::atomic<int64_t> _top;
+  alignas(64 * 2) std::atomic<int64_t> _bottom;
 #endif

   std::atomic<Array*> _array;
   std::vector<Array*> _garbage;
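The likely rationale for doubling the alignment (an assumption, not stated in the commit): some CPUs prefetch cache lines in adjacent pairs (e.g. 128 bytes on several x86 cores), so two atomics on neighboring 64-byte lines can still ping-pong between cores even when each owns its own line; padding to twice the destructive-interference size puts _top and _bottom in separate line pairs. A standalone sketch of the resulting layout (Indices and kPad are illustrative names):

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <new>

struct Indices {
#ifdef __cpp_lib_hardware_interference_size
    static constexpr std::size_t kPad = 2 * std::hardware_destructive_interference_size;
#else
    static constexpr std::size_t kPad = 2 * 64; // assume 64-byte cache lines
#endif
    alignas(kPad) std::atomic<std::int64_t> top{0};    // touched mostly by thieves
    alignas(kPad) std::atomic<std::int64_t> bottom{0}; // touched mostly by the owner
};

int main() {
    Indices idx;
    auto distance = reinterpret_cast<char*>(&idx.bottom) - reinterpret_cast<char*>(&idx.top);
    std::printf("top/bottom are %td bytes apart (pad = %zu)\n", distance, Indices::kPad);
    return 0;
}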
@@ -120,22 +125,22 @@ class WorkStealingQueue {
   @brief pops out an item from the queue

   Only the owner thread can pop out an item from the queue.
-  The return can be a @std_nullopt if this operation failed (empty queue).
+  The return can be a default-constructed V if this operation failed (empty queue).
   */
-  std::optional<T> pop();
+  V pop();

   /**
   @brief steals an item from the queue

   Any threads can try to steal an item from the queue.
-  The return can be a @std_nullopt if this operation failed (not necessarily empty).
+  The return can be a default-constructed V if this operation failed (not necessarily empty).
   */
-  std::optional<T> steal();
+  V steal();
 };

 // Constructor
-template <typename T>
-WorkStealingQueue<T>::WorkStealingQueue(int64_t c) {
+template <typename T, typename V>
+WorkStealingQueue<T, V>::WorkStealingQueue(int64_t c) {
   assert(c && (!(c & (c-1))));
   _top.store(0, std::memory_order_relaxed);
   _bottom.store(0, std::memory_order_relaxed);
@@ -144,8 +149,8 @@ WorkStealingQueue<T>::WorkStealingQueue(int64_t c) {
 }

 // Destructor
-template <typename T>
-WorkStealingQueue<T>::~WorkStealingQueue() {
+template <typename T, typename V>
+WorkStealingQueue<T, V>::~WorkStealingQueue() {
   for(auto a : _garbage) {
     delete a;
   }
@@ -153,25 +158,25 @@ WorkStealingQueue<T>::~WorkStealingQueue() {
 }

 // Function: empty
-template <typename T>
-bool WorkStealingQueue<T>::empty() const noexcept {
+template <typename T, typename V>
+bool WorkStealingQueue<T, V>::empty() const noexcept {
   int64_t b = _bottom.load(std::memory_order_relaxed);
   int64_t t = _top.load(std::memory_order_relaxed);
   return b <= t;
 }

 // Function: size
-template <typename T>
-size_t WorkStealingQueue<T>::size() const noexcept {
+template <typename T, typename V>
+size_t WorkStealingQueue<T, V>::size() const noexcept {
   int64_t b = _bottom.load(std::memory_order_relaxed);
   int64_t t = _top.load(std::memory_order_relaxed);
   return static_cast<size_t>(b >= t ? b - t : 0);
 }

 // Function: push
-template <typename T>
+template <typename T, typename V>
 template <typename O>
-void WorkStealingQueue<T>::push(O&& o) {
+void WorkStealingQueue<T, V>::push(O&& o) {
   int64_t b = _bottom.load(std::memory_order_relaxed);
   int64_t t = _top.load(std::memory_order_acquire);
   Array* a = _array.load(std::memory_order_relaxed);
@@ -190,15 +195,15 @@ void WorkStealingQueue<T>::push(O&& o) {
 }

 // Function: pop
-template <typename T>
-std::optional<T> WorkStealingQueue<T>::pop() {
+template <typename T, typename V>
+V WorkStealingQueue<T, V>::pop() {
   int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
   Array* a = _array.load(std::memory_order_relaxed);
   _bottom.store(b, std::memory_order_relaxed);

   std::atomic_thread_fence(std::memory_order_seq_cst);
   int64_t t = _top.load(std::memory_order_relaxed);

-  std::optional<T> item;
+  V item = {};

   if(t <= b) {
     item = a->pop(b);
@@ -207,7 +212,7 @@ std::optional<T> WorkStealingQueue<T>::pop() {
       if(!_top.compare_exchange_strong(t, t+1,
                                        std::memory_order_seq_cst,
                                        std::memory_order_relaxed)) {
-        item = std::nullopt;
+        item = V{};
       }
       _bottom.store(b + 1, std::memory_order_relaxed);
     }
@@ -220,13 +225,13 @@ std::optional<T> WorkStealingQueue<T>::pop() {
 }

 // Function: steal
-template <typename T>
-std::optional<T> WorkStealingQueue<T>::steal() {
+template <typename T, typename V>
+V WorkStealingQueue<T, V>::steal() {
   int64_t t = _top.load(std::memory_order_acquire);
   std::atomic_thread_fence(std::memory_order_seq_cst);
   int64_t b = _bottom.load(std::memory_order_acquire);

-  std::optional<T> item;
+  V item = {};

   if(t < b) {
     Array* a = _array.load(std::memory_order_consume);
@@ -234,7 +239,7 @@ std::optional<T> WorkStealingQueue<T>::steal() {
     item = a->pop(t);
     if(!_top.compare_exchange_strong(t, t+1,
                                      std::memory_order_seq_cst,
                                      std::memory_order_relaxed)) {
-      return std::nullopt;
+      return V{};
     }
   }
@@ -242,8 +247,8 @@ std::optional<T> WorkStealingQueue<T>::steal() {
 }

 // Function: capacity
-template <typename T>
-int64_t WorkStealingQueue<T>::capacity() const noexcept {
+template <typename T, typename V>
+int64_t WorkStealingQueue<T, V>::capacity() const noexcept {
   return _array.load(std::memory_order_relaxed)->capacity();
 }
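Usage sketch for the new second template parameter (assuming the header is reachable as WorkStealingQueue.hpp; Task is illustrative): the default V = std::optional<T> preserves the upstream taskflow behavior, while passing a pointer type as V makes pop()/steal() report failure as nullptr, which is what the TaskScheduler instantiation above relies on:

#include <optional>
#include "WorkStealingQueue.hpp" // assumed include path for this header

struct Task { void (*func)(); };

int main() {
    WorkStealingQueue<int> defaulted;    // V = std::optional<int>
    WorkStealingQueue<Task*, Task*> raw; // V = Task*: failure is nullptr

    defaulted.push(42);
    if (std::optional<int> v = defaulted.pop()) { (void)*v; } // unwrap required

    Task task{ [] {} };
    raw.push(&task);
    if (Task* t = raw.steal()) // no optional unwrap, no double indirection
        t->func();
    return 0;
}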


@@ -251,6 +251,7 @@ for opt, policy in table.orderpairs(sanitizers) do
 	option(opt, { description = "Enable " .. opt, default = false })

 	if has_config(opt) then
+		add_defines("NAZARA_WITH_" .. opt:upper())
 		set_policy("build.sanitizer." .. policy, true)
 	end
 end
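With this, enabling a sanitizer option (presumably one named tsan, given the NAZARA_WITH_TSAN guards above) also defines NAZARA_WITH_<OPT> project-wide, so sources can gate sanitizer-specific workarounds on the build configuration instead of probing compiler feature macros. A sketch of the consuming side (AnnotateRelease is an illustrative helper, not a Nazara API):

#ifdef NAZARA_WITH_TSAN
#include <sanitizer/tsan_interface.h>
#endif

// No-op unless the build enabled TSan (and thus defined NAZARA_WITH_TSAN).
inline void AnnotateRelease(void* ptr)
{
#ifdef NAZARA_WITH_TSAN
    __tsan_release(ptr);
#else
    (void)ptr; // silence unused-parameter warnings
#endif
}

int main()
{
    int x = 0;
    AnnotateRelease(&x); // annotates only in TSan builds
    return 0;
}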