/* Job system: a fixed pool of worker threads servicing one global job queue
 * plus one "pinned" queue per worker. A job is a (func, sig, count) triple;
 * each of its `count` invocations receives a distinct id. All queue and job
 * bookkeeping is guarded by a single global spinlock (atomic_lock), and
 * completion is published through a per-job generation counter paired with a
 * condition variable so waiters can sleep instead of spinning. */

#include "job.h"
#include "sys.h"
#include "arena.h"
#include "atomic.h"
#include "string.h"
#include "scratch.h"
#include "app.h"
#include "intrinsics.h"

#if 0 /* FIXME: Remove this (replace with sys_ wrappers) */
#include
#endif

/* One dispatched job. Allocated from G.arena and recycled through the
 * G.first_free_job free list; gen_cv_mutex/gen_cv are deliberately carried
 * over across recycling so threads already blocked on them always hold valid
 * objects. All fields except `gen` are protected by the global spinlock. */
struct worker_job {
    struct worker_job_queue *queue;        /* Queue this job is linked into; NULL when pinned to the dispatching caller */
    i32 num_workers;                       /* Threads currently executing this job */
    i32 num_dispatched;                    /* Next invocation id to hand out */
    i32 pinned_worker_id;                  /* Worker id the job is pinned to, or -1 for any worker */
    i32 count;                             /* Total number of invocations (ids 0..count-1) */
    job_func *func;
    void *sig;
    struct atomic_u64 gen;                 /* Generation stamp; bumped on release so waiters observe completion */
    struct sys_mutex *gen_cv_mutex;
    struct sys_condition_variable *gen_cv;
    struct worker_job *prev;
    struct worker_job *next;
    struct worker_job *next_free;
};

/* Doubly-linked job queue; workers scan from `first`. */
struct worker_job_queue {
    struct worker_job *first;
    struct worker_job *last;
};

/* ==========================
 *
 * Global state
 *
 * ========================== */

struct worker_info {
    i32 id;
};

GLOBAL struct {
    struct arena *arena;                                /* Backing storage for worker_job allocations */
    struct atomic_i32 atomic_lock;                      /* Global spinlock guarding queues + job counters */
    struct worker_job *first_free_job;                  /* Free list of recycled jobs */
    struct worker_job_queue global_queue;               /* Unpinned jobs, runnable by any worker */
    struct worker_job_queue pinned_queues[JOB_MAX_WORKERS]; /* One queue per worker for pinned jobs */
    u64 workers_wake_gen;                               /* Bumped whenever workers should re-scan the queues */
    struct sys_condition_variable *workers_wake_cv;
    struct sys_mutex *workers_wake_mutex;
    struct atomic_i32 num_idle_worker_threads;
    i32 num_worker_threads;
    b32 workers_shutdown;                               /* Set once at exit; workers terminate after draining */
    struct sys_thread *worker_threads[JOB_MAX_WORKERS];
} G = ZI, DEBUG_ALIAS(G, G_job);

/* ==========================
 *
 * Startup
 *
 * ========================== */

INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg);
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(job_shutdown);

/* Spin up `num_workers` worker threads (named from `worker_names`, which must
 * hold at least `num_workers` entries) and register the shutdown callback. */
void job_startup(i32 num_workers, struct string *worker_names)
{
    __prof;
    struct arena_temp scratch = scratch_begin_no_conflict();
    G.arena = arena_alloc(GIGABYTE(64));
    G.workers_wake_cv = sys_condition_variable_alloc();
    G.workers_wake_mutex = sys_mutex_alloc();
    if (num_workers < JOB_MIN_WORKERS || num_workers > JOB_MAX_WORKERS) {
        /* Invalid worker count */
        ASSERT(false);
    }
    G.num_worker_threads = num_workers;
    for (i32 i = 0; i < G.num_worker_threads; ++i) {
        struct string name = worker_names[i];
        /* The worker id is smuggled through the thread argument as an integer. */
        G.worker_threads[i] = sys_thread_alloc(worker_thread_entry_point, (void *)(i64)i, name);
    }
    atomic_i32_eval_exchange(&G.num_idle_worker_threads, num_workers);
    app_register_exit_callback(job_shutdown);
    scratch_end(scratch);
}

/* Exit callback: raise the shutdown flag, wake every worker, then join and
 * release all worker threads. */
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(job_shutdown)
{
    __prof;
    {
        struct sys_lock lock = sys_mutex_lock_e(G.workers_wake_mutex);
        G.workers_shutdown = true;
        /* Bump the wake generation so no worker re-enters the cv wait after
         * observing a stale generation. */
        ++G.workers_wake_gen;
        sys_condition_variable_broadcast(G.workers_wake_cv);
        sys_mutex_unlock(&lock);
    }
    for (i32 i = 0; i < G.num_worker_threads; ++i) {
        struct sys_thread *thread = G.worker_threads[i];
        sys_thread_wait_release(thread);
    }
}

/* ==========================
 *
 * Atomic lock
 *
 * ========================== */

/* Acquire the global spinlock (busy-wait with a CPU pause hint). */
INTERNAL void atomic_lock(void)
{
    while (atomic_i32_eval_compare_exchange(&G.atomic_lock, 0, 1) != 0) {
        ix_pause();
    }
}

/* Release the global spinlock. */
INTERNAL void atomic_unlock(void)
{
    atomic_i32_eval_exchange(&G.atomic_lock, 0);
}

/* ==========================
 *
 * Worker TLS
 *
 * ========================== */

struct worker_ctx {
    i32 worker_id; /* Will be -1 if thread is not a worker */
    i32 pin_depth; /* > 0 while this thread is inside a job pinned to it */
};

INTERNAL THREAD_LOCAL_VAR_ALLOC_FUNC_DEF(worker_ctx_init, vctx)
{
    struct worker_ctx *ctx = vctx;
    /* Default to "not a worker"; worker threads overwrite this on entry. */
    ctx->worker_id = -1;
}

GLOBAL THREAD_LOCAL_VAR_DEF(tl_worker_ctx, struct worker_ctx, worker_ctx_init, NULL);

/* ==========================
 *
 * Job
 *
 * ========================== */

struct job_desc {
    job_func *func;
    void *sig;
    i32 count;            /* Number of invocations */
    b32 wait;             /* Block until the whole job completes */
    i32 pinned_worker_id; /* Worker id to pin to, or -1 for any worker */
};

/* Core dispatch: allocate (or recycle) a job, queue it, optionally execute
 * part of it inline on the calling thread, and optionally block until it is
 * fully complete. Returns a handle (job pointer + generation) usable with
 * job_wait(); the handle stays zeroed when the job ran entirely inline. */
INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc)
{
    struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx);
    i32 worker_id = ctx->worker_id;
    job_func *job_func = desc.func;
    void *sig = desc.sig;
    i32 job_count = desc.count;
    b32 wait = desc.wait;
    i32 pinned_worker_id = desc.pinned_worker_id;
    if (pinned_worker_id >= G.num_worker_threads) {
        struct arena_temp scratch = scratch_begin_no_conflict();
        sys_panic(string_format(scratch.arena,
                                LIT("Tried to dispatch pinned job to worker with id %F when there are only %F worker threads"),
                                FMT_SINT(pinned_worker_id),
                                FMT_SINT(G.num_worker_threads)));
        scratch_end(scratch);
    }
    b32 is_pinned = pinned_worker_id >= 0;
    b32 is_pinned_to_caller = is_pinned && pinned_worker_id == worker_id;
    /* NOTE: If an unpinned worker dispatches an asynchronous job, then to
     * avoid deadlocks we inline the work execution until at least one other
     * worker is assigned. */
    b32 do_work = worker_id >= 0 && (wait || ctx->pin_depth <= 0) && pinned_worker_id < 0;
    if (is_pinned_to_caller) {
        /* Pinned to the dispatching worker itself: nobody else may run it,
         * so execute inline and treat the dispatch as synchronous. */
        do_work = true;
        wait = true;
    }
    struct job_handle handle = ZI;
    if (do_work && job_count == 1) {
        /* Fast path: a single inline invocation needs no job object, no
         * queueing and no signalling. */
        __profscope(Execute single job);
        struct job_data data = ZI;
        data.sig = sig;
        job_func(data);
    } else if (job_count >= 1) {
        __profscope(Allocate job);
        /* Allocate job */
        u64 gen = 0;
        struct worker_job *job = NULL;
        {
            struct sys_mutex *old_cv_mutex = NULL;
            struct sys_condition_variable *old_cv = NULL;
            {
                atomic_lock();
                if (G.first_free_job) {
                    /* Recycle: keep the old cv/mutex (stale waiters may still
                     * be blocked on them) and continue the generation
                     * sequence so old handles never match again. */
                    job = G.first_free_job;
                    G.first_free_job = job->next_free;
                    old_cv_mutex = job->gen_cv_mutex;
                    old_cv = job->gen_cv;
                    gen = atomic_u64_eval(&job->gen) + 1;
                } else {
                    job = arena_push_no_zero(G.arena, struct worker_job);
                    gen = 1;
                }
                atomic_unlock();
            }
            /* gen = 0 marks the job as "being initialized" while the struct
             * is wiped and refilled below; the final exchange publishes it. */
            atomic_u64_eval_exchange(&job->gen, 0);
            MEMZERO_STRUCT(job);
            if (old_cv_mutex) {
                job->gen_cv_mutex = old_cv_mutex;
                job->gen_cv = old_cv;
            } else {
                job->gen_cv_mutex = sys_mutex_alloc();
                job->gen_cv = sys_condition_variable_alloc();
            }
            job->pinned_worker_id = pinned_worker_id;
            job->count = job_count;
            job->func = job_func;
            job->sig = sig;
            atomic_u64_eval_exchange(&job->gen, gen);
            {
                /* Signal mutex change: wake any stale waiters of the recycled
                 * job so they re-check the (now different) generation. */
                if (old_cv_mutex) {
                    struct sys_lock lock = sys_mutex_lock_e(job->gen_cv_mutex);
                    sys_condition_variable_broadcast(job->gen_cv);
                    sys_mutex_unlock(&lock);
                }
            }
        }
        handle.job = job;
        handle.gen = gen;
        /* Reserve first job id for ourselves if necessary */
        i32 job_id = 0;
        if (do_work) {
            job->num_dispatched = 1;
            job->num_workers = 1;
        }
        struct worker_job_queue *job_queue = NULL;
        if (!is_pinned_to_caller) {
            job_queue = is_pinned ? &G.pinned_queues[pinned_worker_id] : &G.global_queue;
        }
        /* NOTE(review): job->queue (and the dequeue below) stays NULL when
         * the job is pinned to the caller; that path appears reachable with
         * job_count > 1 only via a direct job_desc, which the public
         * wrappers never produce -- confirm. */
        job->queue = job_queue;
        /* Queue job */
        if (job_queue) {
            __profscope(Queue job);
            atomic_lock();
            {
                /* Push to queue */
                {
                    if (job_queue->last) {
                        job_queue->last->next = job;
                    } else {
                        job_queue->first = job;
                    }
                    job->prev = job_queue->last;
                    job_queue->last = job;
                }
                /* Signal workers */
                struct sys_lock wake_lock = sys_mutex_lock_e(G.workers_wake_mutex);
                {
                    ++G.workers_wake_gen;
                    /* One wake per invocation not taken by the caller; pinned
                     * jobs wake every worker so the target one notices. */
                    i32 num_signals = job_count - !!do_work;
                    if (is_pinned) {
                        num_signals = G.num_worker_threads;
                    }
                    sys_condition_variable_signal(G.workers_wake_cv, num_signals);
                }
                sys_mutex_unlock(&wake_lock);
            }
            atomic_unlock();
        }
        /* Execute job */
        b32 is_done = false;
        if (do_work && job_id < job_count) {
            __profscope(Execute job);
            ctx->pin_depth += is_pinned_to_caller;
            struct job_data data = ZI;
            data.sig = sig;
            b32 stop = false;
            while (!stop) {
                /* Run */
                {
                    __profscope(Run job);
                    data.id = job_id;
                    job_func(data);
                }
                /* Grab new ID or exit job */
                {
                    atomic_lock();
                    {
                        /* Keep taking ids while any remain; an asynchronous
                         * caller stops helping as soon as another worker has
                         * joined (num_workers > 1). */
                        if (job->num_dispatched < job_count && (wait || job->num_workers <= 1)) {
                            job_id = job->num_dispatched++;
                        } else {
                            stop = true;
                            i32 num_workers = --job->num_workers;
                            if (num_workers == 0) {
                                /* Last executor out releases the job. */
                                __profscope(Release job);
                                is_done = true;
                                /* Dequeue */
                                {
                                    struct worker_job *prev = job->prev;
                                    struct worker_job *next = job->next;
                                    if (prev) {
                                        prev->next = next;
                                    } else {
                                        job_queue->first = next;
                                    }
                                    if (next) {
                                        next->prev = prev;
                                    } else {
                                        job_queue->last = prev;
                                    }
                                }
                                /* Add to free list */
                                {
                                    job->next_free = G.first_free_job;
                                    G.first_free_job = job;
                                }
                                /* Signal waiters */
                                atomic_u64_eval_add_u64(&job->gen, 1);
                                {
                                    struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex);
                                    sys_condition_variable_broadcast(job->gen_cv);
                                    sys_mutex_unlock(&cv_lock);
                                }
                            }
                        }
                    }
                    atomic_unlock();
                }
            }
            ctx->pin_depth -= is_pinned_to_caller;
        }
        /* Wait for job completion */
        if (wait && !is_done) {
            __profscope(Wait for job);
            /* Standard predicate loop: completion is "gen moved past the
             * generation recorded in the handle". */
            struct sys_lock cv_lock = sys_mutex_lock_s(job->gen_cv_mutex);
            is_done = atomic_u64_eval(&job->gen) != handle.gen;
            while (!is_done) {
                sys_condition_variable_wait(job->gen_cv, &cv_lock);
                is_done = atomic_u64_eval(&job->gen) != handle.gen;
            }
            sys_mutex_unlock(&cv_lock);
        }
    }
    return handle;
}

/* Dispatch `count` invocations without blocking; returns a handle for
 * job_wait(). */
struct job_handle job_dispatch_async(i32 count, job_func *job_func, void *sig)
{
    __prof;
    struct job_desc desc = ZI;
    desc.func = job_func;
    desc.sig = sig;
    desc.count = count;
    desc.wait = false;
    desc.pinned_worker_id = -1;
    return job_dispatch_ex(desc);
}

/* Dispatch `count` invocations and block until all of them have completed. */
void job_dispatch_wait(i32 count, job_func *job_func, void *sig)
{
    __prof;
    struct job_desc desc = ZI;
    desc.func = job_func;
    desc.sig = sig;
    desc.count = count;
    desc.wait = true;
    desc.pinned_worker_id = -1;
    job_dispatch_ex(desc);
}

/* Dispatch a single invocation that must run on the worker `worker_id`;
 * does not block (unless the caller IS that worker, in which case
 * job_dispatch_ex runs it inline). */
void job_dispatch_pinned(i32 worker_id, job_func *job_func, void *sig)
{
    __prof;
    struct job_desc desc = ZI;
    desc.func = job_func;
    desc.sig = sig;
    desc.count = 1;
    desc.wait = false;
    desc.pinned_worker_id = worker_id;
    job_dispatch_ex(desc);
}

/* Block until the job identified by `handle` has completed. A zero handle is
 * a no-op. If the caller is itself an eligible worker it helps execute the
 * remaining invocations instead of only sleeping. */
void job_wait(struct job_handle handle)
{
    struct worker_job *job = handle.job;
    if (job && handle.gen) {
        /* A generation different from the handle's means the job already
         * completed (and was possibly recycled). */
        b32 is_done = atomic_u64_eval(&job->gen) != handle.gen;
        if (!is_done) {
            struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx);
            i32 worker_id = ctx->worker_id;
            i32 job_pinned_worker = job->pinned_worker_id;
            if (worker_id >= 0 && (job_pinned_worker < 0 || job_pinned_worker == worker_id)) {
                /* We are a worker allowed to run this job: try to claim an id. */
                i32 job_id = 0;
                i32 job_count = 0;
                struct worker_job_queue *job_queue = NULL;
                {
                    atomic_lock();
                    job_id = job->num_dispatched++;
                    job_count = job->count;
                    job_queue = job->queue;
                    if (job_id < job_count) {
                        ++job->num_workers;
                    }
                    atomic_unlock();
                }
                /* Execute job */
                if (job_id < job_count) {
                    __profscope(Execute job);
                    struct job_data data = ZI;
                    data.sig = job->sig;
                    job_func *func = job->func;
                    while (job_id < job_count) {
                        /* Run */
                        {
                            data.id = job_id;
                            func(data);
                        }
                        /* Grab new ID or exit job */
                        {
                            atomic_lock();
                            {
                                job_id = job->num_dispatched++;
                                if (job_id >= job_count) {
                                    /* NOTE(review): unlike the release sites
                                     * in job_dispatch_ex and the worker loop,
                                     * this path neither decrements nor checks
                                     * num_workers before releasing -- confirm
                                     * other executors cannot still be running
                                     * this job here. */
                                    __profscope(Release job);
                                    is_done = true;
                                    /* Dequeue */
                                    {
                                        struct worker_job *prev = job->prev;
                                        struct worker_job *next = job->next;
                                        if (prev) {
                                            prev->next = next;
                                        } else {
                                            job_queue->first = next;
                                        }
                                        if (next) {
                                            next->prev = prev;
                                        } else {
                                            job_queue->last = prev;
                                        }
                                    }
                                    /* Add to free list */
                                    {
                                        job->next_free = G.first_free_job;
                                        G.first_free_job = job;
                                    }
                                    /* Signal waiters */
                                    atomic_u64_eval_add_u64(&job->gen, 1);
                                    {
                                        struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex);
                                        sys_condition_variable_broadcast(job->gen_cv);
                                        sys_mutex_unlock(&cv_lock);
                                    }
                                }
                            }
                            atomic_unlock();
                        }
                    }
                }
            }
            /* Wait for job completion */
            if (!is_done) {
                __profscope(Wait for job);
                /* Predicate loop on the generation counter, as in
                 * job_dispatch_ex. */
                struct sys_lock cv_lock = sys_mutex_lock_s(job->gen_cv_mutex);
                is_done = atomic_u64_eval(&job->gen) != handle.gen;
                while (!is_done) {
                    sys_condition_variable_wait(job->gen_cv, &cv_lock);
                    is_done = atomic_u64_eval(&job->gen) != handle.gen;
                }
                sys_mutex_unlock(&cv_lock);
            }
        }
    }
}

/* ==========================
 *
 * Worker
 *
 * ========================== */

/* Worker thread main loop: scan this worker's pinned queue first, then the
 * global queue; execute any job found until its ids are exhausted; otherwise
 * sleep on the wake cv until the wake generation changes or shutdown. */
INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg)
{
    i32 worker_id = (i32)(i64)thread_arg;
    struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx);
    ctx->worker_id = worker_id;
    /* Pinned queue takes priority over the global queue. */
    struct worker_job_queue *queues[] = { &G.pinned_queues[worker_id], &G.global_queue };
    u64 last_wake_gen = 0;
    /* The wake mutex is held at the top of every loop iteration; the wake
     * generation is snapshotted there so signals sent while we scan/execute
     * are not lost when we go back to sleep. */
    struct sys_lock wake_lock = sys_mutex_lock_s(G.workers_wake_mutex);
    while (!G.workers_shutdown) {
        last_wake_gen = G.workers_wake_gen;
        sys_mutex_unlock(&wake_lock);
        /* Try to pick job from queue */
        i32 job_id = 0;
        i32 job_count = 0;
        b32 job_is_pinned_to_worker = false;
        struct worker_job_queue *job_queue = NULL;
        struct worker_job *job = NULL;
        {
            __profscope(Pick job);
            atomic_lock();
            {
                for (i32 queue_index = 0; queue_index < (i32)ARRAY_COUNT(queues); ++queue_index) {
                    struct worker_job_queue *queue = queues[queue_index];
                    struct worker_job *tmp = queue->first;
                    /* Walk until a job with unclaimed ids is found (jobs with
                     * all ids dispatched may still be linked while their last
                     * executors finish). */
                    while (!job && tmp) {
                        i32 tmp_id = tmp->num_dispatched;
                        i32 tmp_count = tmp->count;
                        if (tmp_id < tmp_count) {
                            /* Pick job */
                            ++tmp->num_workers;
                            ++tmp->num_dispatched;
                            job = tmp;
                            job_id = tmp_id;
                            job_count = tmp_count;
                            job_is_pinned_to_worker = tmp->pinned_worker_id == worker_id;
                            job_queue = tmp->queue;
                        }
                        tmp = tmp->next;
                    }
                }
            }
            atomic_unlock();
        }
        /* Execute job */
        if (job) {
            __profscope(Execute job);
            ctx->pin_depth += job_is_pinned_to_worker;
            atomic_i32_eval_add(&G.num_idle_worker_threads, -1);
            struct job_data data = ZI;
            data.sig = job->sig;
            job_func *func = job->func;
            b32 stop = job_id >= job_count;
            while (!stop) {
                /* Run */
                {
                    data.id = job_id;
                    func(data);
                }
                /* Grab new ID or stop */
                {
                    atomic_lock();
                    job_id = job->num_dispatched++;
                    if (job_id >= job_count) {
                        stop = true;
                        i32 num_workers = --job->num_workers;
                        if (num_workers == 0) {
                            /* Last executor out releases the job. */
                            __profscope(Release job);
                            /* Dequeue */
                            {
                                struct worker_job *prev = job->prev;
                                struct worker_job *next = job->next;
                                if (prev) {
                                    prev->next = next;
                                } else {
                                    job_queue->first = next;
                                }
                                if (next) {
                                    next->prev = prev;
                                } else {
                                    job_queue->last = prev;
                                }
                            }
                            /* Add to free list */
                            {
                                job->next_free = G.first_free_job;
                                G.first_free_job = job;
                            }
                            /* Signal waiters */
                            atomic_u64_eval_add_u64(&job->gen, 1);
                            {
                                struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex);
                                sys_condition_variable_broadcast(job->gen_cv);
                                sys_mutex_unlock(&cv_lock);
                            }
                        }
                    }
                    atomic_unlock();
                }
            }
            atomic_i32_eval_add(&G.num_idle_worker_threads, 1);
            ctx->pin_depth -= job_is_pinned_to_worker;
        }
        wake_lock = sys_mutex_lock_s(G.workers_wake_mutex);
        /* Sleep only if nothing was dispatched since our snapshot. */
        if (!G.workers_shutdown && G.workers_wake_gen == last_wake_gen) {
            sys_condition_variable_wait(G.workers_wake_cv, &wake_lock);
        }
    }
    sys_mutex_unlock(&wake_lock);
}