rename 'runner' -> 'worker'

This commit is contained in:
jacob 2025-07-05 15:43:18 -05:00
parent 55187aed97
commit ca5c1d6ee3
10 changed files with 58 additions and 50 deletions

View File

@ -656,9 +656,8 @@ INLINE f64 clamp_f64(f64 v, f64 min, f64 max) { return v < min ? min : v > max ?
#include "prof_tracy.h" #include "prof_tracy.h"
#define PROF_THREAD_GROUP_RUNNERS -8000000 #define PROF_THREAD_GROUP_WORKERS -7000000
#define PROF_THREAD_GROUP_FIBERS -7000000 #define PROF_THREAD_GROUP_FIBERS -6000000
#define PROF_THREAD_GROUP_WORKERS -6000000
#define PROF_THREAD_GROUP_IO -5000000 #define PROF_THREAD_GROUP_IO -5000000
#define PROF_THREAD_GROUP_WINDOW -4000000 #define PROF_THREAD_GROUP_WINDOW -4000000
#define PROF_THREAD_GROUP_EVICTORS -3000000 #define PROF_THREAD_GROUP_EVICTORS -3000000

View File

@ -188,7 +188,7 @@ struct asset *font_load_asset(struct string path, f32 point_size, b32 wait)
/* Push task */ /* Push task */
asset_cache_mark_loading(asset); asset_cache_mark_loading(asset);
sys_run(1, font_load_asset_job, params, SYS_JOB_PRIORITY_BACKGROUND, &asset->counter); sys_run(1, font_load_asset_job, params, SYS_PRIORITY_BACKGROUND, &asset->counter);
if (wait) { if (wait) {
asset_cache_wait(asset); asset_cache_wait(asset);
} }

View File

@ -928,7 +928,7 @@ INTERNAL SYS_JOB_DEF(pipeline_init_job, job)
struct shader_compile_job_param *params[] = { &vs, &ps }; struct shader_compile_job_param *params[] = { &vs, &ps };
struct shader_compile_job_sig comp_sig = { .params = params }; struct shader_compile_job_sig comp_sig = { .params = params };
struct sys_counter counter = ZI; struct sys_counter counter = ZI;
sys_run(countof(params), shader_compile_job, &comp_sig, SYS_JOB_PRIORITY_HIGH, &counter); sys_run(countof(params), shader_compile_job, &comp_sig, SYS_PRIORITY_HIGH, &counter);
sys_wait(&counter); sys_wait(&counter);
success = vs.success && ps.success; success = vs.success && ps.success;
} }
@ -1122,7 +1122,7 @@ INTERNAL void pipeline_alloc(u64 num_pipelines, struct pipeline_desc *descs_in,
__prof; __prof;
struct sys_counter counter = ZI; struct sys_counter counter = ZI;
struct pipeline_init_job_sig sig = { .descs_in = descs_in, .pipelines_out = pipelines_out }; struct pipeline_init_job_sig sig = { .descs_in = descs_in, .pipelines_out = pipelines_out };
sys_run(num_pipelines, pipeline_init_job, &sig, SYS_JOB_PRIORITY_HIGH, &counter); sys_run(num_pipelines, pipeline_init_job, &sig, SYS_PRIORITY_HIGH, &counter);
sys_wait(&counter); sys_wait(&counter);
} }

View File

@ -58,7 +58,7 @@ struct playback_startup_receipt playback_startup(struct mixer_startup_receipt *m
(UNUSED)mixer_sr; (UNUSED)mixer_sr;
wasapi_initialize(); wasapi_initialize();
sys_run(1, playback_job, NULL, SYS_JOB_PRIORITY_HIGH, NULL); sys_run(1, playback_job, NULL, SYS_PRIORITY_HIGH, NULL);
app_register_exit_callback(&playback_shutdown); app_register_exit_callback(&playback_shutdown);
return (struct playback_startup_receipt) { 0 }; return (struct playback_startup_receipt) { 0 };

View File

@ -289,7 +289,7 @@ INTERNAL SYS_THREAD_DEF(resource_watch_dispatcher_thread_entry_point, _)
sig.name = info->name; sig.name = info->name;
sig.callbacks = callbacks; sig.callbacks = callbacks;
struct sys_counter counter = ZI; struct sys_counter counter = ZI;
sys_run(num_callbacks, resource_watch_callback_job, &sig, SYS_JOB_PRIORITY_BACKGROUND, &counter); sys_run(num_callbacks, resource_watch_callback_job, &sig, SYS_PRIORITY_BACKGROUND, &counter);
sys_wait(&counter); sys_wait(&counter);
} }
} }

View File

@ -181,7 +181,7 @@ struct asset *sound_load_asset(struct string path, u32 flags, b32 wait)
/* Push task */ /* Push task */
asset_cache_mark_loading(asset); asset_cache_mark_loading(asset);
sys_run(1, sound_load_asset_job, params, SYS_JOB_PRIORITY_BACKGROUND, &asset->counter); sys_run(1, sound_load_asset_job, params, SYS_PRIORITY_BACKGROUND, &asset->counter);
if (wait) { if (wait) {
asset_cache_wait(asset); asset_cache_wait(asset);
} }

View File

@ -335,7 +335,7 @@ INTERNAL void push_load_job(struct cache_ref ref, struct sprite_tag tag)
} }
/* Push work */ /* Push work */
sys_run(1, sprite_load_job, cmd, SYS_JOB_PRIORITY_BACKGROUND, NULL); sys_run(1, sprite_load_job, cmd, SYS_PRIORITY_BACKGROUND, NULL);
} }
INTERNAL void cache_entry_load_texture(struct cache_ref ref, struct sprite_tag tag) INTERNAL void cache_entry_load_texture(struct cache_ref ref, struct sprite_tag tag)
@ -1381,7 +1381,7 @@ INTERNAL SYS_THREAD_DEF(sprite_evictor_scheduler_thread_entry_point, arg)
struct sys_lock evictor_lock = sys_mutex_lock_e(G.evictor_scheduler_mutex); struct sys_lock evictor_lock = sys_mutex_lock_e(G.evictor_scheduler_mutex);
while (!G.evictor_scheduler_shutdown) { while (!G.evictor_scheduler_shutdown) {
struct sys_counter counter = ZI; struct sys_counter counter = ZI;
sys_run(1, sprite_evictor_job, NULL, SYS_JOB_PRIORITY_BACKGROUND, &counter); sys_run(1, sprite_evictor_job, NULL, SYS_PRIORITY_BACKGROUND, &counter);
sys_wait(&counter); sys_wait(&counter);
sys_condition_variable_wait_time(G.evictor_scheduler_shutdown_cv, &evictor_lock, SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS)); sys_condition_variable_wait_time(G.evictor_scheduler_shutdown_cv, &evictor_lock, SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS));
} }

View File

@ -504,10 +504,10 @@ void sys_yield(void);
* Job * Job
* ========================== */ * ========================== */
enum sys_job_priority { enum sys_priority {
SYS_JOB_PRIORITY_HIGH = 0, SYS_PRIORITY_HIGH = 0,
SYS_JOB_PRIORITY_NORMAL = 1, SYS_PRIORITY_NORMAL = 1,
SYS_JOB_PRIORITY_BACKGROUND = 2, SYS_PRIORITY_BACKGROUND = 2,
NUM_SYS_JOB_PRIORITIES NUM_SYS_JOB_PRIORITIES
}; };
@ -520,7 +520,7 @@ struct sys_job_data {
#define SYS_JOB_DEF(job_name, arg_name) void job_name(struct sys_job_data arg_name) #define SYS_JOB_DEF(job_name, arg_name) void job_name(struct sys_job_data arg_name)
typedef SYS_JOB_DEF(sys_job_func, job_data); typedef SYS_JOB_DEF(sys_job_func, job_data);
void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_job_priority priority, struct sys_counter *counter); void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priority, struct sys_counter *counter);
/* ========================== * /* ========================== *
* Scratch context * Scratch context

View File

@ -206,7 +206,7 @@ STATIC_ASSERT(alignof(struct fiber_ctx) == 64); /* Avoid false sharing */
struct alignas(64) runner_ctx { struct alignas(64) worker_ctx {
i32 id; i32 id;
HANDLE sleep_timer; HANDLE sleep_timer;
}; };
@ -317,12 +317,12 @@ GLOBAL struct {
/* Jobs */ /* Jobs */
struct job_queue job_queues[NUM_JOB_QUEUE_KINDS]; struct job_queue job_queues[NUM_JOB_QUEUE_KINDS];
/* Runners */ /* Workers */
struct atomic_i32 runners_shutdown; /* TODO: Prevent false sharing */ struct atomic_i32 workers_shutdown; /* TODO: Prevent false sharing */
i32 num_runner_threads; i32 num_worker_threads;
struct arena *runner_threads_arena; struct arena *worker_threads_arena;
struct sys_thread **runner_threads; struct sys_thread **worker_threads;
struct runner_ctx *runner_contexts; struct worker_ctx *worker_contexts;
} G = ZI, DEBUG_ALIAS(G, G_sys_win32); } G = ZI, DEBUG_ALIAS(G, G_sys_win32);
@ -338,6 +338,7 @@ void sys_counter_add(struct sys_counter *counter, i64 amount)
i64 old_v = atomic_i64_fetch_add(&counter->v, amount); i64 old_v = atomic_i64_fetch_add(&counter->v, amount);
i64 new_v = old_v + amount; i64 new_v = old_v + amount;
if (old_v > 0 && new_v <= 0) { if (old_v > 0 && new_v <= 0) {
/* TODO: Wake waiters */
(UNUSED)old_v; (UNUSED)old_v;
(UNUSED)new_v; (UNUSED)new_v;
} }
@ -345,6 +346,7 @@ void sys_counter_add(struct sys_counter *counter, i64 amount)
void sys_wait(struct sys_counter *sys_counter) void sys_wait(struct sys_counter *sys_counter)
{ {
__prof;
while (atomic_i64_fetch(&sys_counter->v) > 0) { while (atomic_i64_fetch(&sys_counter->v) > 0) {
ix_pause(); ix_pause();
} }
@ -484,14 +486,14 @@ void sys_yield(void)
yield(YIELD_KIND_COOPERATIVE); yield(YIELD_KIND_COOPERATIVE);
} }
void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_job_priority priority, struct sys_counter *counter) void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priority, struct sys_counter *counter)
{ {
if (count > 0) { if (count > 0) {
if (counter) { if (counter) {
sys_counter_add(counter, count); sys_counter_add(counter, count);
} }
struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
priority = clamp_i32(priority, fiber->job_priority, SYS_JOB_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */ priority = clamp_i32(priority, fiber->job_priority, SYS_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */
STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
enum job_queue_kind queue_kind = (enum job_queue_kind)priority; enum job_queue_kind queue_kind = (enum job_queue_kind)priority;
struct job_queue *queue = &G.job_queues[queue_kind]; struct job_queue *queue = &G.job_queues[queue_kind];
@ -549,15 +551,15 @@ INTERNAL void job_fiber_entry(void *id_ptr)
} }
/* ========================== * /* ========================== *
* Test runners * Test workers
* ========================== */ * ========================== */
INTERNAL SYS_THREAD_DEF(runner_entry, runner_ctx_arg) INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
{ {
struct runner_ctx *ctx = runner_ctx_arg; struct worker_ctx *ctx = worker_ctx_arg;
(UNUSED)ctx; (UNUSED)ctx;
i32 runner_fiber_id = sys_current_fiber_id(); i32 worker_fiber_id = sys_current_fiber_id();
struct job_queue *queues[countof(G.job_queues)] = ZI; struct job_queue *queues[countof(G.job_queues)] = ZI;
for (u32 i = 0; i < countof(G.job_queues); ++i) { for (u32 i = 0; i < countof(G.job_queues); ++i) {
@ -566,9 +568,9 @@ INTERNAL SYS_THREAD_DEF(runner_entry, runner_ctx_arg)
struct fiber *job_fiber = NULL; struct fiber *job_fiber = NULL;
while (!atomic_i32_fetch(&G.runners_shutdown)) { while (!atomic_i32_fetch(&G.workers_shutdown)) {
/* Pull job from queue */ /* Pull job from queue */
enum sys_job_priority job_priority = 0; enum sys_priority job_priority = 0;
i32 job_id = 0; i32 job_id = 0;
sys_job_func *job_func = 0; sys_job_func *job_func = 0;
void *job_sig = 0; void *job_sig = 0;
@ -586,7 +588,7 @@ INTERNAL SYS_THREAD_DEF(runner_entry, runner_ctx_arg)
if (job_id < info->count) { if (job_id < info->count) {
/* Pick job */ /* Pick job */
STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
job_priority = (enum sys_job_priority)queue->kind; job_priority = (enum sys_priority)queue->kind;
job_func = info->func; job_func = info->func;
job_sig = info->sig; job_sig = info->sig;
job_counter = info->counter; job_counter = info->counter;
@ -617,7 +619,7 @@ INTERNAL SYS_THREAD_DEF(runner_entry, runner_ctx_arg)
job_fiber->job_sig = job_sig; job_fiber->job_sig = job_sig;
job_fiber->job_id = job_id; job_fiber->job_id = job_id;
job_fiber->job_priority = job_priority; job_fiber->job_priority = job_priority;
job_fiber->parent_id = runner_fiber_id; job_fiber->parent_id = worker_fiber_id;
b32 done = false; b32 done = false;
while (!done) { while (!done) {
SwitchToFiber(job_fiber->addr); SwitchToFiber(job_fiber->addr);
@ -653,7 +655,7 @@ INTERNAL SYS_THREAD_DEF(runner_entry, runner_ctx_arg)
/* TODO */ /* TODO */
#if 0 #if 0
{ {
__profscope(Runner wait); __profscope(Worker wait);
} }
#endif #endif
} }
@ -670,23 +672,23 @@ INTERNAL SYS_THREAD_DEF(test_entry, _)
queue->arena = arena_alloc(GIGABYTE(64)); queue->arena = arena_alloc(GIGABYTE(64));
} }
/* Start runners */ /* Start workers */
G.num_runner_threads = 6; G.num_worker_threads = 6;
G.runner_threads_arena = arena_alloc(GIGABYTE(64)); G.worker_threads_arena = arena_alloc(GIGABYTE(64));
G.runner_threads = arena_push_array(G.runner_threads_arena, struct sys_thread *, G.num_runner_threads); G.worker_threads = arena_push_array(G.worker_threads_arena, struct sys_thread *, G.num_worker_threads);
G.runner_contexts = arena_push_array(G.runner_threads_arena, struct runner_ctx, G.num_runner_threads); G.worker_contexts = arena_push_array(G.worker_threads_arena, struct worker_ctx, G.num_worker_threads);
for (i32 i = 0; i < G.num_runner_threads; ++i) { for (i32 i = 0; i < G.num_worker_threads; ++i) {
struct runner_ctx *ctx = &G.runner_contexts[i]; struct worker_ctx *ctx = &G.worker_contexts[i];
ctx->id = i; ctx->id = i;
ctx->sleep_timer = CreateWaitableTimerExW(NULL, NULL, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS); ctx->sleep_timer = CreateWaitableTimerExW(NULL, NULL, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS);
struct string name = string_format(scratch.arena, LIT("Runner #%F"), FMT_SINT(i)); struct string name = string_format(scratch.arena, LIT("Worker #%F"), FMT_SINT(i));
G.runner_threads[i] = sys_thread_alloc(runner_entry, ctx, name, PROF_THREAD_GROUP_RUNNERS + i); G.worker_threads[i] = sys_thread_alloc(worker_entry, ctx, name, PROF_THREAD_GROUP_WORKERS + i);
} }
/* Wait on runners */ /* Wait on workers */
for (i32 i = 0; i < G.num_runner_threads; ++i) { for (i32 i = 0; i < G.num_worker_threads; ++i) {
struct sys_thread *runner_thread = G.runner_threads[i]; struct sys_thread *worker_thread = G.worker_threads[i];
sys_thread_wait_release(runner_thread); sys_thread_wait_release(worker_thread);
} }
scratch_end(scratch); scratch_end(scratch);
@ -3061,7 +3063,7 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
} }
/* Shutdown test thread */ /* Shutdown test thread */
atomic_i32_fetch_set(&G.runners_shutdown, 1); atomic_i32_fetch_set(&G.workers_shutdown, 1);
sys_thread_wait_release(test_thread); sys_thread_wait_release(test_thread);
} }

View File

@ -259,8 +259,8 @@ struct user_startup_receipt user_startup(struct gp_startup_receipt *gp_sr,
sys_window_register_event_callback(G.window, &window_event_callback); sys_window_register_event_callback(G.window, &window_event_callback);
/* Start jobs */ /* Start jobs */
sys_run(1, local_sim_job, NULL, SYS_JOB_PRIORITY_HIGH, NULL); sys_run(1, local_sim_job, NULL, SYS_PRIORITY_HIGH, NULL);
sys_run(1, user_job, NULL, SYS_JOB_PRIORITY_HIGH, NULL); sys_run(1, user_job, NULL, SYS_PRIORITY_HIGH, NULL);
app_register_exit_callback(&user_shutdown); app_register_exit_callback(&user_shutdown);
return (struct user_startup_receipt) { 0 }; return (struct user_startup_receipt) { 0 };
@ -629,8 +629,15 @@ INTERNAL void user_update(void)
struct sim_snapshot *src = sim_snapshot_from_tick(G.local_to_user_client, last_tick); struct sim_snapshot *src = sim_snapshot_from_tick(G.local_to_user_client, last_tick);
sim_snapshot_alloc(G.user_unblended_client, src, src->tick); sim_snapshot_alloc(G.user_unblended_client, src, src->tick);
G.last_local_to_user_snapshot_published_at_ns = G.local_to_user_client_publish_time_ns; G.last_local_to_user_snapshot_published_at_ns = G.local_to_user_client_publish_time_ns;
/* FIXME: Enable this */
#if 0
G.average_local_to_user_snapshot_publish_dt_ns -= G.average_local_to_user_snapshot_publish_dt_ns / 50; G.average_local_to_user_snapshot_publish_dt_ns -= G.average_local_to_user_snapshot_publish_dt_ns / 50;
G.average_local_to_user_snapshot_publish_dt_ns += G.local_to_user_client_publish_dt_ns / 50; G.average_local_to_user_snapshot_publish_dt_ns += G.local_to_user_client_publish_dt_ns / 50;
#else
G.average_local_to_user_snapshot_publish_dt_ns -= G.average_local_to_user_snapshot_publish_dt_ns / 1;
G.average_local_to_user_snapshot_publish_dt_ns += G.local_to_user_client_publish_dt_ns / 1;
#endif
} }
sys_mutex_unlock(&lock); sys_mutex_unlock(&lock);
} }