Core/TaskScheduler: Rework using atomics and WorkStealingQueue

SirLynix
2024-02-02 14:27:18 +01:00
parent 5db0c4ed09
commit 7f1ef0fe41
4 changed files with 310 additions and 85 deletions

@@ -6,10 +6,10 @@
#include <Nazara/Core/Core.hpp>
#include <Nazara/Core/ThreadExt.hpp>
#include <NazaraUtils/StackArray.hpp>
#include <condition_variable>
#include <mutex>
#include <wsq.hpp>
#include <new>
#include <random>
#include <semaphore>
#include <thread>
#include <Nazara/Core/Debug.hpp>
@@ -53,30 +53,28 @@ namespace Nz
~Worker()
{
m_running = false;
m_conditionVariable.notify_one();
if (!m_notifier.test_and_set())
m_notifier.notify_one();
m_thread.join();
}
bool AddTask(Task&& task)
void AddTask(TaskScheduler::Task* task)
{
std::unique_lock lock(m_mutex, std::defer_lock);
if (!lock.try_lock())
return false;
m_tasks.push_back(std::move(task));
lock.unlock();
m_conditionVariable.notify_one();
return true;
m_tasks.push(task);
if (!m_notifier.test_and_set())
m_notifier.notify_one();
}
void Run()
{
bool idle = true;
m_notifier.wait(false); // wait until task scheduler finishes initializing
StackArray<unsigned int> randomWorkerIndices = NazaraStackArrayNoInit(unsigned int, m_owner.GetWorkerCount() - 1);
{
unsigned int* indexPtr = randomWorkerIndices.data();
for (unsigned int i = 0; i < randomWorkerIndices.size(); ++i)
for (unsigned int i = 0; i < m_owner.GetWorkerCount(); ++i)
{
if (i != m_workerIndex)
*indexPtr++ = i;
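
For reference, the condition_variable/mutex pair is replaced here by the C++20 std::atomic_flag wait/notify facility. In isolation, the idiom the new code relies on looks like this (a minimal standalone sketch, not the engine's classes):

#include <atomic>

std::atomic_flag notifier; // default-initialized to clear since C++20

void Wake() // producer side, mirrors AddTask() and ~Worker()
{
    // test_and_set() returns the previous state, so notify_one() is
    // only issued when the flag was not already raised
    if (!notifier.test_and_set())
        notifier.notify_one();
}

void Sleep() // consumer side, mirrors the idle path in Run()
{
    notifier.wait(false); // blocks while the flag reads false
    notifier.clear();     // re-arm for the next wake-up
}

Unlike the old condition variable, no mutex is needed: the flag itself is the shared state being waited on.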
@@ -86,27 +84,9 @@ namespace Nz
std::shuffle(randomWorkerIndices.begin(), randomWorkerIndices.end(), gen);
}
bool idle = true;
for (;;)
do
{
std::unique_lock lock(m_mutex);
// Wait for tasks if we don't have any right now
if (m_tasks.empty())
{
if (!idle)
{
m_owner.NotifyWorkerIdle();
idle = true;
}
m_conditionVariable.wait(lock, [this] { return !m_running || !m_tasks.empty(); });
}
if (!m_running)
break;
auto ExecuteTask = [&](TaskScheduler::Task& task)
auto ExecuteTask = [&](TaskScheduler::Task* task)
{
if (idle)
{
@@ -114,50 +94,44 @@ namespace Nz
idle = false;
}
task();
(*task)();
};
if (!m_tasks.empty())
{
TaskScheduler::Task task = std::move(m_tasks.front());
m_tasks.erase(m_tasks.begin());
lock.unlock();
ExecuteTask(task);
}
// Wait for tasks if we don't have any right now
std::optional<TaskScheduler::Task*> task = m_tasks.pop();
if (task)
ExecuteTask(*task);
else
{
lock.unlock();
// Try to steal a task from another worker in a random order to avoid lock contention
TaskScheduler::Task task;
// Try to steal a task from another worker in a random order to avoid contention
for (unsigned int workerIndex : randomWorkerIndices)
{
if (m_owner.GetWorker(workerIndex).StealTask(&task))
if (task = m_owner.GetWorker(workerIndex).StealTask())
{
ExecuteTask(task);
ExecuteTask(*task);
break;
}
}
}
// Note: it's possible for a thread to reach this point without executing a task (for example if another worker stole its only remaining task)
if (!task)
{
if (!idle)
{
m_owner.NotifyWorkerIdle();
idle = true;
}
m_notifier.wait(false);
m_notifier.clear();
}
}
}
while (m_running);
}
bool StealTask(TaskScheduler::Task* task)
std::optional<TaskScheduler::Task*> StealTask()
{
std::unique_lock lock(m_mutex, std::defer_lock);
if (!lock.try_lock())
return false;
if (m_tasks.empty())
return false;
*task = std::move(m_tasks.front());
m_tasks.erase(m_tasks.begin());
return true;
return m_tasks.steal();
}
Worker& operator=(const Worker& worker) = delete;
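
The new m_tasks member is a work-stealing deque: push() and pop() serve one end, while steal() takes from the opposite end and may be called concurrently by other workers, which is why StealTask() no longer needs a try_lock. A usage sketch against the interface the calls above assume (wsq.hpp's WorkStealingQueue; treat the exact signatures as an assumption):

#include <optional>
#include <wsq.hpp>

WorkStealingQueue<int> queue;

queue.push(1); // owner end
queue.push(2);

std::optional<int> own = queue.pop();      // owner end, LIFO: yields 2
std::optional<int> stolen = queue.steal(); // thief end, FIFO: yields 1
// both return std::nullopt when no task is available
// (steal() may also fail transiently under contention)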
@@ -169,10 +143,9 @@ namespace Nz
private:
std::atomic_bool m_running;
std::condition_variable m_conditionVariable;
std::mutex m_mutex;
std::atomic_flag m_notifier;
std::thread m_thread;
std::vector<TaskScheduler::Task> m_tasks;
WorkStealingQueue<TaskScheduler::Task*> m_tasks;
TaskScheduler& m_owner;
unsigned int m_workerIndex;
};
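
Putting the pieces together, the reworked Run() boils down to: drain the own queue, then try to steal, and only sleep on the notifier when both fail. A condensed sketch of that shape (simplified; Task, the steal helper, and shutdown handling are stand-ins for the code above):

#include <atomic>
#include <functional>
#include <optional>
#include <wsq.hpp>

using Task = std::function<void()>;

void WorkerLoop(WorkStealingQueue<Task*>& ownQueue,
                std::atomic_flag& notifier,
                std::atomic_bool& running,
                const std::function<std::optional<Task*>()>& stealFromOthers)
{
    do
    {
        std::optional<Task*> task = ownQueue.pop(); // own tasks first, no lock
        if (!task)
            task = stealFromOthers();               // then raid other workers

        if (task)
            (**task)();                             // run the task
        else
        {
            notifier.wait(false);                   // nothing anywhere: sleep
            notifier.clear();                       // re-arm before rescanning
        }
    }
    while (running);
}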
@@ -181,15 +154,17 @@ namespace Nz
TaskScheduler::TaskScheduler(unsigned int workerCount) :
m_idle(true),
m_nextWorkerIndex(0)
m_nextWorkerIndex(0),
m_tasks(256 * sizeof(Task)),
m_workerCount(workerCount)
{
if (workerCount == 0)
workerCount = std::max(Core::Instance()->GetHardwareInfo().GetCpuThreadCount(), 1u);
if (m_workerCount == 0)
m_workerCount = std::max(Core::Instance()->GetHardwareInfo().GetCpuThreadCount(), 1u);
m_idleWorkerCount = workerCount;
m_idleWorkerCount = m_workerCount;
m_workers.reserve(workerCount);
for (unsigned int i = 0; i < workerCount; ++i)
m_workers.reserve(m_workerCount);
for (unsigned int i = 0; i < m_workerCount; ++i)
m_workers.emplace_back(*this, i);
}
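
The zero-means-auto default above has a direct standard-library analogue (sketch; the engine queries its own HardwareInfo instead of the standard call):

#include <algorithm>
#include <thread>

unsigned int ResolveWorkerCount(unsigned int requested)
{
    if (requested != 0)
        return requested;

    // hardware_concurrency() may legally return 0 when unknown, hence the clamp
    return std::max(std::thread::hardware_concurrency(), 1u);
}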
@@ -202,25 +177,19 @@ namespace Nz
{
m_idle = false;
for (;;)
{
Worker& randomWorker = m_workers[m_nextWorkerIndex];
if (randomWorker.AddTask(std::move(task)))
break;
std::size_t taskIndex; //< not used
if (++m_nextWorkerIndex >= m_workers.size())
m_nextWorkerIndex = 0;
}
}
Worker& worker = m_workers[m_nextWorkerIndex++];
worker.AddTask(m_tasks.Allocate(taskIndex, std::move(task)));
unsigned int TaskScheduler::GetWorkerCount() const
{
return static_cast<unsigned int>(m_workers.size());
if (m_nextWorkerIndex >= m_workers.size())
m_nextWorkerIndex = 0;
}
void TaskScheduler::WaitForTasks()
{
m_idle.wait(false);
m_tasks.Clear();
}
auto TaskScheduler::GetWorker(unsigned int workerIndex) -> Worker&
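
WaitForTasks() relies on the same C++20 wait/notify machinery, this time on a std::atomic_bool: once every worker runs dry, m_idle flips to true and the waiting thread unblocks before recycling the task pool. A standalone sketch of that handshake (the bookkeeping inside NotifyWorkerIdle() is not shown in this diff, so its shape here is an assumption):

#include <atomic>

std::atomic_bool idle = true;
std::atomic_uint idleWorkers = 0;
unsigned int workerCount = 4; // example value

void NotifyWorkerIdleSketch()
{
    // last worker to go idle publishes completion and wakes the waiter
    if (idleWorkers.fetch_add(1) + 1 == workerCount)
    {
        idle = true;
        idle.notify_all();
    }
}

void WaitForTasksSketch()
{
    idle.wait(false); // blocks while idle == false
}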