/* power_play/src/work.c — worker-thread pool and work scheduling */

#include "work.h"
#include "intrinsics.h"
#include "sys.h"
#include "arena.h"
#include "scratch.h"
#include "memory.h"
#include "string.h"
#include "log.h"
/* Terminology:
*
* Task: Single unit of stuff to be done (a function with a data pointer)
*
* Work: A group of tasks (doesn't have to be homogeneous) bundled together.
* Work is "complete" when all of its tasks are complete.
*
* Work Slate: A list of tasks used as a building-tool for constructing work.
*
* Worker: A thread that can do work. "work_startup" will create a certain
* amount of dedicated worker threads. Note that non-worker threads can
* also do work themselves (IE: callers of "work_wait")
*/
/* NOTE:
* Functions suffixed with "assume_locked" require `L.mutex` to be
* locked & unlocked by the caller.
*/
/* A dedicated worker thread. Workers form a singly-linked list rooted at
 * L.worker_head; they are arena-allocated and live for the whole program. */
struct worker {
	struct sys_thread thread; /* OS thread handle, joined in work_shutdown */
	struct worker *next;      /* Next worker in L.worker_head list */
};
struct work_task;
/* A bundle of tasks submitted together. Recycled through L.free_work_head;
 * `gen` is bumped on release so stale handles can be detected. */
struct work {
	enum work_priority priority;
	enum work_status status;
	u32 workers; /* Number of threads currently executing a task of this work */
	struct sys_condition_variable condition_variable_finished; /* Signaled when tasks_incomplete reaches 0 */
	struct work *prev_scheduled; /* Links in the scheduled (pending) doubly-linked list */
	struct work *next_scheduled;
	struct work *next_free;      /* Free-list link; only meaningful after release */
	struct work_task *task_head; /* Unstarted task head */
	u32 tasks_incomplete;        /* Tasks not yet finished (whether started or not) */
	u64 gen;                     /* Generation counter for handle validation */
};
/* A single unit of work: a function plus its data pointer. Recycled through
 * L.free_task_head. */
struct work_task {
	void *data;                   /* Opaque argument passed to `func` */
	work_task_func *func;         /* Task entry point */
	struct work *work;            /* Owning work (NOTE(review): never assigned in this file — confirm whether it is used elsewhere) */
	struct work_task *next_in_work; /* Next unstarted task in the same work */
	struct work_task *next_free;    /* Free-list link; only meaningful after release */
};
/* ========================== *
* Global state
* ========================== */
/* Module-wide pool state. Mutable fields are guarded by `mutex` except
 * `shutdown` (written once, read by workers after a semaphore wake) and the
 * unlocked peek of `scheduled_work_head` in the worker loop. Memory is never
 * freed: everything comes from `arena` and is recycled via free lists. */
GLOBAL struct {
	b32 shutdown;               /* Set by work_shutdown to make workers exit */
	struct arena arena;         /* Backing storage for workers, work and tasks */
	struct sys_mutex mutex;     /* Guards the pool state below */
	struct sys_semaphore semaphore; /* Workers sleep here; signaled per scheduled task */
	u32 worker_count;           /* Number of dedicated worker threads */
	u32 idle_worker_count;      /* Workers not currently executing a task */
	struct worker *worker_head; /* Singly-linked list of dedicated workers */
	/* TODO: Make below pointers volatile? */
	struct work_task *free_task_head; /* Recycled task structs */
	struct work *free_work_head;      /* Recycled work structs */
	struct work *scheduled_work_head; /* Priority-ordered list of pending work */
	/* Pointers to the last piece of work of each priority in the scheduled
	 * work list (used for O(1) insertion) */
	struct work *scheduled_work_priority_tails[NUM_WORK_PRIORITIES];
} L = { 0 }, DEBUG_LVAR(L_work);
INTERNAL void worker_thread_entry_point(void *thread_data);
/* ========================== *
* Startup
* ========================== */
/* Create the worker pool: allocates the module arena and sync primitives and
 * spawns `num_worker_threads` dedicated worker threads. Must be called once
 * before any other work_* function. Panics when asked for zero workers. */
void work_startup(u32 num_worker_threads)
{
	struct temp_arena scratch = scratch_begin_no_conflict();
	if (num_worker_threads == 0) { /* u32 is unsigned: `<= 0` was only ever `== 0` */
		sys_panic(STR("Tried to start up worker pool with 0 threads"));
	}
	L.arena = arena_alloc(GIGABYTE(64));
	L.mutex = sys_mutex_alloc();
	L.semaphore = sys_semaphore_alloc(num_worker_threads);
	L.worker_count = num_worker_threads;
	L.idle_worker_count = num_worker_threads;
	/* Initialize threads */
	{
		sys_mutex_lock(&L.mutex);
		{
			struct worker *prev = NULL;
			for (u32 i = 0; i < num_worker_threads; ++i) {
				struct string thread_name = string_format(scratch.arena,
					STR("[P0] Worker %F"),
					FMT_UINT(i));
				struct worker *worker = arena_push_zero(&L.arena, struct worker);
				worker->thread = sys_thread_init(&worker_thread_entry_point, NULL, thread_name);
				/* Append to the worker list (head on first iteration) */
				if (prev) {
					prev->next = worker;
				} else {
					L.worker_head = worker;
				}
				prev = worker;
			}
		}
		sys_mutex_unlock(&L.mutex);
	}
	scratch_end(scratch);
}
/* Stop the pool: sets the shutdown flag, wakes every worker once via the
 * semaphore, then joins all worker threads.
 * BUGFIX: the old loop condition `(worker = worker->next)` advanced before
 * joining, so the head worker was never joined (and an empty list would have
 * dereferenced NULL). Iterate from the head instead. */
void work_shutdown(void)
{
	L.shutdown = true;
	WRITE_BARRIER(); /* Ensure the flag is visible before the wakeups land */
	sys_semaphore_signal(&L.semaphore, L.worker_count);
	for (struct worker *worker = L.worker_head; worker; worker = worker->next) {
		sys_thread_join(&worker->thread);
	}
}
/* ========================== *
* Internal work / task allocation
* ========================== */
INTERNAL struct work *work_alloc_assume_locked(void)
{
__prof;
sys_mutex_assert_locked(&L.mutex);
struct work *work = NULL;
/* Allocate work */
if (L.free_work_head) {
/* Reuse from free list */
work = L.free_work_head;
L.free_work_head = work->next_free;
*work = (struct work) {
.condition_variable_finished = work->condition_variable_finished,
.gen = work->gen
};
} else {
/* Make new */
work = arena_push(&L.arena, struct work);
*work = (struct work) {
.condition_variable_finished = sys_condition_variable_alloc(),
.gen = 1
};
}
return work;
}
/* Return `work` to the free list. The generation bump invalidates any
 * outstanding handles to this work. */
INTERNAL void work_release_assume_locked(struct work *work)
{
	sys_mutex_assert_locked(&L.mutex);
	++work->gen;
	work->next_free = L.free_work_head;
	L.free_work_head = work;
}
INTERNAL struct work_handle work_to_handle_assume_locked(struct work *work)
{
sys_mutex_assert_locked(&L.mutex);
return (struct work_handle) {
.work = work,
.gen = work->gen
};
}
/* Obtain a zeroed task struct, recycling from the free list when possible. */
INTERNAL struct work_task *task_alloc_assume_locked(void)
{
	sys_mutex_assert_locked(&L.mutex);
	struct work_task *task = L.free_task_head;
	if (task) {
		/* Pop a recycled task and wipe it clean */
		L.free_task_head = task->next_free;
		*task = (struct work_task) { 0 };
		return task;
	}
	/* Free list empty: allocate a fresh zeroed task from the arena */
	return arena_push_zero(&L.arena, struct work_task);
}
/* Return a finished task struct to the free list for later reuse. */
INTERNAL void task_release_assume_locked(struct work_task *task)
{
	sys_mutex_assert_locked(&L.mutex);
	struct work_task *old_head = L.free_task_head;
	task->next_free = old_head;
	L.free_task_head = task;
}
/* ========================== *
* Work scheduling / insertion
* ========================== */
/* Insert `work` into the priority-ordered scheduled list and wake workers.
 * Numerically lower priority values sort toward the head (i.e. are serviced
 * first); same-priority work is appended after its existing tail (FIFO).
 *
 * BUGFIX 1: the head test was `>=`, which prepended equal-priority work while
 * the unconditional tail update below still marked it as the priority tail;
 * a later lower-priority insert would then hook in ahead of higher-priority
 * work (priority inversion). Strict `>` routes equal priority through the
 * tail-append path, keeping list order and the tails array consistent.
 * BUGFIX 2: middle insertion never updated the back-link of the node after
 * `tail`, leaving a stale prev_scheduled that corrupted later unscheduling. */
INTERNAL void work_schedule_assume_locked(struct work *work)
{
	__prof;
	sys_mutex_assert_locked(&L.mutex);
	enum work_priority priority = work->priority;
	if (L.scheduled_work_head) {
		struct work *head = L.scheduled_work_head;
		if (head->priority > priority) {
			/* Head is strictly lower priority: work becomes the new head */
			L.scheduled_work_head = work;
			work->next_scheduled = head;
			head->prev_scheduled = work;
		} else {
			/* Find the tail of the nearest equal-or-higher priority */
			struct work *tail = NULL;
			for (i32 i = priority; i >= 0; --i) {
				tail = L.scheduled_work_priority_tails[i];
				if (tail) {
					break;
				}
			}
			/* Hook work after that tail, fixing both directions of the list */
			work->next_scheduled = tail->next_scheduled;
			work->prev_scheduled = tail;
			if (tail->next_scheduled) {
				tail->next_scheduled->prev_scheduled = work;
			}
			tail->next_scheduled = work;
		}
	} else {
		L.scheduled_work_head = work;
	}
	L.scheduled_work_priority_tails[priority] = work;
	WRITE_BARRIER();
	/* Wake at most one worker per task, never more than exist */
	sys_semaphore_signal(&L.semaphore, min_u32(work->tasks_incomplete, L.worker_count));
}
INTERNAL void work_unschedule_assume_locked(struct work *work)
{
__prof;
sys_mutex_assert_locked(&L.mutex);
struct work *prev = (struct work *)work->prev_scheduled;
struct work *next = (struct work *)work->next_scheduled;
/* Remove from priority tails array */
enum work_priority priority = work->priority;
struct work *priority_tail = L.scheduled_work_priority_tails[priority];
if (priority_tail == work && (!prev || prev->priority == priority)) {
L.scheduled_work_priority_tails[priority] = prev;
}
/* Unhook work */
if (prev) {
prev->next_scheduled = next;
}
if (next) {
next->prev_scheduled = prev;
}
if (work == L.scheduled_work_head) {
L.scheduled_work_head = next;
}
}
/* ========================== *
* Task dequeuing
* ========================== */
INTERNAL struct work_task *work_dequeue_task_assume_locked(struct work *work)
{
__prof;
sys_mutex_assert_locked(&L.mutex);
struct work_task *task = work->task_head;
if (task) {
work->task_head = task->next_in_work;
if (!work->task_head) {
/* Unschedule work if last task */
work_unschedule_assume_locked(work);
}
}
return task;
}
/* ========================== *
* Work doing
* ========================== */
/* NOTE: Executes at most one unstarted task of `work`, temporarily dropping
 * the mutex while the task function runs. Releases `work` once its last
 * incomplete task finishes. Returns `true` if unstarted tasks remain after
 * the dequeue (NOT "if work was finished & released" — the previous comment
 * had this inverted; see the return of `more_tasks` below). */
INTERNAL b32 work_exec_single_task_maybe_release_assume_locked(struct work *work)
{
	__prof;
	sys_mutex_assert_locked(&L.mutex);
	struct work_task *task = work_dequeue_task_assume_locked(work);
	/* Captured now, before unlocking: whether unstarted tasks remain */
	b32 more_tasks = work->task_head != NULL;
	if (task) {
		work->status = WORK_STATUS_IN_PROGRESS;
		++work->workers;
		/* Do task (temporarily unlock so other threads can make progress) */
		sys_mutex_unlock(&L.mutex);
		{
			task->func(task->data);
		}
		sys_mutex_lock(&L.mutex);
		--work->workers;
		--work->tasks_incomplete;
		task_release_assume_locked(task);
		if (work->tasks_incomplete == 0) {
			/* Signal finished */
			work->status = WORK_STATUS_DONE;
			sys_condition_variable_signal(&work->condition_variable_finished);
			/* Release: `work` must not be touched after this; outstanding
			 * handles are invalidated by the generation bump */
			work_release_assume_locked(work);
		}
	}
	return more_tasks;
}
/* Keep executing tasks of `work` on the calling thread until none remain
 * unstarted. May release `work` (via the single-task helper) once the last
 * incomplete task finishes. */
INTERNAL void work_exec_remaining_tasks_maybe_release_assume_locked(struct work *work)
{
	__prof;
	sys_mutex_assert_locked(&L.mutex);
	while (work_exec_single_task_maybe_release_assume_locked(work)) {
		/* The helper returns false when no unstarted tasks are left */
	}
}
/* ========================== *
* Work thread proc
* ========================== */
/* Entry point for dedicated worker threads: sleep on the pool semaphore,
 * then drain scheduled work until none is visible. Exits when L.shutdown is
 * set (work_shutdown signals the semaphore once per worker to wake them). */
INTERNAL void worker_thread_entry_point(void *thread_data)
{
	(UNUSED)thread_data;
	/* Mark this thread as a worker in its thread-local context (consulted by
	 * work_push_from_slate_assume_locked for deadlock avoidance) */
	struct worker_context *ctx = sys_thread_get_worker_context();
	*ctx = (struct worker_context) {
		.is_worker = true
	};
	while (true) {
		sys_semaphore_wait(&L.semaphore);
		if (L.shutdown) {
			/* Exit thread */
			break;
		}
		/* NOTE(review): L.scheduled_work_head is peeked here WITHOUT the
		 * mutex — presumably an intentional racy check (the locked re-read
		 * below is authoritative); confirm pointer loads are atomic on all
		 * supported targets. */
		while (L.scheduled_work_head) {
			/* Do work from top */
			sys_mutex_lock(&L.mutex);
			{
				struct work *work = L.scheduled_work_head;
				if (work) {
					__profscope(work_pool_task);
					/* This thread is busy for the duration of one task */
					--L.idle_worker_count;
					work_exec_single_task_maybe_release_assume_locked((struct work *)work);
					++L.idle_worker_count;
				}
			}
			sys_mutex_unlock(&L.mutex);
		}
	}
}
/* ========================== *
* Work pushing interface
* ========================== */
/* Submit the slate's tasks as one piece of work, schedule it, and return a
 * generation-checked handle. If `help` is true, then the calling thread will
 * start picking up tasks immediately (before other workers can see it). */
INTERNAL struct work_handle work_push_from_slate_assume_locked(struct work_slate *ws, b32 help, enum work_priority priority)
{
	__prof;
	sys_mutex_assert_locked(&L.mutex);
	struct work *work = work_alloc_assume_locked();
	/* Handle must be taken before scheduling: the work could complete and be
	 * released (bumping gen) as soon as other threads can see it */
	struct work_handle wh = work_to_handle_assume_locked(work);
	work->priority = priority;
	work->status = WORK_STATUS_IN_PROGRESS;
	work->task_head = ws->task_head;
	work->tasks_incomplete = ws->num_tasks;
	work_schedule_assume_locked(work);
	if (help) {
		work_exec_remaining_tasks_maybe_release_assume_locked(work);
	} else {
		/* When work is submitted from a worker thread, we want the worker to pick
		 * up the tasks itself when idle workers = 0 and work.workers = 0
		 * (work.workers will always = 0 when work is first pushed).
		 *
		 * This is not ideal, however it is necessary to prevent
		 * a scenario in which all workers are waiting on child work to complete in
		 * a subtle way (IE: outside of work_wait). Since all workers are waiting,
		 * there would be no remaining workers to complete the child work, meaning
		 * there is a deadlock.
		 *
		 * By forcing workers to do their own child work, we can guarantee that this
		 * does not occur. However it is not ideal since it creates situations in
		 * which work is not done asynchronously.
		 */
		struct worker_context *ctx = sys_thread_get_worker_context();
		if (ctx->is_worker) {
			b32 more_tasks = true;
			while (L.idle_worker_count == 0 && work->workers == 0 && more_tasks) {
				more_tasks = work_exec_single_task_maybe_release_assume_locked(work);
			}
		}
	}
	return wh;
}
/* Wrap a single (func, data) pair in a one-task slate and submit it. */
INTERNAL struct work_handle work_push_task_internal(work_task_func *func, void *data, b32 help, enum work_priority priority)
{
	struct work_handle handle;
	sys_mutex_lock(&L.mutex);
	{
		struct work_task *task = task_alloc_assume_locked();
		task->func = func;
		task->data = data;
		/* Build the slate inline: one task, head == tail */
		struct work_slate slate = { 0 };
		slate.task_head = task;
		slate.task_tail = task;
		slate.num_tasks = 1;
		handle = work_push_from_slate_assume_locked(&slate, help, priority);
	}
	sys_mutex_unlock(&L.mutex);
	return handle;
}
/* Push work that contains a single task; the caller does not help execute. */
struct work_handle work_push_task(work_task_func *func, void *data, enum work_priority priority)
{
	__prof;
	return work_push_task_internal(func, data, false, priority);
}
/* Push a single-task work and immediately execute it on the calling thread. */
struct work_handle work_push_task_and_help(work_task_func *func, void *data, enum work_priority priority)
{
	__prof;
	return work_push_task_internal(func, data, true, priority);
}
struct work_slate work_slate_begin(void)
{
__prof;
struct work_slate ws = { 0 };
return ws;
}
/* Append a task to a slate. The slate is caller-owned, so only the task
 * allocator needs the pool mutex; workers cannot see the task until the
 * slate is submitted via work_slate_end / work_slate_end_and_help. */
void work_slate_push_task(struct work_slate *ws, work_task_func *func, void *data)
{
	__prof;
	struct work_task *task;
	sys_mutex_lock(&L.mutex);
	task = task_alloc_assume_locked();
	sys_mutex_unlock(&L.mutex);
	task->func = func;
	task->data = data;
	/* Append to the slate's singly-linked task list */
	if (ws->task_tail == NULL) {
		ws->task_head = task;
	} else {
		ws->task_tail->next_in_work = task;
	}
	ws->task_tail = task;
	++ws->num_tasks;
}
/* Push work that contains multiple tasks (work slate) */
struct work_handle work_slate_end(struct work_slate *ws, enum work_priority priority)
{
__prof;
struct work_handle handle;
sys_mutex_lock(&L.mutex);
{
handle = work_push_from_slate_assume_locked(ws, false, priority);
}
sys_mutex_unlock(&L.mutex);
return handle;
}
/* Submit a slate and immediately start executing its tasks on this thread. */
struct work_handle work_slate_end_and_help(struct work_slate *ws, enum work_priority priority)
{
	__prof;
	struct work_handle handle;
	sys_mutex_lock(&L.mutex);
	{
		handle = work_push_from_slate_assume_locked(ws, true, priority);
	}
	sys_mutex_unlock(&L.mutex);
	return handle;
}
/* ========================== *
* Work intervention interface
* ========================== */
/* Resolve a handle back to its work, or NULL if the work has since been
 * released and recycled (detected by a generation mismatch). */
INTERNAL struct work *work_from_handle_assume_locked(struct work_handle handle)
{
	sys_mutex_assert_locked(&L.mutex);
	return (handle.work->gen == handle.gen) ? handle.work : NULL;
}
/* Wait for all tasks in work to be completed. Will also pick up any unstarted
 * tasks in the work since the caller will be idle while waiting anyway.
 * Safe to call with a stale handle: a generation mismatch makes it a no-op. */
void work_wait(struct work_handle handle)
{
	__prof;
	sys_mutex_lock(&L.mutex);
	{
		struct work *work = work_from_handle_assume_locked(handle);
		if (work) {
			/* Help with tasks */
			work_exec_remaining_tasks_maybe_release_assume_locked(work);
			/* Wait for work completion */
			work = work_from_handle_assume_locked(handle); /* Re-checking work is still valid here in case helping caused the work to be released */
			if (work) {
				while (work->status != WORK_STATUS_DONE) {
					/* Releases L.mutex while sleeping, reacquires on wake */
					sys_condition_variable_wait(&work->condition_variable_finished, &L.mutex);
				}
			}
		}
	}
	sys_mutex_unlock(&L.mutex);
}
/* Execute any unstarted tasks of the given work on the calling thread
 * without blocking for completion. No-op for stale handles. */
void work_help(struct work_handle handle)
{
	__prof;
	sys_mutex_lock(&L.mutex);
	{
		struct work *work = work_from_handle_assume_locked(handle);
		if (work) {
			work_exec_remaining_tasks_maybe_release_assume_locked(work);
		}
	}
	sys_mutex_unlock(&L.mutex);
}
/* ========================== *
* Worker context interface
* ========================== */
struct worker_context worker_context_alloc(void)
{
return (struct worker_context) { 0 };
}
/* No-op: contexts own nothing that needs freeing (kept for API symmetry). */
void worker_context_release(struct worker_context *ctx)
{
	(UNUSED)ctx;
}