From 55187aed97fc4ef724ea828e3f5bbde63f1622f6 Mon Sep 17 00:00:00 2001 From: jacob Date: Sat, 5 Jul 2025 14:44:00 -0500 Subject: [PATCH] remove old job system --- src/app.c | 2 - src/asset_cache.c | 14 +- src/asset_cache.h | 6 +- src/font.c | 10 +- src/gp_dx12.c | 14 +- src/job.c | 617 ------------------------------------------ src/job.h | 35 --- src/playback_wasapi.c | 7 +- src/resource.c | 7 +- src/sound.c | 10 +- src/sprite.c | 13 +- src/sys.h | 12 +- src/sys_win32.c | 110 ++++---- src/user.c | 9 +- 14 files changed, 106 insertions(+), 760 deletions(-) delete mode 100644 src/job.c delete mode 100644 src/job.h diff --git a/src/app.c b/src/app.c index da5123ba..b57985fb 100644 --- a/src/app.c +++ b/src/app.c @@ -2,7 +2,6 @@ #include "arena.h" #include "string.h" #include "sys.h" -#include "job.h" #include "user.h" #include "sim.h" #include "playback.h" @@ -324,7 +323,6 @@ void sys_app_entry(struct string args_str) } /* Startup systems */ - job_startup(worker_count, worker_names); struct resource_startup_receipt resource_sr = resource_startup(); struct sock_startup_receipt sock_sr = sock_startup(); struct host_startup_receipt host_sr = host_startup(&sock_sr); diff --git a/src/asset_cache.c b/src/asset_cache.c index 2e9498e8..18712ee7 100644 --- a/src/asset_cache.c +++ b/src/asset_cache.c @@ -5,7 +5,6 @@ #include "arena.h" #include "util.h" #include "log.h" -#include "job.h" /* ========================== * * Global state @@ -149,7 +148,6 @@ struct asset *asset_cache_touch(struct string key, u64 hash, b32 *is_first_touch .status = ASSET_STATUS_UNINITIALIZED, .hash = hash, .key = key_stored, - .job_ready_sf = sync_flag_alloc(), .asset_ready_sf = sync_flag_alloc() }; if (is_first_touch) { @@ -189,21 +187,11 @@ void asset_cache_mark_ready(struct asset *asset, void *store_data) * Job * ========================== */ -void asset_cache_set_job(struct asset *asset, struct job_handle *job) -{ - asset->job = job ? *job : (struct job_handle) { 0 }; - sync_flag_set(&asset->job_ready_sf); -} - void asset_cache_wait(struct asset *asset) { if (asset->status != ASSET_STATUS_READY) { - /* Wait for job to be set */ - sync_flag_wait(&asset->job_ready_sf); /* Wait on job */ - if (asset->job.gen != 0) { - job_wait(asset->job); - } + sys_wait(&asset->counter); /* Wait for asset to be ready */ sync_flag_wait(&asset->asset_ready_sf); } diff --git a/src/asset_cache.h b/src/asset_cache.h index 668432c3..b539c03f 100644 --- a/src/asset_cache.h +++ b/src/asset_cache.h @@ -3,7 +3,6 @@ #include "sys.h" #include "util.h" -#include "job.h" enum asset_status { ASSET_STATUS_NONE, @@ -19,9 +18,7 @@ struct asset { u64 hash; struct string key; - /* Managed via asset_cache_set_job */ - struct job_handle job; - struct sync_flag job_ready_sf; + struct sys_counter counter; /* Managed via asset_cache_mark_x functions */ enum asset_status status; @@ -46,7 +43,6 @@ struct asset *asset_cache_touch(struct string key, u64 hash, b32 *is_first_touch void asset_cache_mark_loading(struct asset *asset); void asset_cache_mark_ready(struct asset *asset, void *store_data); -void asset_cache_set_job(struct asset *asset, struct job_handle *job); void asset_cache_wait(struct asset *asset); void *asset_cache_get_store_data(struct asset *asset); diff --git a/src/font.c b/src/font.c index b7f07587..c8c00757 100644 --- a/src/font.c +++ b/src/font.c @@ -1,7 +1,6 @@ #include "font.h" #include "arena.h" #include "ttf.h" -#include "job.h" #include "asset_cache.h" #include "resource.h" #include "log.h" @@ -88,7 +87,7 @@ INTERNAL void font_task_params_release(struct font_task_params *p) * Load * ========================== */ -INTERNAL JOB_DEF(font_load_asset_job, job) +INTERNAL SYS_JOB_DEF(font_load_asset_job, job) { __prof; struct arena_temp scratch = scratch_begin_no_conflict(); @@ -189,12 +188,9 @@ struct asset *font_load_asset(struct string path, f32 point_size, b32 wait) /* Push task */ asset_cache_mark_loading(asset); + sys_run(1, font_load_asset_job, params, SYS_JOB_PRIORITY_BACKGROUND, &asset->counter); if (wait) { - job_dispatch_wait(1, font_load_asset_job, params); - asset_cache_set_job(asset, NULL); - } else { - struct job_handle job = job_dispatch_async(1, font_load_asset_job, params); - asset_cache_set_job(asset, &job); + asset_cache_wait(asset); } } diff --git a/src/gp_dx12.c b/src/gp_dx12.c index 3f9aaaea..c75fc823 100644 --- a/src/gp_dx12.c +++ b/src/gp_dx12.c @@ -1,11 +1,9 @@ #include "gp.h" #include "sys.h" #include "arena.h" -#include "arena.h" #include "memory.h" #include "string.h" #include "app.h" -#include "job.h" #include "log.h" #include "resource.h" #include "atomic.h" @@ -784,7 +782,7 @@ struct shader_compile_job_sig { }; /* TODO: Compile shaders offline w/ dxc for performance & language features */ -INTERNAL JOB_DEF(shader_compile_job, job) +INTERNAL SYS_JOB_DEF(shader_compile_job, job) { __prof; struct shader_compile_job_sig *sig = job.sig; @@ -865,7 +863,7 @@ struct pipeline_init_job_sig { struct pipeline **pipelines_out; }; -INTERNAL JOB_DEF(pipeline_init_job, job) +INTERNAL SYS_JOB_DEF(pipeline_init_job, job) { __prof; struct pipeline_init_job_sig *sig = job.sig; @@ -929,7 +927,9 @@ INTERNAL JOB_DEF(pipeline_init_job, job) if (success) { struct shader_compile_job_param *params[] = { &vs, &ps }; struct shader_compile_job_sig comp_sig = { .params = params }; - job_dispatch_wait(countof(params), shader_compile_job, &comp_sig); + struct sys_counter counter = ZI; + sys_run(countof(params), shader_compile_job, &comp_sig, SYS_JOB_PRIORITY_HIGH, &counter); + sys_wait(&counter); success = vs.success && ps.success; } @@ -1120,8 +1120,10 @@ INTERNAL JOB_DEF(pipeline_init_job, job) INTERNAL void pipeline_alloc(u64 num_pipelines, struct pipeline_desc *descs_in, struct pipeline **pipelines_out) { __prof; + struct sys_counter counter = ZI; struct pipeline_init_job_sig sig = { .descs_in = descs_in, .pipelines_out = pipelines_out }; - job_dispatch_wait(num_pipelines, pipeline_init_job, &sig); + sys_run(num_pipelines, pipeline_init_job, &sig, SYS_JOB_PRIORITY_HIGH, &counter); + sys_wait(&counter); } INTERNAL void pipeline_release_now(struct pipeline *pipeline) diff --git a/src/job.c b/src/job.c deleted file mode 100644 index 3d6f1181..00000000 --- a/src/job.c +++ /dev/null @@ -1,617 +0,0 @@ -#include "job.h" -#include "sys.h" -#include "arena.h" -#include "atomic.h" -#include "string.h" -#include "app.h" -#include "intrinsics.h" - -#if 0 -/* FIXME: Remove this (replace with sys_ wrappers) */ -#include -#endif - -struct worker_job { - struct worker_job_queue *queue; - i32 num_workers; - i32 num_dispatched; - - i32 pinned_worker_id; - i32 count; - job_func *func; - void *sig; - - struct atomic_u64 gen; - struct sys_mutex *gen_cv_mutex; - struct sys_condition_variable *gen_cv; - - - struct worker_job *prev; - struct worker_job *next; - - struct worker_job *next_free; -}; - -struct worker_job_queue { - struct worker_job *first; - struct worker_job *last; -}; - -struct alignas(64) worker_ctx { - i32 worker_id; /* Will be -1 if thread is not a worker */ - i32 pin_depth; - b32 initialized; -}; -STATIC_ASSERT(alignof(struct worker_ctx) == 64); /* Avoid false sharing */ - -/* ========================== * - * Global state - * ========================== */ - -GLOBAL struct { - struct arena *arena; - - struct atomic_i32 atomic_lock; - - struct worker_job *first_free_job; - - struct worker_job_queue global_queue; - struct worker_job_queue pinned_queues[JOB_MAX_WORKERS]; - - u64 workers_wake_gen; - struct sys_condition_variable *workers_wake_cv; - struct sys_mutex *workers_wake_mutex; - - struct atomic_i32 num_idle_worker_threads; - i32 num_worker_threads; - b32 workers_shutdown; - struct sys_thread *worker_threads[JOB_MAX_WORKERS]; - - /* Fiber local storage */ - struct worker_ctx worker_contexts[SYS_MAX_FIBERS]; -} G = ZI, DEBUG_ALIAS(G, G_job); - -/* ========================== * - * Startup - * ========================== */ - -INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg); -INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(job_shutdown); - -void job_startup(i32 num_workers, struct string *worker_names) -{ - __prof; - struct arena_temp scratch = scratch_begin_no_conflict(); - - G.arena = arena_alloc(GIGABYTE(64)); - - G.workers_wake_cv = sys_condition_variable_alloc(); - G.workers_wake_mutex = sys_mutex_alloc(); - - if (num_workers < JOB_MIN_WORKERS || num_workers > JOB_MAX_WORKERS) { - /* Invalid worker count */ - ASSERT(false); - } - G.num_worker_threads = num_workers; - for (i32 i = 0; i < G.num_worker_threads; ++i) { - struct string name = worker_names[i]; - G.worker_threads[i] = sys_thread_alloc(worker_thread_entry_point, (void *)(i64)i, name, PROF_THREAD_GROUP_WORKERS + i); - } - atomic_i32_fetch_set(&G.num_idle_worker_threads, num_workers); - - app_register_exit_callback(job_shutdown); - - scratch_end(scratch); -} - -INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(job_shutdown) -{ - __prof; - { - struct sys_lock lock = sys_mutex_lock_e(G.workers_wake_mutex); - G.workers_shutdown = true; - ++G.workers_wake_gen; - sys_condition_variable_broadcast(G.workers_wake_cv); - sys_mutex_unlock(&lock); - } - for (i32 i = 0; i < G.num_worker_threads; ++i) { - struct sys_thread *thread = G.worker_threads[i]; - sys_thread_wait_release(thread); - } -} - -/* ========================== * - * Atomic lock - * ========================== */ - -INTERNAL void atomic_lock(void) -{ - while (atomic_i32_fetch_test_set(&G.atomic_lock, 0, 1) != 0) { - ix_pause(); - } -} - -INTERNAL void atomic_unlock(void) -{ - atomic_i32_fetch_set(&G.atomic_lock, 0); -} - -/* ========================== * - * Worker TLS - * ========================== */ - -INTERNAL struct worker_ctx *worker_ctx_from_fiber_id(i32 fiber_id) -{ - struct worker_ctx *ctx = &G.worker_contexts[fiber_id]; - if (!ctx->initialized) { - ctx->initialized = 1; - ctx->worker_id = -1; - } - return ctx; -} - -/* ========================== * - * Job - * ========================== */ - -struct job_desc { - job_func *func; - void *sig; - i32 count; - b32 wait; - i32 pinned_worker_id; -}; - -INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc) -{ - struct worker_ctx *ctx = worker_ctx_from_fiber_id(sys_current_fiber_id()); - i32 worker_id = ctx->worker_id; - - job_func *job_func = desc.func; - void *sig = desc.sig; - i32 job_count = desc.count; - b32 wait = desc.wait; - i32 pinned_worker_id = desc.pinned_worker_id; - - if (pinned_worker_id >= G.num_worker_threads) { - struct arena_temp scratch = scratch_begin_no_conflict(); - sys_panic(string_format(scratch.arena, LIT("Tried to dispatch pinned job to worker with id %F when there are only %F worker threads"), FMT_SINT(pinned_worker_id), FMT_SINT(G.num_worker_threads))); - scratch_end(scratch); - } - - b32 is_pinned = pinned_worker_id >= 0; - b32 is_pinned_to_caller = is_pinned && pinned_worker_id == worker_id; - - /* NOTE: If an unpinned worker dispatches an asynchronous job, then to avoid deadlocks we inline the work execution until atleast one other worker is assigned. */ - b32 do_work = worker_id >= 0 && (wait || ctx->pin_depth <= 0) && pinned_worker_id < 0; - - if (is_pinned_to_caller) { - do_work = true; - wait = true; - } - - struct job_handle handle = ZI; - if (do_work && job_count == 1) { - __profscope(Execute single job); - struct job_data data = ZI; - data.sig = sig; - job_func(data); - } else if (job_count >= 1) { - __profscope(Allocate job); - /* Allocate job */ - u64 gen = 0; - struct worker_job *job = NULL; - { - struct sys_mutex *old_cv_mutex = NULL; - struct sys_condition_variable *old_cv = NULL; - { - atomic_lock(); - if (G.first_free_job) { - job = G.first_free_job; - G.first_free_job = job->next_free; - old_cv_mutex = job->gen_cv_mutex; - old_cv = job->gen_cv; - gen = atomic_u64_fetch(&job->gen) + 1; - } else { - job = arena_push_no_zero(G.arena, struct worker_job); - gen = 1; - } - atomic_unlock(); - } - atomic_u64_fetch_set(&job->gen, 0); - MEMZERO_STRUCT(job); - if (old_cv_mutex) { - job->gen_cv_mutex = old_cv_mutex; - job->gen_cv = old_cv; - } else { - job->gen_cv_mutex = sys_mutex_alloc(); - job->gen_cv = sys_condition_variable_alloc(); - } - job->pinned_worker_id = pinned_worker_id; - job->count = job_count; - job->func = job_func; - job->sig = sig; - atomic_u64_fetch_set(&job->gen, gen); - { - /* Signal mutex change */ - if (old_cv_mutex) { - struct sys_lock lock = sys_mutex_lock_e(job->gen_cv_mutex); - sys_condition_variable_broadcast(job->gen_cv); - sys_mutex_unlock(&lock); - } - } - } - handle.job = job; - handle.gen = gen; - - /* Reserve first job id for ourselves if necessary */ - i32 job_id = 0; - if (do_work) { - job->num_dispatched = 1; - job->num_workers = 1; - } - - struct worker_job_queue *job_queue = NULL; - if (!is_pinned_to_caller) { - job_queue = is_pinned ? &G.pinned_queues[pinned_worker_id] : &G.global_queue; - } - job->queue = job_queue; - - /* Queue job */ - if (job_queue) { - __profscope(Queue job); - atomic_lock(); - { - /* Push to queue */ - { - if (job_queue->last) { - job_queue->last->next = job; - } else { - job_queue->first = job; - } - job->prev = job_queue->last; - job_queue->last = job; - } - /* Signal workers */ - struct sys_lock wake_lock = sys_mutex_lock_e(G.workers_wake_mutex); - { - ++G.workers_wake_gen; - i32 num_signals = job_count - !!do_work; - if (is_pinned) { - num_signals = G.num_worker_threads; - } - sys_condition_variable_signal(G.workers_wake_cv, num_signals); - } - sys_mutex_unlock(&wake_lock); - - } - atomic_unlock(); - } - - /* Execute job */ - b32 is_done = false; - if (do_work && job_id < job_count) { - __profscope(Execute job); - ctx->pin_depth += is_pinned_to_caller; - struct job_data data = ZI; - data.sig = sig; - b32 stop = false; - while (!stop) { - /* Run */ - { - __profscope(Run job); - data.id = job_id; - job_func(data); - } - /* Grab new ID or exit job */ - { - atomic_lock(); - { - if (job->num_dispatched < job_count && (wait || job->num_workers <= 1)) { - job_id = job->num_dispatched++; - } else { - stop = true; - i32 num_workers = --job->num_workers; - if (num_workers == 0) { - __profscope(Release job); - is_done = true; - /* Dequeue */ - { - struct worker_job *prev = job->prev; - struct worker_job *next = job->next; - if (prev) { - prev->next = next; - } else { - job_queue->first = next; - } - if (next) { - next->prev = prev; - } else { - job_queue->last = prev; - } - } - /* Add to free list */ - { - job->next_free = G.first_free_job; - G.first_free_job = job; - } - /* Signal waiters */ - atomic_u64_fetch_add_u64(&job->gen, 1); - { - struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex); - sys_condition_variable_broadcast(job->gen_cv); - sys_mutex_unlock(&cv_lock); - } - } - } - } - atomic_unlock(); - } - } - ctx->pin_depth -= is_pinned_to_caller; - } - - /* Wait for job completion */ - if (wait && !is_done) { - __profscope(Wait for job); - struct sys_lock cv_lock = sys_mutex_lock_s(job->gen_cv_mutex); - is_done = atomic_u64_fetch(&job->gen) != handle.gen; - while (!is_done) { - sys_condition_variable_wait(job->gen_cv, &cv_lock); - is_done = atomic_u64_fetch(&job->gen) != handle.gen; - } - sys_mutex_unlock(&cv_lock); - } - } - return handle; -} - -struct job_handle job_dispatch_async(i32 count, job_func *job_func, void *sig) -{ - __prof; - struct job_desc desc = ZI; - desc.func = job_func; - desc.sig = sig; - desc.count = count; - desc.wait = false; - desc.pinned_worker_id = -1; - return job_dispatch_ex(desc); -} - -void job_dispatch_wait(i32 count, job_func *job_func, void *sig) -{ - __prof; - struct job_desc desc = ZI; - desc.func = job_func; - desc.sig = sig; - desc.count = count; - desc.wait = true; - desc.pinned_worker_id = -1; - job_dispatch_ex(desc); -} - -void job_dispatch_pinned(i32 worker_id, job_func *job_func, void *sig) -{ - __prof; - struct job_desc desc = ZI; - desc.func = job_func; - desc.sig = sig; - desc.count = 1; - desc.wait = false; - desc.pinned_worker_id = worker_id; - job_dispatch_ex(desc); -} - -void job_wait(struct job_handle handle) -{ - struct worker_job *job = handle.job; - if (job && handle.gen) { - b32 is_done = atomic_u64_fetch(&job->gen) != handle.gen; - if (!is_done) { - struct worker_ctx *ctx = worker_ctx_from_fiber_id(sys_current_fiber_id()); - i32 worker_id = ctx->worker_id; - i32 job_pinned_worker = job->pinned_worker_id; - if (worker_id >= 0 && (job_pinned_worker < 0 || job_pinned_worker == worker_id)) { - i32 job_id = 0; - i32 job_count = 0; - struct worker_job_queue *job_queue = NULL; - { - atomic_lock(); - job_id = job->num_dispatched++; - job_count = job->count; - job_queue = job->queue; - if (job_id < job_count) { - ++job->num_workers; - } - atomic_unlock(); - } - - /* Execute job */ - if (job_id < job_count) { - __profscope(Execute job); - struct job_data data = ZI; - data.sig = job->sig; - job_func *func = job->func; - while (job_id < job_count) { - /* Run */ - { - data.id = job_id; - func(data); - } - /* Grab new ID or exit job */ - { - atomic_lock(); - { - job_id = job->num_dispatched++; - if (job_id >= job_count) { - __profscope(Release job); - is_done = true; - /* Dequeue */ - { - struct worker_job *prev = job->prev; - struct worker_job *next = job->next; - if (prev) { - prev->next = next; - } else { - job_queue->first = next; - } - if (next) { - next->prev = prev; - } else { - job_queue->last = prev; - } - } - /* Add to free list */ - { - job->next_free = G.first_free_job; - G.first_free_job = job; - } - /* Signal waiters */ - atomic_u64_fetch_add_u64(&job->gen, 1); - { - struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex); - sys_condition_variable_broadcast(job->gen_cv); - sys_mutex_unlock(&cv_lock); - } - } - } - atomic_unlock(); - } - } - } - } - - /* Wait for job completion */ - if (!is_done) { - __profscope(Wait for job); - struct sys_lock cv_lock = sys_mutex_lock_s(job->gen_cv_mutex); - is_done = atomic_u64_fetch(&job->gen) != handle.gen; - while (!is_done) { - sys_condition_variable_wait(job->gen_cv, &cv_lock); - is_done = atomic_u64_fetch(&job->gen) != handle.gen; - } - sys_mutex_unlock(&cv_lock); - } - } - } -} - -/* ========================== * - * Worker - * ========================== */ - -INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg) -{ - i32 worker_id = (i32)(i64)thread_arg; - - struct worker_ctx *ctx = worker_ctx_from_fiber_id(sys_current_fiber_id()); - ctx->worker_id = worker_id; - - struct worker_job_queue *queues[] = { &G.pinned_queues[worker_id], &G.global_queue }; - u64 last_wake_gen = 0; - - struct sys_lock wake_lock = sys_mutex_lock_s(G.workers_wake_mutex); - while (!G.workers_shutdown) { - last_wake_gen = G.workers_wake_gen; - sys_mutex_unlock(&wake_lock); - - /* Try to pick job from queue */ - i32 job_id = 0; - i32 job_count = 0; - b32 job_is_pinned_to_worker = false; - struct worker_job_queue *job_queue = NULL; - struct worker_job *job = NULL; - { - __profscope(Pick job); - atomic_lock(); - { - for (i32 queue_index = 0; queue_index < (i32)countof(queues); ++queue_index) { - struct worker_job_queue *queue = queues[queue_index]; - struct worker_job *tmp = queue->first; - while (!job && tmp) { - i32 tmp_id = tmp->num_dispatched; - i32 tmp_count = tmp->count; - if (tmp_id < tmp_count) { - /* Pick job */ - ++tmp->num_workers; - ++tmp->num_dispatched; - job = tmp; - job_id = tmp_id; - job_count = tmp_count; - job_is_pinned_to_worker = tmp->pinned_worker_id == worker_id; - job_queue = tmp->queue; - } - tmp = tmp->next; - } - } - } - atomic_unlock(); - } - - /* Execute job */ - if (job) { - __profscope(Execute job); - ctx->pin_depth += job_is_pinned_to_worker; - atomic_i32_fetch_add(&G.num_idle_worker_threads, -1); - struct job_data data = ZI; - data.sig = job->sig; - job_func *func = job->func; - b32 stop = job_id >= job_count; - while (!stop) { - /* Run */ - { - data.id = job_id; - func(data); - } - /* Grab new ID or stop */ - { - atomic_lock(); - job_id = job->num_dispatched++; - if (job_id >= job_count) { - stop = true; - i32 num_workers = --job->num_workers; - if (num_workers == 0) { - __profscope(Release job); - /* Dequeue */ - { - struct worker_job *prev = job->prev; - struct worker_job *next = job->next; - if (prev) { - prev->next = next; - } else { - job_queue->first = next; - } - if (next) { - next->prev = prev; - } else { - job_queue->last = prev; - } - } - /* Add to free list */ - { - job->next_free = G.first_free_job; - G.first_free_job = job; - } - /* Signal waiters */ - atomic_u64_fetch_add_u64(&job->gen, 1); - { - struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex); - sys_condition_variable_broadcast(job->gen_cv); - sys_mutex_unlock(&cv_lock); - } - } - } - atomic_unlock(); - } - } - atomic_i32_fetch_add(&G.num_idle_worker_threads, 1); - ctx->pin_depth -= job_is_pinned_to_worker; - } - - wake_lock = sys_mutex_lock_s(G.workers_wake_mutex); - if (!G.workers_shutdown && G.workers_wake_gen == last_wake_gen) { - sys_condition_variable_wait(G.workers_wake_cv, &wake_lock); - } - } - sys_mutex_unlock(&wake_lock); -} diff --git a/src/job.h b/src/job.h deleted file mode 100644 index 116c33ee..00000000 --- a/src/job.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef JOB_H -#define JOB_H - -#define JOB_MIN_WORKERS 2 -#define JOB_MAX_WORKERS 64 - -/* ========================== * - * Startup - * ========================== */ - -void job_startup(i32 num_workers, struct string *worker_names); - -/* ========================== * - * Job - * ========================== */ - -struct job_data { - i32 id; - void *sig; -}; - -struct job_handle { - void *job; - u64 gen; -}; - -#define JOB_DEF(job_name, arg_name) void job_name(struct job_data arg_name) -typedef JOB_DEF(job_func, job_data); - -struct job_handle job_dispatch_async(i32 count, job_func *job_func, void *sig); -void job_dispatch_wait(i32 count, job_func *job_func, void *sig); -void job_dispatch_pinned(i32 worker_id, job_func *job_func, void *sig); -void job_wait(struct job_handle handle); - -#endif diff --git a/src/playback_wasapi.c b/src/playback_wasapi.c index 522c2035..41dbd2dc 100644 --- a/src/playback_wasapi.c +++ b/src/playback_wasapi.c @@ -11,7 +11,6 @@ #include "mixer.h" #include "atomic.h" #include "app.h" -#include "job.h" #define COBJMACROS #define WIN32_LEAN_AND_MEAN @@ -52,14 +51,14 @@ GLOBAL struct { INTERNAL void wasapi_initialize(void); INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(playback_shutdown); -INTERNAL JOB_DEF(playback_job, _); +INTERNAL SYS_JOB_DEF(playback_job, _); struct playback_startup_receipt playback_startup(struct mixer_startup_receipt *mixer_sr) { (UNUSED)mixer_sr; wasapi_initialize(); - job_dispatch_pinned(APP_DEDICATED_WORKER_ID_AUDIO, playback_job, NULL); + sys_run(1, playback_job, NULL, SYS_JOB_PRIORITY_HIGH, NULL); app_register_exit_callback(&playback_shutdown); return (struct playback_startup_receipt) { 0 }; @@ -227,7 +226,7 @@ INTERNAL void wasapi_update_end(struct wasapi_buffer *wspbuf, struct mixed_pcm_f * Playback thread entry * ========================== */ -INTERNAL JOB_DEF(playback_job, _) +INTERNAL SYS_JOB_DEF(playback_job, _) { (UNUSED)_; struct arena_temp scratch = scratch_begin_no_conflict(); diff --git a/src/resource.c b/src/resource.c index 4093e257..9ae48e09 100644 --- a/src/resource.c +++ b/src/resource.c @@ -4,7 +4,6 @@ #include "tar.h" #include "incbin.h" #include "util.h" -#include "job.h" /* ========================== * * Global data @@ -224,7 +223,7 @@ struct resource_watch_callback_job_sig { resource_watch_callback **callbacks; }; -INTERNAL JOB_DEF(resource_watch_callback_job, job) +INTERNAL SYS_JOB_DEF(resource_watch_callback_job, job) { __prof; struct resource_watch_callback_job_sig *sig = job.sig; @@ -289,7 +288,9 @@ INTERNAL SYS_THREAD_DEF(resource_watch_dispatcher_thread_entry_point, _) struct resource_watch_callback_job_sig sig = ZI; sig.name = info->name; sig.callbacks = callbacks; - job_dispatch_wait(num_callbacks, resource_watch_callback_job, &sig); + struct sys_counter counter = ZI; + sys_run(num_callbacks, resource_watch_callback_job, &sig, SYS_JOB_PRIORITY_BACKGROUND, &counter); + sys_wait(&counter); } } } diff --git a/src/sound.c b/src/sound.c index f5075812..4b5ce9d8 100644 --- a/src/sound.c +++ b/src/sound.c @@ -5,7 +5,6 @@ #include "resource.h" #include "asset_cache.h" #include "mp3.h" -#include "job.h" struct sound_task_params { struct sound_task_params *next_free; @@ -78,7 +77,7 @@ INTERNAL void sound_task_params_release(struct sound_task_params *p) * Load * ========================== */ -INTERNAL JOB_DEF(sound_load_asset_job, job) +INTERNAL SYS_JOB_DEF(sound_load_asset_job, job) { __prof; struct sound_task_params *params = job.sig; @@ -182,12 +181,9 @@ struct asset *sound_load_asset(struct string path, u32 flags, b32 wait) /* Push task */ asset_cache_mark_loading(asset); + sys_run(1, sound_load_asset_job, params, SYS_JOB_PRIORITY_BACKGROUND, &asset->counter); if (wait) { - job_dispatch_wait(1, sound_load_asset_job, params); - asset_cache_set_job(asset, NULL); - } else { - struct job_handle job = job_dispatch_async(1, sound_load_asset_job, params); - asset_cache_set_job(asset, &job); + asset_cache_wait(asset); } } diff --git a/src/sprite.c b/src/sprite.c index b28e594d..2e73c99a 100644 --- a/src/sprite.c +++ b/src/sprite.c @@ -5,7 +5,6 @@ #include "resource.h" #include "ase.h" #include "util.h" -#include "job.h" #include "atomic.h" #include "app.h" #include "gp.h" @@ -201,7 +200,7 @@ INTERNAL struct image_rgba generate_purple_black_image(struct arena *arena, u32 * ========================== */ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(sprite_shutdown); -INTERNAL JOB_DEF(sprite_load_job, arg); +INTERNAL SYS_JOB_DEF(sprite_load_job, arg); INTERNAL SYS_THREAD_DEF(sprite_evictor_scheduler_thread_entry_point, arg); #if RESOURCE_RELOADING @@ -336,7 +335,7 @@ INTERNAL void push_load_job(struct cache_ref ref, struct sprite_tag tag) } /* Push work */ - job_dispatch_async(1, sprite_load_job, cmd); + sys_run(1, sprite_load_job, cmd, SYS_JOB_PRIORITY_BACKGROUND, NULL); } INTERNAL void cache_entry_load_texture(struct cache_ref ref, struct sprite_tag tag) @@ -1142,7 +1141,7 @@ struct sprite_sheet_slice_array sprite_sheet_get_slices(struct sprite_sheet *she * Load job * ========================== */ -INTERNAL JOB_DEF(sprite_load_job, job) +INTERNAL SYS_JOB_DEF(sprite_load_job, job) { __prof; struct load_cmd *cmd = job.sig; @@ -1228,7 +1227,7 @@ INTERNAL SORT_COMPARE_FUNC_DEF(evict_sort, arg_a, arg_b, udata) return (b_cycle > a_cycle) - (a_cycle > b_cycle); } -INTERNAL JOB_DEF(sprite_evictor_job, _) +INTERNAL SYS_JOB_DEF(sprite_evictor_job, _) { (UNUSED)_; __prof; @@ -1381,7 +1380,9 @@ INTERNAL SYS_THREAD_DEF(sprite_evictor_scheduler_thread_entry_point, arg) struct sys_lock evictor_lock = sys_mutex_lock_e(G.evictor_scheduler_mutex); while (!G.evictor_scheduler_shutdown) { - job_dispatch_wait(1, sprite_evictor_job, NULL); + struct sys_counter counter = ZI; + sys_run(1, sprite_evictor_job, NULL, SYS_JOB_PRIORITY_BACKGROUND, &counter); + sys_wait(&counter); sys_condition_variable_wait_time(G.evictor_scheduler_shutdown_cv, &evictor_lock, SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS)); } sys_mutex_unlock(&evictor_lock); diff --git a/src/sys.h b/src/sys.h index 043c494f..3bbded94 100644 --- a/src/sys.h +++ b/src/sys.h @@ -478,7 +478,17 @@ b32 sys_run_command(struct string cmd); +/* ========================== * + * Counter + * ========================== */ +struct sys_counter { + struct atomic_i64 v; +}; + +void sys_counter_add(struct sys_counter *sys_counter, i64 amount); + +void sys_wait(struct sys_counter *sys_counter); /* ========================== * * Fiber @@ -510,7 +520,7 @@ struct sys_job_data { #define SYS_JOB_DEF(job_name, arg_name) void job_name(struct sys_job_data arg_name) typedef SYS_JOB_DEF(sys_job_func, job_data); -void sys_run(i32 count, sys_job_func *func, enum sys_job_priority priority); +void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_job_priority priority, struct sys_counter *counter); /* ========================== * * Scratch context diff --git a/src/sys_win32.c b/src/sys_win32.c index 7598bb86..4519a1c5 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -217,6 +217,7 @@ struct job_info { i32 count; sys_job_func *func; void *sig; + struct sys_counter *counter; struct job_info *next; }; @@ -332,7 +333,22 @@ GLOBAL struct { * Counters * ========================== */ +void sys_counter_add(struct sys_counter *counter, i64 amount) +{ + i64 old_v = atomic_i64_fetch_add(&counter->v, amount); + i64 new_v = old_v + amount; + if (old_v > 0 && new_v <= 0) { + (UNUSED)old_v; + (UNUSED)new_v; + } +} +void sys_wait(struct sys_counter *sys_counter) +{ + while (atomic_i64_fetch(&sys_counter->v) > 0) { + ix_pause(); + } +} /* ========================== * * Fibers @@ -468,9 +484,12 @@ void sys_yield(void) yield(YIELD_KIND_COOPERATIVE); } -void sys_run(i32 count, sys_job_func *func, enum sys_job_priority priority) +void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_job_priority priority, struct sys_counter *counter) { if (count > 0) { + if (counter) { + sys_counter_add(counter, count); + } struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); priority = clamp_i32(priority, fiber->job_priority, SYS_JOB_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */ STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ @@ -488,6 +507,8 @@ void sys_run(i32 count, sys_job_func *func, enum sys_job_priority priority) MEMZERO_STRUCT(info); info->count = count; info->func = func; + info->sig = sig; + info->counter = counter; if (queue->last) { queue->last->next = info; } else { @@ -499,22 +520,6 @@ void sys_run(i32 count, sys_job_func *func, enum sys_job_priority priority) } } -struct bla_job_sig { - i32 _; -}; - -INTERNAL SYS_JOB_DEF(bla_job, job) -{ - __prof; - (UNUSED)job; - Sleep(20); - { - __profscope(Do tha yield); - yield(YIELD_KIND_SLEEP); - } - Sleep(20); -} - /* ========================== * * Job fiber func * ========================== */ @@ -549,7 +554,6 @@ INTERNAL void job_fiber_entry(void *id_ptr) INTERNAL SYS_THREAD_DEF(runner_entry, runner_ctx_arg) { - __profscope(Runner); struct runner_ctx *ctx = runner_ctx_arg; (UNUSED)ctx; @@ -568,39 +572,44 @@ INTERNAL SYS_THREAD_DEF(runner_entry, runner_ctx_arg) i32 job_id = 0; sys_job_func *job_func = 0; void *job_sig = 0; - for (u32 queue_index = 0; queue_index < countof(queues) && !job_func; ++queue_index) { - struct job_queue *queue = queues[queue_index]; - if (queue) { - while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); - struct job_info *info = queue->first; - while (info && !job_func) { - struct job_info *next = info->next; - job_id = info->num_dispatched++; - if (job_id < info->count) { - /* Pick job */ - STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ - job_priority = (enum sys_job_priority)queue->kind; - job_func = info->func; - job_sig = info->sig; - if (job_id == (info->count - 1)) { - /* We're picking up the last dispatch, so dequeue the job */ - if (!next) { - queue->last = NULL; + struct sys_counter *job_counter = 0; + { + //__profscope(Pull job); + for (u32 queue_index = 0; queue_index < countof(queues) && !job_func; ++queue_index) { + struct job_queue *queue = queues[queue_index]; + if (queue) { + while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); + struct job_info *info = queue->first; + while (info && !job_func) { + struct job_info *next = info->next; + job_id = info->num_dispatched++; + if (job_id < info->count) { + /* Pick job */ + STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ + job_priority = (enum sys_job_priority)queue->kind; + job_func = info->func; + job_sig = info->sig; + job_counter = info->counter; + if (job_id == (info->count - 1)) { + /* We're picking up the last dispatch, so dequeue the job */ + if (!next) { + queue->last = NULL; + } + queue->first = next; + info->next = queue->first_free; + queue->first_free = info; } - queue->first = next; - info->next = queue->first_free; - queue->first_free = info; } + info = next; } - info = next; + atomic_i32_fetch_set(&queue->lock, 0); } - atomic_i32_fetch_set(&queue->lock, 0); } } - /* Execute fiber */ + /* Run fiber */ if (job_func) { - __profscope(Execute fiber); + __profscope(Run fiber); if (!job_fiber) { job_fiber = fiber_alloc(FIBER_KIND_JOB_RUNNER); } @@ -628,6 +637,10 @@ INTERNAL SYS_THREAD_DEF(runner_entry, runner_ctx_arg) case YIELD_KIND_DONE: { + if (job_counter) { + sys_counter_add(job_counter, -1); + } + /* TODO: remove this */ (UNUSED)fiber_release; done = true; @@ -636,6 +649,13 @@ INTERNAL SYS_THREAD_DEF(runner_entry, runner_ctx_arg) } } } + + /* TODO */ +#if 0 + { + __profscope(Runner wait); + } +#endif } INTERNAL SYS_THREAD_DEF(test_entry, _) @@ -663,10 +683,6 @@ INTERNAL SYS_THREAD_DEF(test_entry, _) G.runner_threads[i] = sys_thread_alloc(runner_entry, ctx, name, PROF_THREAD_GROUP_RUNNERS + i); } - /* Push test job */ - Sleep(300); - sys_run(2, bla_job, SYS_JOB_PRIORITY_NORMAL); - /* Wait on runners */ for (i32 i = 0; i < G.num_runner_threads; ++i) { struct sys_thread *runner_thread = G.runner_threads[i]; diff --git a/src/user.c b/src/user.c index 12e45626..88ffd3a1 100644 --- a/src/user.c +++ b/src/user.c @@ -259,13 +259,8 @@ struct user_startup_receipt user_startup(struct gp_startup_receipt *gp_sr, sys_window_register_event_callback(G.window, &window_event_callback); /* Start jobs */ -#if 0 - job_dispatch_pinned(APP_DEDICATED_WORKER_ID_SIM, local_sim_job, NULL); - job_dispatch_pinned(APP_DEDICATED_WORKER_ID_USER, user_job, NULL); -#else - sys_run(1, local_sim_job, SYS_JOB_PRIORITY_HIGH); - sys_run(1, user_job, SYS_JOB_PRIORITY_HIGH); -#endif + sys_run(1, local_sim_job, NULL, SYS_JOB_PRIORITY_HIGH, NULL); + sys_run(1, user_job, NULL, SYS_JOB_PRIORITY_HIGH, NULL); app_register_exit_callback(&user_shutdown); return (struct user_startup_receipt) { 0 };