/* work.c — worker pool: task construction, priority scheduling, and execution. */
#include "work.h"
|
|
#include "intrinsics.h"
|
|
#include "sys.h"
|
|
#include "arena.h"
|
|
#include "scratch.h"
|
|
#include "memory.h"
|
|
#include "string.h"
|
|
#include "log.h"
|
|
#include "thread_local.h"
|
|
#include "atomic.h"
|
|
#include "app.h"
|
|
|
|
/* Terminology:
 *
 * Task: Single unit of stuff to be done (a function with a data pointer).
 *
 * Work: A group of tasks (doesn't have to be homogeneous) bundled together.
 *       Work is "complete" when all of its tasks are complete.
 *
 * Work Slate: A list of tasks used as a building-tool for constructing work.
 *
 * Worker: A thread that can do work. "work_startup" will create a certain
 *         number of dedicated worker threads. Note that non-worker threads
 *         can also do work themselves (i.e. callers of "work_wait").
 */

/* NOTE:
 * Functions suffixed with "assume_locked" require `G.mutex` to be
 * locked & unlocked by the caller.
 */
|
|
|
|
/* A dedicated worker thread. Workers form a singly-linked list rooted at
 * `G.worker_head`, built by work_startup and joined by work_shutdown. */
struct worker {
	struct sys_thread thread; /* OS thread handle (joined at shutdown) */
	struct worker *next;      /* Next worker in the `G.worker_head` list */
};
|
|
|
|
struct work_task;
|
|
|
|
/* A scheduled bundle of tasks. Allocated from `G.arena` and recycled through
 * `G.free_work_head`. `gen` is bumped on release so stale `work_handle`s can
 * be detected (handle.gen mismatch). */
struct work {
	enum work_priority priority;
	enum work_status status;
	u32 workers; /* Threads currently executing one of this work's tasks */

	/* Broadcast when `tasks_incomplete` reaches 0 (see work_wait) */
	struct sys_condition_variable condition_variable_finished;

	/* Doubly-linked, priority-ordered scheduled list, rooted at
	 * `G.scheduled_work_head` */
	struct work *prev_scheduled;
	struct work *next_scheduled;
	struct work *next_free; /* Free-list link; valid only while released */

	struct work_task *task_head; /* Unstarted task head */
	u32 tasks_incomplete;        /* Unstarted + currently-executing tasks */

	u64 gen; /* Generation counter for handle validation */
};
|
|
|
|
/* Single unit of work: a function plus its opaque data pointer. Recycled via
 * `G.free_task_head`. */
struct work_task {
	void *data;           /* Opaque argument passed to `func` */
	work_task_func *func; /* Task entry point */
	struct work *work;    /* Owning work — NOTE(review): never assigned in this file; confirm whether it is used elsewhere */

	struct work_task *next_in_work; /* Next unstarted task in the owning work / slate */
	struct work_task *next_free;    /* Free-list link */
};
|
|
|
|
/* ========================== *
|
|
* Global state
|
|
* ========================== */
|
|
|
|
/* Global worker-pool state. All mutable fields below `mutex` are guarded by
 * it (see the "assume_locked" function-suffix convention). */
GLOBAL struct {
	struct arena arena; /* Backing storage for workers, work, and tasks */

	b32 workers_shutdown;             /* Set under `mutex`; tells workers to exit */
	struct sys_mutex mutex;           /* Guards all pool state below */
	struct sys_condition_variable cv; /* Signaled when work is scheduled; broadcast on shutdown */

	u32 worker_count;
	u32 idle_worker_count; /* Workers not currently executing a task */
	struct worker *worker_head;

	/* TODO: Make below pointers volatile? */

	struct work_task *free_task_head;

	struct work *free_work_head;
	struct work *scheduled_work_head;

	/* Pointers to the last piece of work of each priority in the scheduled
	 * work list (used for O(1) insertion) */
	struct work *scheduled_work_priority_tails[NUM_WORK_PRIORITIES];
} G = { 0 }, DEBUG_ALIAS(G, G_work);
|
|
|
|
/* ========================== *
|
|
* Thread local state
|
|
* ========================== */
|
|
|
|
/* Per-thread context; distinguishes dedicated workers from outside threads.
 * Used by work_push_from_slate_assume_locked to decide whether a pushing
 * worker must pick up its own child tasks to avoid deadlock. */
struct worker_ctx {
	b32 is_worker; /* True iff this thread was spawned by work_startup */
};
|
|
|
|
GLOBAL THREAD_LOCAL_VAR_DEF(tl_worker_ctx, struct worker_ctx, NULL, NULL);
|
|
|
|
/* ========================== *
|
|
* Startup
|
|
* ========================== */
|
|
|
|
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(work_shutdown);
|
|
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(worker_thread_entry_point, thread_data);
|
|
|
|
/* Start the worker pool with `num_worker_threads` dedicated threads.
 * Allocates the pool arena and synchronization primitives, registers
 * `work_shutdown` as an app exit callback, and spawns the worker threads.
 * Panics when asked for zero threads. */
struct work_startup_receipt work_startup(u32 num_worker_threads)
{
	struct temp_arena scratch = scratch_begin_no_conflict();

	/* `num_worker_threads` is unsigned, so `<= 0` (as previously written)
	 * could only ever mean `== 0`; say so explicitly. */
	if (num_worker_threads == 0) {
		sys_panic(STR("Tried to start up worker pool with 0 threads"));
	}

	G.arena = arena_alloc(GIGABYTE(64));
	G.mutex = sys_mutex_alloc();
	G.cv = sys_condition_variable_alloc();
	G.worker_count = num_worker_threads;
	G.idle_worker_count = num_worker_threads;
	app_register_exit_callback(&work_shutdown);

	/* Initialize threads (under the mutex so workers see a consistent pool) */
	sys_mutex_lock(&G.mutex);
	{
		struct worker *prev = NULL;
		for (u32 i = 0; i < num_worker_threads; ++i) {
			struct string thread_name = string_format(scratch.arena,
			                                          STR("[P0] Worker %F"),
			                                          FMT_UINT(i));

			struct worker *worker = arena_push_zero(&G.arena, struct worker);
			worker->thread = sys_thread_alloc(&worker_thread_entry_point, NULL, thread_name);
			if (prev) {
				prev->next = worker;
			} else {
				G.worker_head = worker;
			}
			prev = worker;
		}
	}
	sys_mutex_unlock(&G.mutex);

	scratch_end(scratch);

	return (struct work_startup_receipt) { 0 };
}
|
|
|
|
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(work_shutdown)
|
|
{
|
|
__prof;
|
|
sys_mutex_lock(&G.mutex);
|
|
{
|
|
G.workers_shutdown = true;
|
|
sys_condition_variable_broadcast(&G.cv);
|
|
}
|
|
sys_mutex_unlock(&G.mutex);
|
|
for (struct worker *worker = G.worker_head; (worker = worker->next);) {
|
|
sys_thread_wait_release(&worker->thread);
|
|
}
|
|
}
|
|
|
|
/* ========================== *
|
|
* Internal work / task allocation
|
|
* ========================== */
|
|
|
|
INTERNAL struct work *work_alloc_assume_locked(void)
|
|
{
|
|
__prof;
|
|
sys_mutex_assert_locked(&G.mutex);
|
|
struct work *work = NULL;
|
|
|
|
/* Allocate work */
|
|
if (G.free_work_head) {
|
|
/* Reuse from free list */
|
|
work = G.free_work_head;
|
|
G.free_work_head = work->next_free;
|
|
*work = (struct work) {
|
|
.condition_variable_finished = work->condition_variable_finished,
|
|
.gen = work->gen
|
|
};
|
|
} else {
|
|
/* Make new */
|
|
work = arena_push(&G.arena, struct work);
|
|
*work = (struct work) {
|
|
.condition_variable_finished = sys_condition_variable_alloc(),
|
|
.gen = 1
|
|
};
|
|
}
|
|
return work;
|
|
}
|
|
|
|
INTERNAL void work_release_assume_locked(struct work *work)
|
|
{
|
|
sys_mutex_assert_locked(&G.mutex);
|
|
work->next_free = G.free_work_head;
|
|
G.free_work_head = work;
|
|
++work->gen;
|
|
}
|
|
|
|
INTERNAL struct work_handle work_to_handle_assume_locked(struct work *work)
|
|
{
|
|
sys_mutex_assert_locked(&G.mutex);
|
|
return (struct work_handle) {
|
|
.work = work,
|
|
.gen = work->gen
|
|
};
|
|
}
|
|
|
|
INTERNAL struct work_task *task_alloc_assume_locked(void)
|
|
{
|
|
sys_mutex_assert_locked(&G.mutex);
|
|
struct work_task *task = NULL;
|
|
|
|
/* Allocate task */
|
|
if (G.free_task_head) {
|
|
/* Reuse from free list */
|
|
task = G.free_task_head;
|
|
G.free_task_head = task->next_free;
|
|
*task = (struct work_task) { 0 };
|
|
} else {
|
|
/* Make new */
|
|
task = arena_push_zero(&G.arena, struct work_task);
|
|
}
|
|
|
|
return task;
|
|
}
|
|
|
|
/* Return a completed task to the free list for reuse by
 * task_alloc_assume_locked. Must run before the head is overwritten, hence
 * the two-step link order. */
INTERNAL void task_release_assume_locked(struct work_task *task)
{
	sys_mutex_assert_locked(&G.mutex);
	task->next_free = G.free_task_head;
	G.free_task_head = task;
}
|
|
|
|
/* ========================== *
|
|
* Work scheduling / insertion
|
|
* ========================== */
|
|
|
|
INTERNAL void work_schedule_assume_locked(struct work *work)
|
|
{
|
|
__prof;
|
|
sys_mutex_assert_locked(&G.mutex);
|
|
enum work_priority priority = work->priority;
|
|
|
|
if (G.scheduled_work_head) {
|
|
struct work *head = G.scheduled_work_head;
|
|
|
|
if (head->priority >= priority) {
|
|
/* Head is lower priority, insert work as new head */
|
|
G.scheduled_work_head = work;
|
|
work->next_scheduled = head;
|
|
head->prev_scheduled = work;
|
|
} else {
|
|
/* Find higher priority */
|
|
struct work *tail = NULL;
|
|
for (i32 i = priority; i >= 0; --i) {
|
|
tail = G.scheduled_work_priority_tails[i];
|
|
if (tail) {
|
|
break;
|
|
}
|
|
}
|
|
/* Hook work */
|
|
work->next_scheduled = tail->next_scheduled;
|
|
work->prev_scheduled = tail;
|
|
tail->next_scheduled = work;
|
|
}
|
|
} else {
|
|
G.scheduled_work_head = work;
|
|
}
|
|
|
|
G.scheduled_work_priority_tails[priority] = work;
|
|
|
|
WRITE_BARRIER();
|
|
for (u64 i = 0; i < work->tasks_incomplete; ++i) {
|
|
sys_condition_variable_signal(&G.cv);
|
|
}
|
|
}
|
|
|
|
INTERNAL void work_unschedule_assume_locked(struct work *work)
|
|
{
|
|
__prof;
|
|
sys_mutex_assert_locked(&G.mutex);
|
|
|
|
struct work *prev = (struct work *)work->prev_scheduled;
|
|
struct work *next = (struct work *)work->next_scheduled;
|
|
|
|
/* Remove from priority tails array */
|
|
enum work_priority priority = work->priority;
|
|
struct work *priority_tail = G.scheduled_work_priority_tails[priority];
|
|
if (priority_tail == work && (!prev || prev->priority == priority)) {
|
|
G.scheduled_work_priority_tails[priority] = prev;
|
|
}
|
|
|
|
/* Unhook work */
|
|
if (prev) {
|
|
prev->next_scheduled = next;
|
|
}
|
|
if (next) {
|
|
next->prev_scheduled = prev;
|
|
}
|
|
if (work == G.scheduled_work_head) {
|
|
G.scheduled_work_head = next;
|
|
}
|
|
}
|
|
|
|
/* ========================== *
|
|
* Task dequeuing
|
|
* ========================== */
|
|
|
|
INTERNAL struct work_task *work_dequeue_task_assume_locked(struct work *work)
|
|
{
|
|
__prof;
|
|
sys_mutex_assert_locked(&G.mutex);
|
|
struct work_task *task = work->task_head;
|
|
if (task) {
|
|
work->task_head = task->next_in_work;
|
|
if (!work->task_head) {
|
|
/* Unschedule work if last task */
|
|
work_unschedule_assume_locked(work);
|
|
}
|
|
}
|
|
return task;
|
|
}
|
|
|
|
/* ========================== *
|
|
* Work doing
|
|
* ========================== */
|
|
|
|
/* NOTE: Dequeues and runs a single unstarted task of `work`; `G.mutex` is
 * dropped while the task function runs. If this was the final outstanding
 * task, the work is marked done, waiters on its condition variable are
 * broadcast, and `work` is RELEASED (its generation is bumped and it may be
 * reused once the mutex is dropped).
 *
 * Returns `true` if `work` still had unstarted tasks at the moment ours was
 * dequeued (i.e. "call me again"), `false` otherwise. The value is sampled
 * BEFORE the mutex is temporarily released, so a `true` return can race
 * with other threads draining the remaining tasks; a follow-up call may
 * find nothing to do (the `task == NULL` path), which is harmless as long
 * as the caller keeps holding the mutex between calls. */
INTERNAL b32 work_exec_single_task_maybe_release_assume_locked(struct work *work)
{
	__prof;
	sys_mutex_assert_locked(&G.mutex);

	struct work_task *task = work_dequeue_task_assume_locked(work);
	b32 more_tasks = work->task_head != NULL;

	if (task) {
		work->status = WORK_STATUS_IN_PROGRESS;

		++work->workers;
		/* Do task (temporarily unlock) */
		sys_mutex_unlock(&G.mutex);
		{
			task->func(task->data);
		}
		sys_mutex_lock(&G.mutex);
		--work->workers;
		--work->tasks_incomplete;
		task_release_assume_locked(task);

		if (work->tasks_incomplete == 0) {
			/* Signal finished (status/broadcast before release so waiters
			 * observe DONE, then the generation bump invalidates handles) */
			work->status = WORK_STATUS_DONE;
			sys_condition_variable_broadcast(&work->condition_variable_finished);

			/* Release */
			work_release_assume_locked(work);
		}
	}

	return more_tasks;
}
|
|
|
|
INTERNAL void work_exec_remaining_tasks_maybe_release_assume_locked(struct work *work)
|
|
{
|
|
__prof;
|
|
sys_mutex_assert_locked(&G.mutex);
|
|
|
|
b32 more_tasks = true;
|
|
while (more_tasks) {
|
|
more_tasks = work_exec_single_task_maybe_release_assume_locked(work);
|
|
}
|
|
|
|
}
|
|
|
|
/* ========================== *
|
|
* Work thread proc
|
|
* ========================== */
|
|
|
|
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(worker_thread_entry_point, thread_data)
|
|
{
|
|
(UNUSED)thread_data;
|
|
|
|
struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx);
|
|
*ctx = (struct worker_ctx) {
|
|
.is_worker = true
|
|
};
|
|
|
|
while (true) {
|
|
sys_mutex_lock(&G.mutex);
|
|
{
|
|
/* Check for exit */
|
|
if (G.workers_shutdown) {
|
|
sys_mutex_unlock(&G.mutex);
|
|
break;
|
|
}
|
|
|
|
if (!G.scheduled_work_head) {
|
|
/* Wait for work */
|
|
sys_condition_variable_wait(&G.cv, &G.mutex);
|
|
}
|
|
|
|
/* Do work from top */
|
|
if (G.scheduled_work_head) {
|
|
struct work *work = G.scheduled_work_head;
|
|
if (work) {
|
|
__profscope(work_pool_task);
|
|
--G.idle_worker_count;
|
|
work_exec_single_task_maybe_release_assume_locked((struct work *)work);
|
|
++G.idle_worker_count;
|
|
}
|
|
}
|
|
}
|
|
sys_mutex_unlock(&G.mutex);
|
|
}
|
|
}
|
|
|
|
/* ========================== *
|
|
* Work pushing interface
|
|
* ========================== */
|
|
|
|
/* If `help` is true, then the calling thread will start picking up tasks immediately (before other workers can see it) */
|
|
INTERNAL struct work_handle work_push_from_slate_assume_locked(struct work_slate *ws, b32 help, enum work_priority priority)
|
|
{
|
|
__prof;
|
|
sys_mutex_assert_locked(&G.mutex);
|
|
|
|
struct work *work = work_alloc_assume_locked();
|
|
struct work_handle wh = work_to_handle_assume_locked(work);
|
|
|
|
work->priority = priority;
|
|
work->status = WORK_STATUS_IN_PROGRESS;
|
|
|
|
work->task_head = ws->task_head;
|
|
work->tasks_incomplete = ws->num_tasks;
|
|
|
|
work_schedule_assume_locked(work);
|
|
|
|
if (help) {
|
|
work_exec_remaining_tasks_maybe_release_assume_locked(work);
|
|
} else {
|
|
/* When work is submitted from a worker thread, we want the worker to pick
|
|
* up the tasks itself when idle workers = 0 and work.workers = 0
|
|
* (work.workers will always = 0 when work is first pushed).
|
|
*
|
|
* This is not ideal, however it is necessary to prevent
|
|
* a scenario in which all workers are waiting on child work to complete in
|
|
* a subtle way (IE: outside of work_wait). Since all workers are waiting,
|
|
* there would be no remaining workers to complete the child work, meaning
|
|
* there is a deadlock.
|
|
*
|
|
* By forcing workers to do their own child work, we can guarantee that this
|
|
* does not occur. However it is not ideal since it creates situations in
|
|
* which work is not done asynchronously.
|
|
*/
|
|
struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx);
|
|
if (ctx->is_worker) {
|
|
b32 work_done = false;
|
|
while (!work_done && G.idle_worker_count == 0 && work->workers == 0) {
|
|
work_done = !work_exec_single_task_maybe_release_assume_locked(work);
|
|
}
|
|
}
|
|
}
|
|
|
|
return wh;
|
|
}
|
|
|
|
/* Wrap a single task in a one-entry slate and push it as its own work. */
INTERNAL struct work_handle work_push_task_internal(work_task_func *func, void *data, b32 help, enum work_priority priority)
{
	struct work_handle handle;

	sys_mutex_lock(&G.mutex);
	{
		struct work_task *task = task_alloc_assume_locked();
		task->func = func;
		task->data = data;

		struct work_slate single = {
			.task_head = task,
			.task_tail = task,
			.num_tasks = 1
		};
		handle = work_push_from_slate_assume_locked(&single, help, priority);
	}
	sys_mutex_unlock(&G.mutex);

	return handle;
}
|
|
|
|
/* Push work that contains a single task */
|
|
struct work_handle work_push_task(work_task_func *func, void *data, enum work_priority priority)
|
|
{
|
|
__prof;
|
|
struct work_handle handle = work_push_task_internal(func, data, false, priority);
|
|
return handle;
|
|
}
|
|
|
|
/* Push a single-task work and immediately help execute it on this thread. */
struct work_handle work_push_task_and_help(work_task_func *func, void *data, enum work_priority priority)
{
	__prof;
	return work_push_task_internal(func, data, true, priority);
}
|
|
|
|
struct work_slate work_slate_begin(void)
|
|
{
|
|
__prof;
|
|
struct work_slate ws = { 0 };
|
|
return ws;
|
|
}
|
|
|
|
/* Append one task to a slate. The task is allocated under the global mutex,
 * but linked into the slate outside of it — the slate is private to the
 * caller until submitted. */
void work_slate_push_task(struct work_slate *ws, work_task_func *func, void *data)
{
	__prof;

	sys_mutex_lock(&G.mutex);
	struct work_task *task = task_alloc_assume_locked();
	sys_mutex_unlock(&G.mutex);

	task->func = func;
	task->data = data;

	/* Append to the slate's task list */
	if (!ws->task_tail) {
		ws->task_head = task;
	} else {
		ws->task_tail->next_in_work = task;
	}
	ws->task_tail = task;
	++ws->num_tasks;
}
|
|
|
|
/* Push work that contains multiple tasks (work slate) */
|
|
struct work_handle work_slate_end(struct work_slate *ws, enum work_priority priority)
|
|
{
|
|
__prof;
|
|
|
|
struct work_handle handle;
|
|
sys_mutex_lock(&G.mutex);
|
|
{
|
|
handle = work_push_from_slate_assume_locked(ws, false, priority);
|
|
}
|
|
sys_mutex_unlock(&G.mutex);
|
|
|
|
return handle;
|
|
}
|
|
|
|
struct work_handle work_slate_end_and_help(struct work_slate *ws, enum work_priority priority)
|
|
{
|
|
__prof;
|
|
|
|
sys_mutex_lock(&G.mutex);
|
|
struct work_handle handle = work_push_from_slate_assume_locked(ws, true, priority);
|
|
sys_mutex_unlock(&G.mutex);
|
|
|
|
return handle;
|
|
}
|
|
|
|
/* ========================== *
|
|
* Work intervention interface
|
|
* ========================== */
|
|
|
|
INTERNAL struct work *work_from_handle_assume_locked(struct work_handle handle)
|
|
{
|
|
sys_mutex_assert_locked(&G.mutex);
|
|
|
|
struct work *work = handle.work;
|
|
if (work->gen != handle.gen) {
|
|
work = NULL;
|
|
}
|
|
return work;
|
|
}
|
|
|
|
/* Wait for all tasks in work to be completed. Will also pick up any unstarted
|
|
* tasks in the work since the caller will be idle while waiting anyway. */
|
|
void work_wait(struct work_handle handle)
|
|
{
|
|
__prof;
|
|
sys_mutex_lock(&G.mutex);
|
|
{
|
|
struct work *work = work_from_handle_assume_locked(handle);
|
|
if (work) {
|
|
/* Help with tasks */
|
|
work_exec_remaining_tasks_maybe_release_assume_locked(work);
|
|
|
|
/* Wait for work completion */
|
|
work = work_from_handle_assume_locked(handle); /* Re-checking work is sitll valid here in case work_do caused work to release */
|
|
if (work) {
|
|
while (work->status != WORK_STATUS_DONE) {
|
|
sys_condition_variable_wait(&work->condition_variable_finished, &G.mutex);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
sys_mutex_unlock(&G.mutex);
|
|
}
|
|
|
|
/* Try to pick up any scheduled tasks */
|
|
void work_help(struct work_handle handle)
|
|
{
|
|
__prof;
|
|
sys_mutex_lock(&G.mutex);
|
|
|
|
struct work *work = work_from_handle_assume_locked(handle);
|
|
if (work) {
|
|
work_exec_remaining_tasks_maybe_release_assume_locked(work);
|
|
}
|
|
|
|
sys_mutex_unlock(&G.mutex);
|
|
}
|