Core/TaskScheduler: Use WorkStealingQueue::pop
steal() can incorrectly return nullptr even if the list is not empty in case of concurrent access, but push and pop are not threadsafe so we use a spinlock to prevent concurrent uses
This commit is contained in:
parent
2e56bb4db4
commit
c303bf9283
|
|
@ -7,6 +7,7 @@
|
||||||
#include <Nazara/Core/ThreadExt.hpp>
|
#include <Nazara/Core/ThreadExt.hpp>
|
||||||
#include <NazaraUtils/StackArray.hpp>
|
#include <NazaraUtils/StackArray.hpp>
|
||||||
#include <wsq.hpp>
|
#include <wsq.hpp>
|
||||||
|
#include <mutex>
|
||||||
#include <new>
|
#include <new>
|
||||||
#include <random>
|
#include <random>
|
||||||
#include <semaphore>
|
#include <semaphore>
|
||||||
|
|
@ -30,6 +31,43 @@ namespace Nz
|
||||||
#else
|
#else
|
||||||
constexpr std::size_t hardware_destructive_interference_size = 64;
|
constexpr std::size_t hardware_destructive_interference_size = 64;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
class Spinlock
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Spinlock() = default;
|
||||||
|
Spinlock(const Spinlock&) = delete;
|
||||||
|
Spinlock(Spinlock&&) = delete;
|
||||||
|
~Spinlock() = default;
|
||||||
|
|
||||||
|
void lock()
|
||||||
|
{
|
||||||
|
while (m_flag.test_and_set());
|
||||||
|
}
|
||||||
|
|
||||||
|
bool try_lock(unsigned int maxLockCount = 1)
|
||||||
|
{
|
||||||
|
unsigned int lockCount = 0;
|
||||||
|
while (m_flag.test_and_set())
|
||||||
|
{
|
||||||
|
if (++lockCount >= maxLockCount)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void unlock()
|
||||||
|
{
|
||||||
|
m_flag.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
Spinlock& operator=(const Spinlock&) = delete;
|
||||||
|
Spinlock& operator=(Spinlock&&) = delete;
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::atomic_flag m_flag;
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
class alignas(NAZARA_ANONYMOUS_NAMESPACE_PREFIX(hardware_destructive_interference_size)) TaskScheduler::Worker
|
class alignas(NAZARA_ANONYMOUS_NAMESPACE_PREFIX(hardware_destructive_interference_size)) TaskScheduler::Worker
|
||||||
|
|
@ -67,7 +105,10 @@ namespace Nz
|
||||||
|
|
||||||
void AddTask(TaskScheduler::Task* task)
|
void AddTask(TaskScheduler::Task* task)
|
||||||
{
|
{
|
||||||
m_tasks.push(task);
|
std::unique_lock lock(m_queueSpinlock);
|
||||||
|
{
|
||||||
|
m_tasks.push(task);
|
||||||
|
}
|
||||||
WakeUp();
|
WakeUp();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -92,9 +133,13 @@ namespace Nz
|
||||||
bool idle = false;
|
bool idle = false;
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
// FIXME: We can't use pop() because push() and pop() are not thread-safe (and push is called on another thread), but steal() is
|
// Get a task
|
||||||
// is it an issue?
|
TaskScheduler::Task* task;
|
||||||
TaskScheduler::Task* task = m_tasks.steal();
|
{
|
||||||
|
std::unique_lock lock(m_queueSpinlock);
|
||||||
|
task = m_tasks.pop();
|
||||||
|
}
|
||||||
|
|
||||||
if (!task)
|
if (!task)
|
||||||
{
|
{
|
||||||
for (unsigned int workerIndex : randomWorkerIndices)
|
for (unsigned int workerIndex : randomWorkerIndices)
|
||||||
|
|
@ -115,10 +160,15 @@ namespace Nz
|
||||||
|
|
||||||
#ifdef NAZARA_WITH_TSAN
|
#ifdef NAZARA_WITH_TSAN
|
||||||
// Workaround for TSan false-positive
|
// Workaround for TSan false-positive
|
||||||
__tsan_acquire(taskPtr);
|
__tsan_acquire(task);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
(*task)();
|
(*task)();
|
||||||
|
|
||||||
|
#ifdef NAZARA_WITH_TSAN
|
||||||
|
// Workaround for TSan false-positive
|
||||||
|
__tsan_release(task);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
@ -159,6 +209,7 @@ namespace Nz
|
||||||
std::atomic_bool m_running;
|
std::atomic_bool m_running;
|
||||||
std::atomic_flag m_notifier;
|
std::atomic_flag m_notifier;
|
||||||
std::thread m_thread; //< std::jthread is not yet widely implemented
|
std::thread m_thread; //< std::jthread is not yet widely implemented
|
||||||
|
NAZARA_ANONYMOUS_NAMESPACE_PREFIX(Spinlock) m_queueSpinlock;
|
||||||
WorkStealingQueue<TaskScheduler::Task*, TaskScheduler::Task*> m_tasks;
|
WorkStealingQueue<TaskScheduler::Task*, TaskScheduler::Task*> m_tasks;
|
||||||
TaskScheduler& m_owner;
|
TaskScheduler& m_owner;
|
||||||
unsigned int m_workerIndex;
|
unsigned int m_workerIndex;
|
||||||
|
|
@ -214,6 +265,13 @@ namespace Nz
|
||||||
void TaskScheduler::WaitForTasks()
|
void TaskScheduler::WaitForTasks()
|
||||||
{
|
{
|
||||||
m_idle.wait(false);
|
m_idle.wait(false);
|
||||||
|
|
||||||
|
#ifdef NAZARA_WITH_TSAN
|
||||||
|
// Workaround for TSan false-positive
|
||||||
|
for (Task& task : m_tasks)
|
||||||
|
__tsan_acquire(&task);
|
||||||
|
#endif
|
||||||
|
|
||||||
m_tasks.Clear();
|
m_tasks.Clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,8 @@ SCENARIO("TaskScheduler", "[CORE][TaskScheduler]")
|
||||||
}
|
}
|
||||||
scheduler.WaitForTasks();
|
scheduler.WaitForTasks();
|
||||||
|
|
||||||
CHECK(count == taskCount);
|
unsigned int c = count.load(); //< load it once before checking to avoid race condition when testing and printing
|
||||||
|
CHECK(c == taskCount);
|
||||||
|
|
||||||
for (std::size_t i = 0; i < taskCount; ++i)
|
for (std::size_t i = 0; i < taskCount; ++i)
|
||||||
{
|
{
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue