#include "work.h"
#include "intrinsics.h"
#include "sys.h"
#include "arena.h"
#include "scratch.h"
#include "memory.h"
#include "string.h"
#include "log.h"
#include "thread_local.h"
#include "atomic.h"
#include "app.h"

/* Terminology:
 *
 * Task: Single unit of stuff to be done (a function with a data pointer)
 *
 * Work: A group of tasks (doesn't have to be homogeneous) bundled together.
 * Work is "complete" when all of its tasks are complete.
 *
 * Work Slate: A list of tasks used as a building-tool for constructing work.
 *
 * Worker: A thread that can do work. "work_startup" will create a certain
 * amount of dedicated worker threads. Note that non-worker threads can
 * also do work themselves (IE: callers of "work_wait")
 */

/* A dedicated worker thread. Allocated out of G.arena and kept for the
 * lifetime of the pool; the list is only traversed at shutdown. */
struct worker {
    struct sys_thread thread;
    struct worker *next;
};

struct work_task;

struct work {
    enum work_priority priority;
    enum work_status status;
    /* Number of threads currently executing a task belonging to this work */
    u32 workers;
    struct sys_condition_variable condition_variable_finished;
    /* Intrusive links for the global scheduled-work list (priority sorted:
     * numerically lower priority values sit nearer the head and are picked
     * up first — see work_schedule_locked) */
    struct work *prev_scheduled;
    struct work *next_scheduled;
    struct work *next_free;
    struct work_task *task_head; /* Unstarted task head */
    /* Unstarted + in-flight tasks; the work is done (and released) when
     * this reaches zero */
    u32 tasks_incomplete;
    /* Generation counter used to validate struct work_handle; bumped on
     * every release and every reuse so stale handles never match */
    u64 gen;
};

struct work_task {
    void *data;
    work_task_func *func;
    /* NOTE(review): never assigned by this file — presumably reserved or
     * vestigial; confirm before relying on it */
    struct work *work;
    struct work_task *next_in_work;
    struct work_task *next_free;
};

/* ==========================
 *
 * Global state
 *
 * ========================== */

GLOBAL struct {
    struct arena arena;
    b32 workers_shutdown;
    /* Single mutex guarding all pool state below; G.cv wakes idle workers */
    struct sys_mutex mutex;
    struct sys_condition_variable cv;
    u32 worker_count;
    u32 idle_worker_count;
    struct worker *worker_head;
    /* TODO: Make below pointers volatile? */
    struct work_task *free_task_head;
    struct work *free_work_head;
    struct work *scheduled_work_head;
    /* Pointers to the last piece of work of each priority in the scheduled
     * work list (used for O(1) insertion) */
    struct work *scheduled_work_priority_tails[NUM_WORK_PRIORITIES];
} G = ZI, DEBUG_ALIAS(G, G_work);

/* ==========================
 *
 * Thread local state
 *
 * ========================== */

struct worker_ctx {
    b32 is_worker;
};

GLOBAL THREAD_LOCAL_VAR_DEF(tl_worker_ctx, struct worker_ctx, NULL, NULL);

/* ==========================
 *
 * Startup
 *
 * ========================== */

INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(work_shutdown);
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(worker_thread_entry_point, thread_data);

/* Create the worker pool with `num_worker_threads` dedicated threads.
 * Panics when asked for zero workers. Registers work_shutdown as an app
 * exit callback so the pool is torn down automatically. */
struct work_startup_receipt work_startup(u32 num_worker_threads)
{
    struct temp_arena scratch = scratch_begin_no_conflict();
    /* num_worker_threads is unsigned, so "<= 0" could only ever mean "== 0" */
    if (num_worker_threads == 0) {
        sys_panic(LIT("Tried to start up worker pool with 0 threads"));
    }
    G.arena = arena_alloc(GIGABYTE(64));
    G.mutex = sys_mutex_alloc();
    G.cv = sys_condition_variable_alloc();
    G.worker_count = num_worker_threads;
    G.idle_worker_count = num_worker_threads;
    app_register_exit_callback(&work_shutdown);
    /* Initialize threads (under the lock so workers that start immediately
     * observe consistent pool state) */
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        struct worker *prev = NULL;
        for (u32 i = 0; i < num_worker_threads; ++i) {
            struct string thread_name =
                string_format(scratch.arena, LIT("[P6] Worker %F"), FMT_UINT(i));
            struct worker *worker = arena_push(&G.arena, struct worker);
            worker->thread =
                sys_thread_alloc(&worker_thread_entry_point, NULL, thread_name);
            if (prev) {
                prev->next = worker;
            } else {
                G.worker_head = worker;
            }
            prev = worker;
        }
    }
    sys_mutex_unlock(&lock);
    scratch_end(scratch);
    return (struct work_startup_receipt) { 0 };
}

/* App-exit callback: flag shutdown, wake every sleeping worker, then join
 * all worker threads. The worker list is immutable after startup so it can
 * be walked without the lock here. */
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(work_shutdown)
{
    __prof;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        G.workers_shutdown = true;
        sys_condition_variable_broadcast(&G.cv);
    }
    sys_mutex_unlock(&lock);
    for (struct worker *worker = G.worker_head; worker; worker = worker->next) {
        sys_thread_wait_release(&worker->thread);
    }
}

/* ==========================
 *
 * Internal work / task allocation
 *
 * ========================== */

/* Allocate a struct work, reusing the free list when possible. The finished
 * condition variable survives reuse; `gen` is bumped on reuse so handles to
 * the previous incarnation become invalid. Caller must hold G.mutex. */
INTERNAL struct work *work_alloc_locked(struct sys_lock *lock)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    struct work *work = NULL;
    /* Allocate work */
    if (G.free_work_head) {
        /* Reuse from free list */
        work = G.free_work_head;
        G.free_work_head = work->next_free;
        *work = (struct work) {
            .condition_variable_finished = work->condition_variable_finished,
            .gen = work->gen + 1
        };
    } else {
        /* Make new */
        work = arena_push_no_zero(&G.arena, struct work);
        *work = (struct work) {
            .condition_variable_finished = sys_condition_variable_alloc(),
            .gen = 1
        };
    }
    return work;
}

/* Return a struct work to the free list and invalidate outstanding handles
 * by bumping `gen`. Caller must hold G.mutex. */
INTERNAL void work_release_locked(struct sys_lock *lock, struct work *work)
{
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    work->next_free = G.free_work_head;
    G.free_work_head = work;
    ++work->gen;
}

/* Build a generation-stamped handle for `work`. Caller must hold G.mutex. */
INTERNAL struct work_handle work_to_handle_locked(struct sys_lock *lock, struct work *work)
{
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    return (struct work_handle) { .work = work, .gen = work->gen };
}

/* Allocate a zeroed task, reusing the free list when possible. Caller must
 * hold G.mutex. */
INTERNAL struct work_task *task_alloc_locked(struct sys_lock *lock)
{
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    struct work_task *task = NULL;
    /* Allocate task */
    if (G.free_task_head) {
        /* Reuse from free list */
        task = G.free_task_head;
        G.free_task_head = task->next_free;
        *task = (struct work_task) { 0 };
    } else {
        /* Make new */
        task = arena_push(&G.arena, struct work_task);
    }
    return task;
}

/* Return a task to the free list. Caller must hold G.mutex. */
INTERNAL void task_release_locked(struct sys_lock *lock, struct work_task *task)
{
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    task->next_free = G.free_task_head;
    G.free_task_head = task;
}

/* ==========================
 *
 * Work scheduling / insertion
 *
 * ========================== */

/* Insert `work` into the global scheduled list, keeping the list sorted so
 * numerically lower priority values are nearer the head and equal-priority
 * work stays FIFO. Signals the worker condition variable once per task.
 * Caller must hold G.mutex. */
INTERNAL void work_schedule_locked(struct sys_lock *lock, struct work *work)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    enum work_priority priority = work->priority;
    if (G.scheduled_work_head) {
        struct work *head = G.scheduled_work_head;
        /* BUGFIX: this comparison must be strict. With ">=", equal-priority
         * work was inserted as the new head while the priority tail below
         * was unconditionally repointed at it, orphaning the real group
         * tail and letting later lower-priority work be linked ahead of
         * equal-priority work (priority inversion). Equal priority now
         * falls through to tail insertion, which is also FIFO. */
        if (head->priority > priority) {
            /* Head is lower priority, insert work as new head */
            G.scheduled_work_head = work;
            work->next_scheduled = head;
            head->prev_scheduled = work;
        } else {
            /* Find the tail of the nearest group with this priority or
             * higher (lower index) and insert after it */
            struct work *tail = NULL;
            for (i32 i = priority; i >= 0; --i) {
                tail = G.scheduled_work_priority_tails[i];
                if (tail) {
                    break;
                }
            }
            /* Hook work */
            work->next_scheduled = tail->next_scheduled;
            work->prev_scheduled = tail;
            /* BUGFIX: the successor's back-pointer must be patched too;
             * leaving it pointing at `tail` made a later unschedule of the
             * successor relink through the stale prev and drop nodes */
            if (tail->next_scheduled) {
                tail->next_scheduled->prev_scheduled = work;
            }
            tail->next_scheduled = work;
        }
    } else {
        G.scheduled_work_head = work;
    }
    G.scheduled_work_priority_tails[priority] = work;
    sys_condition_variable_signal(&G.cv, work->tasks_incomplete);
}

/* Remove `work` from the scheduled list and fix up the per-priority tail
 * pointer. Caller must hold G.mutex. */
INTERNAL void work_unschedule_locked(struct sys_lock *lock, struct work *work)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    struct work *prev = work->prev_scheduled;
    struct work *next = work->next_scheduled;
    /* Remove from priority tails array.
     * BUGFIX: the old condition left the tail pointing at the removed work
     * whenever `prev` belonged to a different priority group, so a later
     * schedule of this priority would link after freed/recycled work. If
     * the removed work was the tail, the new tail is `prev` when it is in
     * the same group, otherwise the group is now empty. */
    enum work_priority priority = work->priority;
    if (G.scheduled_work_priority_tails[priority] == work) {
        G.scheduled_work_priority_tails[priority] =
            (prev && prev->priority == priority) ? prev : NULL;
    }
    /* Unhook work */
    if (prev) {
        prev->next_scheduled = next;
    }
    if (next) {
        next->prev_scheduled = prev;
    }
    if (work == G.scheduled_work_head) {
        G.scheduled_work_head = next;
    }
    /* Hygiene: don't leave dangling list links on unscheduled work */
    work->prev_scheduled = NULL;
    work->next_scheduled = NULL;
}

/* ==========================
 *
 * Task dequeuing
 *
 * ========================== */

/* Pop the next unstarted task from `work`, unscheduling the work when that
 * was its last unstarted task. Returns NULL when no task remains. Caller
 * must hold G.mutex. */
INTERNAL struct work_task *work_dequeue_task_locked(struct sys_lock *lock, struct work *work)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    struct work_task *task = work->task_head;
    if (task) {
        work->task_head = task->next_in_work;
        if (!work->task_head) {
            /* Unschedule work if last task */
            work_unschedule_locked(lock, work);
        }
    }
    return task;
}

/* ==========================
 *
 * Work doing
 *
 * ========================== */

/* NOTE: This function will release `work` if there are no more tasks once
 * completed.
 * Returns `true` if more tasks are still present in the work after
 * completion. The mutex is dropped while the task function runs and
 * re-acquired afterwards; `work` cannot be released by another thread in
 * that window because our in-flight task keeps tasks_incomplete nonzero. */
INTERNAL b32 work_exec_single_task_maybe_release_locked(struct sys_lock *lock, struct work *work)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    struct work_task *task = work_dequeue_task_locked(lock, work);
    /* Snapshot taken before unlocking; may be stale by the time we return */
    b32 more_tasks = work->task_head != NULL;
    if (task) {
        work->status = WORK_STATUS_IN_PROGRESS;
        ++work->workers;
        /* Do task (temporarily unlock) */
        {
            sys_mutex_unlock(lock);
            task->func(task->data);
            *lock = sys_mutex_lock_e(&G.mutex);
        }
        --work->workers;
        --work->tasks_incomplete;
        task_release_locked(lock, task);
        if (work->tasks_incomplete == 0) {
            /* Signal finished */
            work->status = WORK_STATUS_DONE;
            sys_condition_variable_broadcast(&work->condition_variable_finished);
            /* Release */
            work_release_locked(lock, work);
        }
    }
    return more_tasks;
}

/* Keep executing tasks of `work` until none remain; may release `work`
 * (see above). Caller must hold G.mutex. */
INTERNAL void work_exec_remaining_tasks_maybe_release_locked(struct sys_lock *lock, struct work *work)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    b32 more_tasks = true;
    while (more_tasks) {
        more_tasks = work_exec_single_task_maybe_release_locked(lock, work);
    }
}

/* ==========================
 *
 * Work thread proc
 *
 * ========================== */

/* Worker main loop: take a task from the head work when one is scheduled,
 * otherwise sleep on G.cv until signalled (new work or shutdown). */
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(worker_thread_entry_point, thread_data)
{
    (UNUSED)thread_data;
    struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx);
    *ctx = (struct worker_ctx) { .is_worker = true };
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        while (!G.workers_shutdown) {
            struct work *work = G.scheduled_work_head;
            if (work) {
                __profscope(work_pool_task);
                --G.idle_worker_count;
                work_exec_single_task_maybe_release_locked(&lock, work);
                ++G.idle_worker_count;
            } else {
                sys_condition_variable_wait(&G.cv, &lock);
            }
        }
    }
    sys_mutex_unlock(&lock);
}

/* ==========================
 *
 * Work pushing interface
 *
 * ========================== */

/* If `help` is true, then the calling thread will start picking up tasks
 * immediately (before other workers can see it) */
INTERNAL struct work_handle work_push_from_slate_locked(struct sys_lock *lock, struct work_slate *ws, b32 help, enum work_priority priority)
{
    __prof;
    sys_assert_locked_e(lock, &G.mutex);
    struct work *work = work_alloc_locked(lock);
    struct work_handle wh = work_to_handle_locked(lock, work);
    work->priority = priority;
    work->status = WORK_STATUS_IN_PROGRESS;
    work->task_head = ws->task_head;
    work->tasks_incomplete = ws->num_tasks;
    work_schedule_locked(lock, work);
    if (help) {
        work_exec_remaining_tasks_maybe_release_locked(lock, work);
    } else {
        /* When work is submitted from a worker thread, we want the worker to pick
         * up the tasks itself when idle workers = 0 and work.workers = 0
         * (work.workers will always = 0 when work is first pushed).
         *
         * This is not ideal, however it is necessary to prevent
         * a scenario in which all workers are waiting on child work to complete in
         * a subtle way (IE: outside of work_wait). Since all workers are waiting,
         * there would be no remaining workers to complete the child work, meaning
         * there is a deadlock.
         *
         * By forcing workers to do their own child work, we can guarantee that this
         * does not occur. However it is not ideal since it creates situations in
         * which work is not done asynchronously.
         */
        struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx);
        if (ctx->is_worker) {
            b32 work_done = false;
            while (!work_done && G.idle_worker_count == 0 && work->workers == 0) {
                work_done = !work_exec_single_task_maybe_release_locked(lock, work);
            }
        }
    }
    return wh;
}

/* Wrap a single (func, data) pair in a one-task slate and push it. */
INTERNAL struct work_handle work_push_task_internal(work_task_func *func, void *data, b32 help, enum work_priority priority)
{
    struct work_handle handle;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        struct work_task *task = task_alloc_locked(&lock);
        task->data = data;
        task->func = func;
        struct work_slate ws = {
            .task_head = task,
            .task_tail = task,
            .num_tasks = 1
        };
        handle = work_push_from_slate_locked(&lock, &ws, help, priority);
    }
    sys_mutex_unlock(&lock);
    return handle;
}

/* Push work that contains a single task */
struct work_handle work_push_task(work_task_func *func, void *data, enum work_priority priority)
{
    __prof;
    struct work_handle handle = work_push_task_internal(func, data, false, priority);
    return handle;
}

/* Push work that contains a single task, then immediately help execute it
 * on the calling thread */
struct work_handle work_push_task_and_help(work_task_func *func, void *data, enum work_priority priority)
{
    __prof;
    struct work_handle handle = work_push_task_internal(func, data, true, priority);
    return handle;
}

/* Begin accumulating tasks into a slate (finish with work_slate_end[_and_help]) */
struct work_slate work_slate_begin(void)
{
    __prof;
    struct work_slate ws = ZI;
    return ws;
}

/* Append a task to a slate. Tasks are not visible to workers until the
 * slate is ended. */
void work_slate_push_task(struct work_slate *ws, work_task_func *func, void *data)
{
    __prof;
    struct work_task *task = NULL;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        task = task_alloc_locked(&lock);
    }
    sys_mutex_unlock(&lock);
    task->data = data;
    task->func = func;
    if (ws->task_tail) {
        ws->task_tail->next_in_work = task;
    } else {
        ws->task_head = task;
    }
    ws->task_tail = task;
    ++ws->num_tasks;
}

/* Push work that contains multiple tasks (work slate) */
struct work_handle work_slate_end(struct work_slate *ws, enum work_priority priority)
{
    __prof;
    struct work_handle handle;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        handle = work_push_from_slate_locked(&lock, ws, false, priority);
    }
    sys_mutex_unlock(&lock);
    return handle;
}

/* Push slate work and immediately help execute it on the calling thread */
struct work_handle work_slate_end_and_help(struct work_slate *ws, enum work_priority priority)
{
    __prof;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    struct work_handle handle = work_push_from_slate_locked(&lock, ws, true, priority);
    sys_mutex_unlock(&lock);
    return handle;
}

/* ==========================
 *
 * Work intervention interface
 *
 * ========================== */

/* Resolve a handle back to its work, or NULL if the work has since been
 * released/recycled (generation mismatch). Caller must hold G.mutex. */
INTERNAL struct work *work_from_handle_locked(struct sys_lock *lock, struct work_handle handle)
{
    sys_assert_locked_e(lock, &G.mutex);
    (UNUSED)lock;
    struct work *work = handle.work;
    if (work->gen != handle.gen) {
        work = NULL;
    }
    return work;
}

/* Wait for all tasks in work to be completed. Will also pick up any unstarted
 * tasks in the work since the caller will be idle while waiting anyway. */
void work_wait(struct work_handle handle)
{
    __prof;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        struct work *work = work_from_handle_locked(&lock, handle);
        if (work) {
            /* Help with tasks */
            work_exec_remaining_tasks_maybe_release_locked(&lock, work);
            /* Wait for work completion */
            work = work_from_handle_locked(&lock, handle);
            /* Re-checking work is still valid here in case work_exec caused
             * work to release */
            if (work) {
                /* BUGFIX: re-check the generation inside the predicate too.
                 * The work is released (gen bumped) in the same critical
                 * section as the finished broadcast, so it can be recycled
                 * for brand-new IN_PROGRESS work before this waiter wakes;
                 * checking status alone would then block on unrelated work.
                 * A gen mismatch means our work completed — stop waiting. */
                while (work->gen == handle.gen && work->status != WORK_STATUS_DONE) {
                    sys_condition_variable_wait(&work->condition_variable_finished, &lock);
                }
            }
        }
    }
    sys_mutex_unlock(&lock);
}

/* Try to pick up any scheduled tasks */
void work_help(struct work_handle handle)
{
    __prof;
    struct sys_lock lock = sys_mutex_lock_e(&G.mutex);
    {
        struct work *work = work_from_handle_locked(&lock, handle);
        if (work) {
            work_exec_remaining_tasks_maybe_release_locked(&lock, work);
        }
    }
    sys_mutex_unlock(&lock);
}