/* power_play/src/work.c */

#include "work.h"
#include "intrinsics.h"
#include "sys.h"
#include "arena.h"
#include "scratch.h"
#include "memory.h"
#include "string.h"
#include "log.h"
#include "thread_local.h"
#include "atomic.h"
#include "app.h"
/* Terminology:
*
* Task: Single unit of stuff to be done (a function with a data pointer)
*
* Work: A group of tasks (doesn't have to be homogeneous) bundled together.
* Work is "complete" when all of its tasks are complete.
*
* Work Slate: A list of tasks used as a building-tool for constructing work.
*
* Worker: A thread that can do work. "work_startup" will create a certain
* amount of dedicated worker threads. Note that non-worker threads can
* also do work themselves (IE: callers of "work_wait")
*/
/* A dedicated worker thread. Workers form a singly-linked list rooted at
 * G.worker_head, built once in work_startup and joined in work_shutdown. */
struct worker {
    struct sys_thread thread; /* OS thread handle (joined via sys_thread_wait_release) */
    struct worker *next;      /* Next worker in G.worker_head list */
};
struct work_task;
/* A group of tasks submitted together. "Complete" when all tasks are done
 * (see terminology comment above). Lives in G.arena and is recycled through
 * G.free_work_head; all fields are accessed under G.mutex. */
struct work {
    enum work_priority priority; /* Index into G.scheduled_work_priority_tails */
    enum work_status status;     /* Set to WORK_STATUS_IN_PROGRESS while tasks run,
                                  * WORK_STATUS_DONE when tasks_incomplete hits 0 */
    u32 workers;                 /* Threads currently executing one of this work's tasks */
    struct sys_condition_variable condition_variable_finished; /* Broadcast on completion;
                                  * the CV object is kept across recycles */
    struct work *prev_scheduled; /* Links in the doubly-linked scheduled-work list */
    struct work *next_scheduled;
    struct work *next_free;      /* Link in the G.free_work_head free list */
    struct work_task *task_head; /* Unstarted task head */
    u32 tasks_incomplete;        /* Tasks not yet finished (unstarted + currently running) */
    u64 gen;                     /* Generation counter; bumped on release and on alloc so
                                  * stale work_handles can be detected */
};
/* Single unit of stuff to be done: a function plus its data pointer.
 * Recycled through G.free_task_head under G.mutex. */
struct work_task {
    void *data;                     /* Opaque argument passed to func */
    work_task_func *func;           /* Task entry point */
    struct work *work;              /* Owning work — NOTE(review): never assigned in this
                                     * file; confirm whether it is set/used elsewhere */
    struct work_task *next_in_work; /* Next unstarted task within the same work */
    struct work_task *next_free;    /* Link in the G.free_task_head free list */
};
/* ========================== *
* Global state
* ========================== */
/* Global worker-pool state. Everything below `mutex` is guarded by it;
 * the arena/mutex/cv themselves are initialized once in work_startup. */
GLOBAL struct {
    struct arena arena;               /* Backing storage for workers, work and tasks */
    b32 workers_shutdown;             /* Set under mutex by work_shutdown; workers exit when true */
    struct sys_mutex mutex;           /* Guards all pool state below */
    struct sys_condition_variable cv; /* Signaled when work is scheduled or shutdown begins */
    u32 worker_count;                 /* Number of dedicated worker threads */
    u32 idle_worker_count;            /* Workers currently not executing a task */
    struct worker *worker_head;       /* Singly-linked list of dedicated workers */
    /* TODO: Make below pointers volatile? */
    struct work_task *free_task_head; /* Recycled task free list */
    struct work *free_work_head;      /* Recycled work free list */
    struct work *scheduled_work_head; /* Work with unstarted tasks, highest priority first */
    /* Pointers to the last piece of work of each priority in the scheduled
     * work list (used for O(1) insertion) */
    struct work *scheduled_work_priority_tails[NUM_WORK_PRIORITIES];
} G = ZI, DEBUG_ALIAS(G, G_work);
/* ========================== *
* Thread local state
* ========================== */
/* Per-thread context distinguishing dedicated workers from outside threads
 * that merely help with work (read in work_push_from_slate_locked). */
struct worker_ctx {
    b32 is_worker; /* true only on threads spawned by work_startup */
};
GLOBAL THREAD_LOCAL_VAR_DEF(tl_worker_ctx, struct worker_ctx, NULL, NULL);
/* ========================== *
* Startup
* ========================== */
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(work_shutdown);
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(worker_thread_entry_point, thread_data);
/* Initialize the global worker pool and spin up `num_worker_threads`
 * dedicated worker threads. Panics when asked for zero threads. Registers
 * work_shutdown as an application exit callback so the pool is torn down
 * automatically. Returns an (empty) startup receipt. */
struct work_startup_receipt work_startup(u32 num_worker_threads)
{
    struct temp_arena scratch = scratch_begin_no_conflict();
    /* `num_worker_threads` is unsigned, so the previous `<= 0` compare could
     * only ever test for zero; `== 0` states the intent directly (and avoids
     * a tautological-compare warning). */
    if (num_worker_threads == 0) {
        sys_panic(LIT("Tried to start up worker pool with 0 threads"));
    }
    G.arena = arena_alloc(GIGABYTE(64));
    G.mutex = sys_mutex_alloc();
    G.cv = sys_condition_variable_alloc();
    G.worker_count = num_worker_threads;
    G.idle_worker_count = num_worker_threads;
    app_register_exit_callback(&work_shutdown);
    /* Initialize threads. Held under the mutex so a worker that begins
     * running immediately observes consistent pool state. */
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        struct worker *prev = NULL;
        for (u32 i = 0; i < num_worker_threads; ++i) {
            struct string thread_name = string_format(scratch.arena,
                                                      LIT("[P0] Worker %F"),
                                                      FMT_UINT(i));
            struct worker *worker = arena_push_zero(&G.arena, struct worker);
            worker->thread = sys_thread_alloc(&worker_thread_entry_point, NULL, thread_name);
            if (prev) {
                prev->next = worker;
            } else {
                G.worker_head = worker;
            }
            prev = worker;
        }
    }
    sys_mutex_unlock(&lock);
    scratch_end(scratch);
    return (struct work_startup_receipt) { 0 };
}
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(work_shutdown)
{
__prof;
struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
{
G.workers_shutdown = true;
sys_condition_variable_broadcast(&G.cv);
}
sys_mutex_unlock(&lock);
for (struct worker *worker = G.worker_head; worker; worker = worker->next) {
sys_thread_wait_release(&worker->thread);
}
}
/* ========================== *
* Internal work / task allocation
* ========================== */
INTERNAL struct work *work_alloc_locked(struct sys_lock *lock)
{
__prof;
sys_assert_locked_e(lock, &G.mutex);
(UNUSED)lock;
struct work *work = NULL;
/* Allocate work */
if (G.free_work_head) {
/* Reuse from free list */
work = G.free_work_head;
G.free_work_head = work->next_free;
*work = (struct work) {
.condition_variable_finished = work->condition_variable_finished,
.gen = work->gen + 1
};
} else {
/* Make new */
work = arena_push(&G.arena, struct work);
*work = (struct work) {
.condition_variable_finished = sys_condition_variable_alloc(),
.gen = 1
};
}
return work;
}
/* Return a work object to the free list. Bumps the generation so any
 * outstanding handles to it immediately go stale. Caller must hold G.mutex. */
INTERNAL void work_release_locked(struct sys_lock *lock, struct work *work)
{
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    work->gen += 1;
    work->next_free = G.free_work_head;
    G.free_work_head = work;
}
INTERNAL struct work_handle work_to_handle_locked(struct sys_lock *lock, struct work *work)
{
sys_assert_locked_e(lock, &G.mutex);
(UNUSED)lock;
return (struct work_handle) {
.work = work,
.gen = work->gen
};
}
/* Allocate a zeroed task, recycling from the free list when possible.
 * Caller must hold G.mutex. */
INTERNAL struct work_task *task_alloc_locked(struct sys_lock *lock)
{
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    struct work_task *task;
    if (G.free_task_head == NULL) {
        /* Nothing to recycle: carve a zeroed task from the arena */
        task = arena_push_zero(&G.arena, struct work_task);
    } else {
        /* Pop from the free list and scrub the old contents */
        task = G.free_task_head;
        G.free_task_head = task->next_free;
        *task = (struct work_task) { 0 };
    }
    return task;
}
/* Push a finished task back onto the shared free list for reuse.
 * Caller must hold G.mutex. */
INTERNAL void task_release_locked(struct sys_lock *lock, struct work_task *task)
{
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    struct work_task *old_head = G.free_task_head;
    task->next_free = old_head;
    G.free_task_head = task;
}
/* ========================== *
* Work scheduling / insertion
* ========================== */
/* Insert `work` into the scheduled list, keeping the list ordered from
 * highest to lowest priority (lower enum value = higher priority, per the
 * tails-array search direction), and wake workers. Caller must hold G.mutex.
 *
 * Fixes vs previous version:
 *  - Head compare is now strict (`>` not `>=`): with `>=`, new work of the
 *    same priority as the head jumped ahead of it (LIFO), while work of the
 *    same priority as a mid-list run was appended after it (FIFO). Strict
 *    compare routes equal priority through the tail path, keeping ordering
 *    FIFO within a priority.
 *  - Mid-list insertion now fixes the back-link of the node after the
 *    insertion point. Previously `tail->next_scheduled->prev_scheduled` kept
 *    pointing at `tail`, so a later work_unschedule_locked of that node would
 *    splice the freshly inserted work out of the forward chain. */
INTERNAL void work_schedule_locked(struct sys_lock *lock, struct work *work)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    enum work_priority priority = work->priority;
    if (G.scheduled_work_head) {
        struct work *head = G.scheduled_work_head;
        if (head->priority > priority) {
            /* Head is strictly lower priority, insert work as new head */
            G.scheduled_work_head = work;
            work->next_scheduled = head;
            head->prev_scheduled = work;
        } else {
            /* Find the tail of the closest equal-or-higher priority run.
             * Guaranteed non-NULL: head->priority <= priority, so some tail
             * exists at an index <= priority. */
            struct work *tail = NULL;
            for (i32 i = priority; i >= 0; --i) {
                tail = G.scheduled_work_priority_tails[i];
                if (tail) {
                    break;
                }
            }
            /* Hook work after `tail`, maintaining both link directions */
            struct work *after = tail->next_scheduled;
            work->next_scheduled = after;
            work->prev_scheduled = tail;
            tail->next_scheduled = work;
            if (after) {
                after->prev_scheduled = work;
            }
        }
    } else {
        G.scheduled_work_head = work;
    }
    G.scheduled_work_priority_tails[priority] = work;
    /* Wake up to one worker per queued task */
    sys_condition_variable_signal(&G.cv, work->tasks_incomplete);
}
INTERNAL void work_unschedule_locked(struct sys_lock *lock, struct work *work)
{
__prof;
sys_assert_locked_e(lock, &G.mutex);
(UNUSED)lock;
struct work *prev = (struct work *)work->prev_scheduled;
struct work *next = (struct work *)work->next_scheduled;
/* Remove from priority tails array */
enum work_priority priority = work->priority;
struct work *priority_tail = G.scheduled_work_priority_tails[priority];
if (priority_tail == work && (!prev || prev->priority == priority)) {
G.scheduled_work_priority_tails[priority] = prev;
}
/* Unhook work */
if (prev) {
prev->next_scheduled = next;
}
if (next) {
next->prev_scheduled = prev;
}
if (work == G.scheduled_work_head) {
G.scheduled_work_head = next;
}
}
/* ========================== *
* Task dequeuing
* ========================== */
INTERNAL struct work_task *work_dequeue_task_locked(struct sys_lock *lock, struct work *work)
{
__prof;
sys_assert_locked_e(lock, &G.mutex);
struct work_task *task = work->task_head;
if (task) {
work->task_head = task->next_in_work;
if (!work->task_head) {
/* Unschedule work if last task */
work_unschedule_locked(lock, work);
}
}
return task;
}
/* ========================== *
* Work doing
* ========================== */
/* NOTE: This function will release `work` if there are no more tasks once completed.
 * Returns `true` if more tasks are still present in the work after completion. */
INTERNAL b32 work_exec_single_task_maybe_release_locked(struct sys_lock *lock, struct work *work)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    struct work_task *task = work_dequeue_task_locked(lock, work);
    /* Snapshot taken while still locked; it can be stale by the time we
     * return (other threads may drain the remaining tasks while we run
     * ours below). NOTE(review): a stale `true` makes the caller loop once
     * more on a possibly-released work — appears benign because released
     * work stays in the arena with task_head == NULL, but confirm. */
    b32 more_tasks = work->task_head != NULL;
    if (task) {
        work->status = WORK_STATUS_IN_PROGRESS;
        /* Count ourselves as an active executor of this work */
        ++work->workers;
        /* Do task (temporarily unlock so the task body runs without holding
         * the global pool mutex; `lock` is re-established afterwards) */
        {
            sys_mutex_unlock(lock);
            task->func(task->data);
            *lock = sys_mutex_lock_e(&G.mutex);
        }
        --work->workers;
        --work->tasks_incomplete;
        task_release_locked(lock, task);
        if (work->tasks_incomplete == 0) {
            /* Signal finished (waiters in work_wait block on this CV) */
            work->status = WORK_STATUS_DONE;
            sys_condition_variable_broadcast(&work->condition_variable_finished);
            /* Release — `work` must not be treated as live after this point */
            work_release_locked(lock, work);
        }
    }
    return more_tasks;
}
/* Execute tasks from `work` until it reports none remain (releasing the
 * work when the last task finishes). Caller must hold G.mutex. */
INTERNAL void work_exec_remaining_tasks_maybe_release_locked(struct sys_lock *lock, struct work *work)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    for (;;) {
        if (!work_exec_single_task_maybe_release_locked(lock, work)) {
            break;
        }
    }
}
/* ========================== *
* Work thread proc
* ========================== */
/* Dedicated worker main loop: repeatedly take the scheduled-work head and
 * execute one task from it, sleeping on G.cv when nothing is scheduled.
 * Exits when work_shutdown sets G.workers_shutdown (and broadcasts G.cv). */
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(worker_thread_entry_point, thread_data)
{
    (UNUSED)thread_data;
    /* Mark this thread as a dedicated worker; read by
     * work_push_from_slate_locked to decide on self-help */
    struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx);
    *ctx = (struct worker_ctx) {
        .is_worker = true
    };
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        while (!G.workers_shutdown) {
            struct work *work = G.scheduled_work_head;
            if (work) {
                __profscope(work_pool_task);
                /* Not idle while executing — the exec call drops the lock
                 * around the task body, so the counter is adjusted first */
                --G.idle_worker_count;
                work_exec_single_task_maybe_release_locked(&lock, work);
                ++G.idle_worker_count;
            } else {
                /* Nothing scheduled: sleep until work is pushed or shutdown */
                sys_condition_variable_wait(&G.cv, &lock);
            }
        }
    }
    sys_mutex_unlock(&lock);
}
/* ========================== *
* Work pushing interface
* ========================== */
/* If `help` is true, then the calling thread will start picking up tasks immediately (before other workers can see it) */
/* Turn a slate's task list into a scheduled work object and return a handle
 * to it. Caller must hold G.mutex. The slate's tasks are adopted wholesale
 * (ws->task_head becomes the work's unstarted list). */
INTERNAL struct work_handle work_push_from_slate_locked(struct sys_lock *lock, struct work_slate *ws, b32 help, enum work_priority priority)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    struct work *work = work_alloc_locked(lock);
    /* Capture the handle before any execution below can release the work */
    struct work_handle wh = work_to_handle_locked(lock, work);
    work->priority = priority;
    work->status = WORK_STATUS_IN_PROGRESS;
    work->task_head = ws->task_head;
    work->tasks_incomplete = ws->num_tasks;
    work_schedule_locked(lock, work);
    if (help) {
        /* Caller immediately drains the tasks itself */
        work_exec_remaining_tasks_maybe_release_locked(lock, work);
    } else {
        /* When work is submitted from a worker thread, we want the worker to pick
         * up the tasks itself when idle workers = 0 and work.workers = 0
         * (work.workers will always = 0 when work is first pushed).
         *
         * This is not ideal, however it is necessary to prevent
         * a scenario in which all workers are waiting on child work to complete in
         * a subtle way (IE: outside of work_wait). Since all workers are waiting,
         * there would be no remaining workers to complete the child work, meaning
         * there is a deadlock.
         *
         * By forcing workers to do their own child work, we can guarantee that this
         * does not occur. However it is not ideal since it creates situations in
         * which work is not done asynchronously.
         */
        struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx);
        if (ctx->is_worker) {
            b32 work_done = false;
            /* Re-check both counters each iteration: another worker may go
             * idle or start on this work while we execute a task */
            while (!work_done && G.idle_worker_count == 0 && work->workers == 0) {
                work_done = !work_exec_single_task_maybe_release_locked(lock, work);
            }
        }
    }
    return wh;
}
/* Wrap a single task in a one-entry slate and push it as work. `help`
 * controls whether the calling thread drains the task itself. */
INTERNAL struct work_handle work_push_task_internal(work_task_func *func, void *data, b32 help, enum work_priority priority)
{
    struct work_handle handle;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        struct work_task *task = task_alloc_locked(&lock);
        task->func = func;
        task->data = data;
        struct work_slate single = {
            .task_head = task,
            .task_tail = task,
            .num_tasks = 1
        };
        handle = work_push_from_slate_locked(&lock, &single, help, priority);
    }
    sys_mutex_unlock(&lock);
    return handle;
}
/* Push work that contains a single task */
struct work_handle work_push_task(work_task_func *func, void *data, enum work_priority priority)
{
    __prof;
    /* Non-helping variant: the caller returns without executing tasks */
    return work_push_task_internal(func, data, false, priority);
}
/* Push single-task work and immediately help execute it on this thread */
struct work_handle work_push_task_and_help(work_task_func *func, void *data, enum work_priority priority)
{
    __prof;
    return work_push_task_internal(func, data, true, priority);
}
struct work_slate work_slate_begin(void)
{
__prof;
struct work_slate ws = ZI;
return ws;
}
/* Append one task to a slate. Only the task allocation touches shared pool
 * state (under the lock); the slate's own list is updated outside it. */
void work_slate_push_task(struct work_slate *ws, work_task_func *func, void *data)
{
    __prof;
    struct work_task *task;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        task = task_alloc_locked(&lock);
    }
    sys_mutex_unlock(&lock);
    task->func = func;
    task->data = data;
    /* Append to the slate's task list */
    if (ws->task_tail == NULL) {
        ws->task_head = task;
    } else {
        ws->task_tail->next_in_work = task;
    }
    ws->task_tail = task;
    ws->num_tasks += 1;
}
/* Push work that contains multiple tasks (work slate) */
struct work_handle work_slate_end(struct work_slate *ws, enum work_priority priority)
{
__prof;
struct work_handle handle;
struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
{
handle = work_push_from_slate_locked(&lock, ws, false, priority);
}
sys_mutex_unlock(&lock);
return handle;
}
/* Submit the slate and immediately help execute its tasks on this thread */
struct work_handle work_slate_end_and_help(struct work_slate *ws, enum work_priority priority)
{
    __prof;
    struct work_handle handle;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        handle = work_push_from_slate_locked(&lock, ws, true, priority);
    }
    sys_mutex_unlock(&lock);
    return handle;
}
/* ========================== *
* Work intervention interface
* ========================== */
/* Resolve a handle to its work, or NULL when the handle is stale (the work
 * was released/recycled, bumping its generation). Caller must hold G.mutex. */
INTERNAL struct work *work_from_handle_locked(struct sys_lock *lock, struct work_handle handle)
{
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    struct work *work = handle.work;
    return (work->gen == handle.gen) ? work : NULL;
}
/* Wait for all tasks in work to be completed. Will also pick up any unstarted
 * tasks in the work since the caller will be idle while waiting anyway.
 * A stale handle (work already completed and released) returns immediately. */
void work_wait(struct work_handle handle)
{
    __prof;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        struct work *work = work_from_handle_locked(&lock, handle);
        if (work) {
            /* Help with tasks */
            work_exec_remaining_tasks_maybe_release_locked(&lock, work);
            /* Wait for work completion */
            work = work_from_handle_locked(&lock, handle); /* Re-checking work is still valid here in case work_exec caused work to release */
            if (work) {
                /* Also re-check the generation after every wakeup: while we
                 * are blocked (lock dropped inside the wait), the work can
                 * finish, be released, and be recycled for brand-new work
                 * that reuses the same struct and condition variable.
                 * Checking only `status` would then make us wait for the NEW
                 * work's completion. Release bumps `gen`, so a generation
                 * mismatch means our work is done and we can stop waiting. */
                while (work->gen == handle.gen && work->status != WORK_STATUS_DONE) {
                    sys_condition_variable_wait(&work->condition_variable_finished, &lock);
                }
            }
        }
    }
    sys_mutex_unlock(&lock);
}
/* Try to pick up any scheduled tasks */
void work_help(struct work_handle handle)
{
__prof;
struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
{
struct work *work = work_from_handle_locked(&lock, handle);
if (work) {
work_exec_remaining_tasks_maybe_release_locked(&lock, work);
}
}
sys_mutex_unlock(&lock);
}