From fea034698229342bfaf67f43cc67fb829bba0211 Mon Sep 17 00:00:00 2001 From: jacob Date: Mon, 30 Jun 2025 20:00:11 -0500 Subject: [PATCH] pinned jobs --- res/shaders_dx11/common.hlsl | 22 -- res/shaders_dx11/grid.hlsl | 106 ------ res/shaders_dx11/mesh.hlsl | 42 --- res/shaders_dx11/test.hlsl | 51 --- res/shaders_dx11/texture.hlsl | 84 ----- src/app.c | 42 ++- src/app.h | 8 + src/job.c | 605 +++++++++++++++++++++++++--------- src/job.h | 7 +- src/playback_wasapi.c | 17 +- src/sys_win32.c | 8 - src/user.c | 45 +-- 12 files changed, 516 insertions(+), 521 deletions(-) delete mode 100644 res/shaders_dx11/common.hlsl delete mode 100644 res/shaders_dx11/grid.hlsl delete mode 100644 res/shaders_dx11/mesh.hlsl delete mode 100644 res/shaders_dx11/test.hlsl delete mode 100644 res/shaders_dx11/texture.hlsl diff --git a/res/shaders_dx11/common.hlsl b/res/shaders_dx11/common.hlsl deleted file mode 100644 index c1e397f3..00000000 --- a/res/shaders_dx11/common.hlsl +++ /dev/null @@ -1,22 +0,0 @@ -#define DECL(t, n) t n : n -#define DESV(t, n, s) t n : s - -#define PI 3.14159265359 -#define GOLDEN 1.61803398875 - -/* Linear color from normalized sRGB */ -float4 linear_from_srgb(float4 srgb) -{ - return float4(pow(srgb.rgb, 2.2), srgb.a); -} - -/* Linear color from R8G8B8A8 sRGB */ -float4 linear_from_srgb32(uint srgb32) -{ - float4 res; - res.r = ((srgb32 >> 0) & 0xFF) / 255.0; - res.g = ((srgb32 >> 8) & 0xFF) / 255.0; - res.b = ((srgb32 >> 16) & 0xFF) / 255.0; - res.a = ((srgb32 >> 24) & 0xFF) / 255.0; - return linear_from_srgb(res); -} diff --git a/res/shaders_dx11/grid.hlsl b/res/shaders_dx11/grid.hlsl deleted file mode 100644 index 9a9ebd03..00000000 --- a/res/shaders_dx11/grid.hlsl +++ /dev/null @@ -1,106 +0,0 @@ -#include "shaders_dx11/common.hlsl" - -struct vs_instance { - float2x3 xf; - float line_thickness; - float line_spacing; - float2 offset; - uint bg0_srgb; - uint bg1_srgb; - uint line_srgb; - uint x_srgb; - uint y_srgb; -}; - -struct ps_input { - DESV(float4, screen_pos, SV_POSITION); - DECL(float, line_thickness); - DECL(float, line_spacing); - DECL(float2, offset); - DECL(float4, bg0_lin); - DECL(float4, bg1_lin); - DECL(float4, line_lin); - DECL(float4, x_lin); - DECL(float4, y_lin); -}; - -/* ========================== * - * Globals - * ========================== */ - -StructuredBuffer G_instance_buffer : register(t0); - -cbuffer constants : register(b0) -{ - float4x4 G_projection; - uint G_instance_offset; -}; - -/* ========================== * - * Vertex shader - * ========================== */ - -static const float2 G_quad_verts[4] = { - float2(-0.5f, -0.5f), - float2( 0.5f, -0.5f), - float2( 0.5f, 0.5f), - float2(-0.5f, 0.5f) -}; - -ps_input vs_main(uint instance_id : SV_InstanceID, uint vertex_id : SV_VertexID) -{ - vs_instance instance = G_instance_buffer[G_instance_offset + instance_id]; - float2 vert = G_quad_verts[vertex_id]; - float2 world_pos = mul(instance.xf, float3(vert, 1)).xy; - - ps_input output; - output.screen_pos = mul(G_projection, float4(world_pos, 0, 1)); - output.line_thickness = instance.line_thickness; - output.line_spacing = instance.line_spacing; - output.offset = instance.offset; - output.bg0_lin = linear_from_srgb32(instance.bg0_srgb); - output.bg1_lin = linear_from_srgb32(instance.bg1_srgb); - output.line_lin = linear_from_srgb32(instance.line_srgb); - output.x_lin = linear_from_srgb32(instance.x_srgb); - output.y_lin = linear_from_srgb32(instance.y_srgb); - - return output; -} - -/* ========================== * - * Pixel shader - * ========================== */ - -float4 ps_main(ps_input input) : SV_TARGET -{ - float2 grid_pos = input.screen_pos.xy + input.offset; - float half_thickness = input.line_thickness / 2; - float spacing = input.line_spacing; - - float4 color = input.bg0_lin; - - float2 v = abs(round(grid_pos / spacing) * spacing - grid_pos); - float dist = min(v.x, v.y); - - if (grid_pos.y <= half_thickness && grid_pos.y >= -half_thickness) { - color = input.x_lin; - } else if (grid_pos.x <= half_thickness && grid_pos.x >= -half_thickness) { - color = input.y_lin; - } else if (dist < half_thickness) { - color = input.line_lin; - } else { - bool checker = false; - uint cell_x = (uint)(abs(grid_pos.x) / spacing) + (grid_pos.x < 0); - uint cell_y = (uint)(abs(grid_pos.y) / spacing) + (grid_pos.y < 0); - if (cell_x % 2 == 0) { - checker = cell_y % 2 == 0; - } else { - checker = cell_y % 2 == 1; - } - if (checker) { - color = input.bg1_lin; - } - } - - return color; -} diff --git a/res/shaders_dx11/mesh.hlsl b/res/shaders_dx11/mesh.hlsl deleted file mode 100644 index 73e30cf8..00000000 --- a/res/shaders_dx11/mesh.hlsl +++ /dev/null @@ -1,42 +0,0 @@ -#include "shaders_dx11/common.hlsl" - -struct vs_input { - DECL(float4, pos); - DECL(float4, color_srgb); -}; - -struct ps_input { - DESV(float4, screen_pos, SV_POSITION); - DECL(float4, color_lin); -}; - -/* ========================== * - * Globals - * ========================== */ - -cbuffer constants : register(b0) -{ - float4x4 G_projection; -}; - -/* ========================== * - * Vertex shader - * ========================== */ - -ps_input vs_main(vs_input input) -{ - ps_input output; - output.screen_pos = mul(G_projection, float4(input.pos.xy, 0.f, 1.f)); - output.color_lin = linear_from_srgb(input.color_srgb); - - return output; -} - -/* ========================== * - * Pixel shader - * ========================== */ - -float4 ps_main(ps_input input) : SV_TARGET -{ - return input.color_lin; -} diff --git a/res/shaders_dx11/test.hlsl b/res/shaders_dx11/test.hlsl deleted file mode 100644 index 3cfec5f6..00000000 --- a/res/shaders_dx11/test.hlsl +++ /dev/null @@ -1,51 +0,0 @@ -#include "shaders_dx11/common.hlsl" - -struct vs_instance { - float2x3 xf; -}; - -struct ps_input { - DESV(float4, screen_pos, SV_POSITION); -}; - -/* ========================== * - * Globals - * ========================== */ - -StructuredBuffer G_instance_buffer : register(t0); - -cbuffer constants : register(b0) -{ - float4x4 G_projection; - uint G_instance_offset; -}; - -/* ========================== * - * Vertex shader - * ========================== */ - -static const float2 G_quad_verts[4] = { - float2(-0.5f, -0.5f), - float2( 0.5f, -0.5f), - float2( 0.5f, 0.5f), - float2(-0.5f, 0.5f) -}; - -ps_input vs_main(uint instance_id : SV_InstanceID, uint vertex_id : SV_VertexID) -{ - vs_instance instance = G_instance_buffer[G_instance_offset + instance_id]; - float2 vert = G_quad_verts[vertex_id]; - float2 world_pos = mul(instance.xf, float3(vert, 1)).xy; - ps_input output; - output.screen_pos = mul(G_projection, float4(world_pos, 0, 1)); - return output; -} - -/* ========================== * - * Pixel shader - * ========================== */ - -float4 ps_main(ps_input input) : SV_TARGET -{ - return float4(0, 0, 0, 0); -} diff --git a/res/shaders_dx11/texture.hlsl b/res/shaders_dx11/texture.hlsl deleted file mode 100644 index cb76cb66..00000000 --- a/res/shaders_dx11/texture.hlsl +++ /dev/null @@ -1,84 +0,0 @@ -#include "shaders_dx11/common.hlsl" - -struct vs_constants { - float4x4 projection; - uint instance_offset; -}; - -struct vs_instance { - float2x3 xf; - float2 uv0; - float2 uv1; - uint tint_srgb; - float emittance; -}; - -/* ========================== * - * Globals - * ========================== */ - -StructuredBuffer g_instances : register(t0); - -Texture2D g_texture : register(t1); - -SamplerState g_sampler : register(s0); - -cbuffer cbuff : register(b0) -{ - struct vs_constants g_constants; -}; - -/* ========================== * - * Vertex shader - * ========================== */ - -static const float2 g_quad_verts[4] = { - float2(-0.5f, -0.5f), - float2( 0.5f, -0.5f), - float2( 0.5f, 0.5f), - float2(-0.5f, 0.5f) -}; - -struct vs_input { - DECL(uint, SV_InstanceID); - DECL(uint, SV_VertexID); -}; - -struct vs_output { - DECL(float4, SV_Position); - DECL(float2, uv); - DECL(float4, tint_lin); -}; - -struct vs_output vs_main(struct vs_input input) -{ - vs_instance instance = g_instances[g_constants.instance_offset + input.SV_InstanceID]; - float2 vert = g_quad_verts[input.SV_VertexID]; - float2 world_pos = mul(instance.xf, float3(vert, 1)).xy; - - struct vs_output output; - output.SV_Position = mul(g_constants.projection, float4(world_pos, 0, 1)); - output.uv = instance.uv0 + ((vert + 0.5) * (instance.uv1 - instance.uv0)); - output.tint_lin = linear_from_srgb32(instance.tint_srgb); - - return output; -} - -/* ========================== * - * Pixel shader - * ========================== */ - -struct ps_input { - struct vs_output vs; -}; - -struct ps_output { - DECL(float4, SV_Target); -}; - -struct ps_output ps_main(struct ps_input input) -{ - struct ps_output output; - output.SV_Target = g_texture.Sample(g_sampler, input.vs.uv) * input.vs.tint_lin; - return output; -} diff --git a/src/app.c b/src/app.c index c9dcc024..9aeb718d 100644 --- a/src/app.c +++ b/src/app.c @@ -233,27 +233,35 @@ void app_entry_point(struct string args_str) G.exit_callbacks_arena = arena_alloc(GIGABYTE(64)); G.arena = arena_alloc(GIGABYTE(64)); - u32 worker_count = 4; + i32 worker_count; { /* FIXME: Switch this on to utilize all cores. Only decreasing worker count for testing purposes. */ -#if !PROFILING && !RTC - /* Ideally these layers should have cores "reserved" for them - * 1. User thread - * 2. Sim thread - * 3. Audio mixing / playback thread - * 4. Networking thread - */ - i32 num_reserved_cores = 4; - - i32 num_logical_cores = (i32)sys_num_logical_processors(); - //num_logical_cores = min(num_logical_cores, 8) + (max(num_logical_cores - 8, 0) / 2); /* Dumb heuristic to try and lessen e-core usage */ - i32 min_worker_count = JOB_MIN_WORKER_COUNT; - i32 max_worker_count = JOB_MAX_WORKER_COUNT; - i32 target_worker_count = num_logical_cores - num_reserved_cores; - worker_count = (u32)clamp_i32(target_worker_count, min_worker_count, max_worker_count); +#if !PROFILING && !RTC || 1 + i32 min_worker_count = min_i32(NUM_APP_DEDICATED_WORKERS + 2, JOB_MIN_WORKERS); + i32 max_worker_count = JOB_MAX_WORKERS; + i32 target_worker_count = (i32)sys_num_logical_processors() * 0.75; + worker_count = clamp_i32(target_worker_count, min_worker_count, max_worker_count); +#else + worker_count = 8; #endif } + struct string *worker_names = arena_push_array(scratch.arena, struct string, worker_count); + for (i32 i = 0; i < worker_count; ++i) { + struct string prefix = string_from_int(scratch.arena, worker_count - i, 10, 2); /* For profiler sorting order */ + struct string id = string_from_int(scratch.arena, i, 10, 2); + struct string *name = &worker_names[i]; + if (i == APP_DEDICATED_WORKER_ID_USER) { + *name = string_format(scratch.arena, LIT("[W%F] Worker #%F (User)"), FMT_STR(prefix), FMT_STR(id)); + } else if (i == APP_DEDICATED_WORKER_ID_SIM) { + *name = string_format(scratch.arena, LIT("[W%F] Worker #%F (Sim)"), FMT_STR(prefix), FMT_STR(id)); + } else if (i == APP_DEDICATED_WORKER_ID_AUDIO) { + *name = string_format(scratch.arena, LIT("[W%F] Worker #%F (Audio)"), FMT_STR(prefix), FMT_STR(id)); + } else { + *name = string_format(scratch.arena, LIT("[W%F] Worker #%F"), FMT_STR(prefix), FMT_STR(id)); + } + } + G.write_path = initialize_write_directory(G.arena, LIT(WRITE_DIR)); /* Startup logging */ @@ -318,7 +326,7 @@ void app_entry_point(struct string args_str) } /* Startup systems */ - job_startup(worker_count); + job_startup(worker_count, worker_names); struct resource_startup_receipt resource_sr = resource_startup(); struct sock_startup_receipt sock_sr = sock_startup(); struct host_startup_receipt host_sr = host_startup(&sock_sr); diff --git a/src/app.h b/src/app.h index 807ffa7c..21c7f071 100644 --- a/src/app.h +++ b/src/app.h @@ -4,6 +4,14 @@ #define APP_EXIT_CALLBACK_FUNC_DEF(name) void name(void) typedef APP_EXIT_CALLBACK_FUNC_DEF(app_exit_callback_func); +enum app_dedicated_worker_id { + APP_DEDICATED_WORKER_ID_USER = 0, + APP_DEDICATED_WORKER_ID_SIM = 1, + APP_DEDICATED_WORKER_ID_AUDIO = 2, + + NUM_APP_DEDICATED_WORKERS +}; + struct string app_write_path_cat(struct arena *arena, struct string filename); /* Register a function that will be called when the application exits */ diff --git a/src/job.c b/src/job.c index 35336e2a..b1d46509 100644 --- a/src/job.c +++ b/src/job.c @@ -12,15 +12,17 @@ #endif struct worker_job { + struct worker_job_queue *queue; struct sys_mutex *mutex; i32 num_workers; i32 num_dispatched; + i32 pinned_worker_id; i32 count; job_func *func; void *sig; - u64 gen; + struct atomic_u64 gen; struct sys_condition_variable *gen_cv; @@ -30,6 +32,11 @@ struct worker_job { struct worker_job *next_free; }; +struct worker_job_queue { + struct worker_job *first; + struct worker_job *last; +}; + /* ========================== * * Global state * ========================== */ @@ -44,14 +51,13 @@ GLOBAL struct { struct worker_job *first_free_job; struct sys_mutex *queued_jobs_mutex; - struct worker_job *first_queued_job; - struct worker_job *last_queued_job; - u64 num_queued_jobs; + struct worker_job_queue global_queue; + struct worker_job_queue pinned_queues[JOB_MAX_WORKERS]; + u64 queue_submit_gen; - - u32 num_worker_threads; + struct atomic_i32 num_idle_worker_threads; + i32 num_worker_threads; b32 workers_shutdown; - struct sys_mutex *workers_wake_mutex; struct sys_condition_variable *workers_wake_cv; struct sys_thread *worker_threads[JOB_MAX_WORKERS]; } G = ZI, DEBUG_ALIAS(G, G_job); @@ -63,7 +69,7 @@ GLOBAL struct { INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg); INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(job_shutdown); -void job_startup(i32 num_workers) +void job_startup(i32 num_workers, struct string *worker_names) { __prof; struct arena_temp scratch = scratch_begin_no_conflict(); @@ -73,7 +79,6 @@ void job_startup(i32 num_workers) G.queued_jobs_mutex = sys_mutex_alloc(); - G.workers_wake_mutex = sys_mutex_alloc(); G.workers_wake_cv = sys_condition_variable_alloc(); if (num_workers < JOB_MIN_WORKERS || num_workers > JOB_MAX_WORKERS) { @@ -81,11 +86,11 @@ void job_startup(i32 num_workers) ASSERT(false); } G.num_worker_threads = num_workers; - for (u64 i = 0; i < G.num_worker_threads; ++i) { - u32 prefix = num_workers - i; /* For profiler sorting order */ - struct string name = string_format(scratch.arena, LIT("[P6%F] Worker #%F"), FMT_UINT(prefix), FMT_UINT(i)); - G.worker_threads[i] = sys_thread_alloc(worker_thread_entry_point, &i, name); + for (i32 i = 0; i < G.num_worker_threads; ++i) { + struct string name = worker_names[i]; + G.worker_threads[i] = sys_thread_alloc(worker_thread_entry_point, (void *)(i64)i, name); } + atomic_i32_eval_exchange(&G.num_idle_worker_threads, num_workers); app_register_exit_callback(job_shutdown); @@ -96,180 +101,210 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(job_shutdown) { __prof; { - struct sys_lock lock = sys_mutex_lock_e(G.workers_wake_mutex); + struct sys_lock lock = sys_mutex_lock_e(G.queued_jobs_mutex); G.workers_shutdown = true; sys_condition_variable_signal(G.workers_wake_cv, U32_MAX); sys_mutex_unlock(&lock); } - for (u32 i = 0; i < G.num_worker_threads; ++i) { + for (i32 i = 0; i < G.num_worker_threads; ++i) { struct sys_thread *thread = G.worker_threads[i]; sys_thread_wait_release(thread); } } +/* ========================== * + * Worker TLS + * ========================== */ + +struct worker_ctx { + i32 worker_id; /* Will be -1 if thread is not a worker */ + i32 pin_depth; +}; + +INTERNAL THREAD_LOCAL_VAR_ALLOC_FUNC_DEF(worker_ctx_init, vctx) +{ + struct worker_ctx *ctx = vctx; + ctx->worker_id = -1; +} + +GLOBAL THREAD_LOCAL_VAR_DEF(tl_worker_ctx, struct worker_ctx, worker_ctx_init, NULL); + /* ========================== * * Job * ========================== */ -struct job_handle job_dispatch_async(u32 count, job_func *job_func, void *sig) +struct job_desc { + job_func *func; + void *sig; + i32 count; + b32 wait; + i32 pinned_worker_id; +}; + +INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc) { - __prof; + struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx); + i32 worker_id = ctx->worker_id; - /* Allocate job */ - u64 gen = 0; - struct worker_job *job = NULL; - { - struct sys_mutex *old_mutex = NULL; - struct sys_condition_variable *old_cv = NULL; - { - struct sys_lock lock = sys_mutex_lock_e(G.free_jobs_mutex); - if (G.first_free_job) { - job = G.first_free_job; - G.first_free_job = job->next_free; - old_mutex = job->mutex; - old_cv = job->gen_cv; - gen = job->gen + 1; - } else { - job = arena_push_no_zero(G.free_jobs_arena, struct worker_job); - gen = 1; - } - sys_mutex_unlock(&lock); - } - MEMZERO_STRUCT(job); - if (old_mutex) { - job->mutex = old_mutex; - job->gen_cv = old_cv; - } else { - job->mutex = sys_mutex_alloc(); - job->gen_cv = sys_condition_variable_alloc(); - } - } - job->count = count; - job->func = job_func; - job->sig = sig; - job->gen = gen; + job_func *job_func = desc.func; + void *sig = desc.sig; + i32 job_count = desc.count; + b32 wait = desc.wait; + i32 pinned_worker_id = desc.pinned_worker_id; - /* Queue job */ - { - struct sys_lock lock = sys_mutex_lock_e(G.queued_jobs_mutex); - if (G.last_queued_job) { - G.last_queued_job->next = job; - } else { - G.first_queued_job = job; - } - G.last_queued_job = job; - sys_mutex_unlock(&lock); + if (pinned_worker_id >= G.num_worker_threads) { + struct arena_temp scratch = scratch_begin_no_conflict(); + sys_panic(string_format(scratch.arena, LIT("Tried to dispatch pinned job to worker with id %F when there are only %F worker threads"), FMT_SINT(pinned_worker_id), FMT_SINT(G.num_worker_threads))); + scratch_end(scratch); } - /* Signal workers */ - { - struct sys_lock lock = sys_mutex_lock_e(G.workers_wake_mutex); - sys_condition_variable_signal(G.workers_wake_cv, count); - sys_mutex_unlock(&lock); + b32 is_pinned = pinned_worker_id >= 0; + b32 is_pinned_to_caller = is_pinned && pinned_worker_id == worker_id; + + /* NOTE: If an unpinned worker dispatches an asynchronous job, then to avoid deadlocks we inline the work execution until atleast one other worker is assigned. */ + b32 do_work = worker_id >= 0 && (wait || ctx->pin_depth <= 0) && pinned_worker_id < 0; + + if (is_pinned_to_caller) { + do_work = true; + wait = true; } struct job_handle handle = ZI; - handle.job = job; - handle.gen = gen; - return handle; -} - -void job_dispatch_wait(u32 count, job_func *job_func, void *sig) -{ - __prof; - struct job_handle handle = job_dispatch_async(count, job_func, sig); - job_wait(handle); -} - -void job_wait(struct job_handle handle) -{ - __prof; - if (handle.job) { - struct worker_job *job = handle.job; - while (job->gen == handle.gen) { - struct sys_lock lock = sys_mutex_lock_s(job->mutex); - sys_condition_variable_wait(job->gen_cv, &lock); - sys_mutex_unlock(&lock); - } - } -} - -/* ========================== * - * Worker - * ========================== */ - -INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg) -{ - i32 worker_id = *(i32 *)thread_arg; - (UNUSED)worker_id; - - struct sys_lock workers_wake_lock = sys_mutex_lock_s(G.workers_wake_mutex); - while (!G.workers_shutdown) { - sys_mutex_unlock(&workers_wake_lock); - - /* Try to pick job from queue */ - i32 job_id = 0; - i32 job_count = 0; + if (do_work && job_count == 1) { + __profscope(Execute single job); + struct job_data data = ZI; + data.sig = sig; + job_func(data); + } else if (job_count >= 1) { + __profscope(Allocate job); + /* Allocate job */ + u64 gen = 0; struct worker_job *job = NULL; { - struct sys_lock queue_lock = sys_mutex_lock_s(G.queued_jobs_mutex); - for (struct worker_job *tmp = G.first_queued_job; tmp && !job; tmp = tmp->next) { - struct sys_lock job_lock = sys_mutex_lock_e(tmp->mutex); - { - i32 tmp_id = tmp->num_dispatched++; - i32 tmp_count = tmp->count; - if (tmp_id < tmp_count) { - /* Pick job */ - ++tmp->num_workers; - job = tmp; - job_id = tmp_id; - job_count = tmp_count; - } + struct sys_mutex *old_mutex = NULL; + struct sys_condition_variable *old_cv = NULL; + { + struct sys_lock lock = sys_mutex_lock_e(G.free_jobs_mutex); + if (G.first_free_job) { + job = G.first_free_job; + G.first_free_job = job->next_free; + old_mutex = job->mutex; + old_cv = job->gen_cv; + gen = atomic_u64_eval(&job->gen) + 1; + } else { + job = arena_push_no_zero(G.free_jobs_arena, struct worker_job); + gen = 1; } - sys_mutex_unlock(&job_lock); + sys_mutex_unlock(&lock); } - sys_mutex_unlock(&queue_lock); + struct sys_lock lock = old_mutex ? sys_mutex_lock_e(old_mutex) : (struct sys_lock)ZI; + { + atomic_u64_eval_exchange(&job->gen, 0); + MEMZERO_STRUCT(job); + if (old_mutex) { + job->mutex = old_mutex; + job->gen_cv = old_cv; + } else { + job->mutex = sys_mutex_alloc(); + job->gen_cv = sys_condition_variable_alloc(); + } + job->pinned_worker_id = pinned_worker_id; + job->count = job_count; + job->func = job_func; + job->sig = sig; + atomic_u64_eval_exchange(&job->gen, gen); + } + if (lock.mutex) { + sys_mutex_unlock(&lock); + } + } + handle.job = job; + handle.gen = gen; + + /* Reserve first job id for ourselves if necessary */ + i32 job_id = 0; + if (do_work) { + job->num_dispatched = 1; + job->num_workers = 1; } - /* Remove job from queue */ - if (job_id == (job_count - 1)) { - struct sys_lock queue_lock = sys_mutex_lock_e(G.queued_jobs_mutex); + struct worker_job_queue *job_queue = NULL; + if (!is_pinned_to_caller) { + job_queue = is_pinned ? &G.pinned_queues[pinned_worker_id] : &G.global_queue; + } + job->queue = job_queue; + + /* Queue job */ + if (job_queue) { + __profscope(Queue); + struct sys_lock lock = sys_mutex_lock_e(G.queued_jobs_mutex); { - struct worker_job *prev = job->prev; - struct worker_job *next = job->next; - if (prev) { - prev->next = next; - } else { - G.first_queued_job = next; + /* Push to queue */ + { + if (job_queue->last) { + job_queue->last->next = job; + } else { + job_queue->first = job; + } + job_queue->last = job; } - if (next) { - next->prev = prev; - } else { - G.last_queued_job = prev; + ++G.queue_submit_gen; + /* Signal workers */ + i32 num_signals = job_count - !!do_work; + if (is_pinned) { + num_signals = G.num_worker_threads; } - --G.num_queued_jobs; + sys_condition_variable_signal(G.workers_wake_cv, num_signals); + } - sys_mutex_unlock(&queue_lock); + sys_mutex_unlock(&lock); } /* Execute job */ - if (job) { + b32 is_done = false; + if (do_work && job_id < job_count) { + __profscope(Execute job); + ctx->pin_depth += is_pinned_to_caller; struct job_data data = ZI; - data.sig = job->sig; - job_func *func = job->func; + data.sig = sig; b32 should_release = false; - while (job_id < job_count) { + b32 stop = false; + while (!stop) { + /* Remove job from queue */ + if (job_queue && job_id == (job_count - 1)) { + struct sys_lock queue_lock = sys_mutex_lock_e(G.queued_jobs_mutex); + { + struct worker_job *prev = job->prev; + struct worker_job *next = job->next; + if (prev) { + prev->next = next; + } else { + job_queue->first = next; + } + if (next) { + next->prev = prev; + } else { + job_queue->last = prev; + } + } + sys_mutex_unlock(&queue_lock); + } + /* Run */ { data.id = job_id; - func(data); + job_func(data); } + /* Grab new ID or exit job */ { struct sys_lock job_lock = sys_mutex_lock_e(job->mutex); - job_id = job->num_dispatched++; - if (job_id >= job_count) { + if (job->num_dispatched < job_count && (wait || job->num_workers <= 1)) { + job_id = job->num_dispatched++; + } else { + stop = true; i32 num_workers = --job->num_workers; if (num_workers == 0) { - ++job->gen; + is_done = true; + atomic_u64_eval_add_u64(&job->gen, 1); should_release = true; sys_condition_variable_signal(job->gen_cv, U32_MAX); } @@ -285,12 +320,286 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg) } sys_mutex_unlock(&fj_lock); } + ctx->pin_depth -= is_pinned_to_caller; } - workers_wake_lock = sys_mutex_lock_s(G.workers_wake_mutex); - if (!G.workers_shutdown && !G.first_queued_job) { - __profscope(Worker sleep); - sys_condition_variable_wait(G.workers_wake_cv, &workers_wake_lock); + /* Wait for job completion */ + if (wait && !is_done) { + __profscope(Wait for job); + struct sys_lock lock = sys_mutex_lock_s(job->mutex); + is_done = atomic_u64_eval(&job->gen) != handle.gen; + while (!is_done) { + sys_condition_variable_wait(job->gen_cv, &lock); + is_done = atomic_u64_eval(&job->gen) != handle.gen; + } + sys_mutex_unlock(&lock); + } + } + return handle; +} + +struct job_handle job_dispatch_async(i32 count, job_func *job_func, void *sig) +{ + __prof; + struct job_desc desc = ZI; + desc.func = job_func; + desc.sig = sig; + desc.count = count; + desc.wait = false; + desc.pinned_worker_id = -1; + return job_dispatch_ex(desc); +} + +void job_dispatch_wait(i32 count, job_func *job_func, void *sig) +{ + __prof; + struct job_desc desc = ZI; + desc.func = job_func; + desc.sig = sig; + desc.count = count; + desc.wait = true; + desc.pinned_worker_id = -1; + job_dispatch_ex(desc); +} + +void job_dispatch_pinned(i32 worker_id, job_func *job_func, void *sig) +{ + __prof; + struct job_desc desc = ZI; + desc.func = job_func; + desc.sig = sig; + desc.count = 1; + desc.wait = false; + desc.pinned_worker_id = worker_id; + job_dispatch_ex(desc); +} + +void job_wait(struct job_handle handle) +{ + struct worker_job *job = handle.job; + if (job && handle.gen) { + b32 is_done = atomic_u64_eval(&job->gen) != handle.gen; + if (!is_done) { + struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx); + i32 worker_id = ctx->worker_id; + i32 job_pinned_worker = job->pinned_worker_id; + if (worker_id >= 0 && (job_pinned_worker < 0 || job_pinned_worker == worker_id)) { + i32 job_id = 0; + i32 job_count = 0; + struct worker_job_queue *job_queue = job->queue; + { + struct sys_lock lock = sys_mutex_lock_e(job->mutex); + job_id = job->num_dispatched++; + job_count = job->count; + job_queue = job->queue; + if (job_id < job_count) { + ++job->num_workers; + } + sys_mutex_unlock(&lock); + } + + /* Execute job */ + if (job_id < job_count) { + __profscope(Execute job); + struct job_data data = ZI; + data.sig = job->sig; + job_func *func = job->func; + b32 should_release = false; + while (job_id < job_count) { + /* Remove job from queue */ + if (job_id == (job_count - 1)) { + __profscope(Dequeue job); + struct sys_lock queue_lock = sys_mutex_lock_e(G.queued_jobs_mutex); + { + struct worker_job *prev = job->prev; + struct worker_job *next = job->next; + if (prev) { + prev->next = next; + } else { + job_queue->first = next; + } + if (next) { + next->prev = prev; + } else { + job_queue->last = prev; + } + } + sys_mutex_unlock(&queue_lock); + } + /* Run */ + { + data.id = job_id; + func(data); + } + /* Grab new ID or exit job */ + { + struct sys_lock job_lock = sys_mutex_lock_e(job->mutex); + job_id = job->num_dispatched++; + if (job_id >= job_count) { + i32 num_workers = --job->num_workers; + if (num_workers == 0) { + is_done = true; + atomic_u64_eval_add_u64(&job->gen, 1); + should_release = true; + sys_condition_variable_signal(job->gen_cv, U32_MAX); + } + } + sys_mutex_unlock(&job_lock); + } + } + if (should_release) { + __profscope(Release job); + struct sys_lock fj_lock = sys_mutex_lock_e(G.free_jobs_mutex); + { + job->next_free = G.first_free_job; + G.first_free_job = job; + } + sys_mutex_unlock(&fj_lock); + } + } + } + + /* Wait for job completion */ + if (!is_done) { + __profscope(Wait for job); + struct sys_lock lock = sys_mutex_lock_s(job->mutex); + is_done = atomic_u64_eval(&job->gen) != handle.gen; + while (!is_done) { + sys_condition_variable_wait(job->gen_cv, &lock); + is_done = atomic_u64_eval(&job->gen) != handle.gen; + } + sys_mutex_unlock(&lock); + } + } + } +} + +/* ========================== * + * Worker + * ========================== */ + +INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg) +{ + i32 worker_id = (i32)(i64)thread_arg; + + struct worker_ctx *ctx = thread_local_var_eval(&tl_worker_ctx); + ctx->worker_id = worker_id; + + struct worker_job_queues *queues[] = { &G.pinned_queues[worker_id], &G.global_queue }; + u64 seen_queue_submit_gen = 0; + + struct sys_lock queue_lock = sys_mutex_lock_s(G.queued_jobs_mutex); + while (!G.workers_shutdown) { + sys_mutex_unlock(&queue_lock); + + /* Try to pick job from queue */ + i32 job_id = 0; + i32 job_count = 0; + b32 job_is_pinned_to_worker = false; + struct worker_job_queue *job_queue = NULL; + struct worker_job *job = NULL; + { + __profscope(Pick job); + queue_lock = sys_mutex_lock_s(G.queued_jobs_mutex); + { + seen_queue_submit_gen = G.queue_submit_gen; + for (i32 queue_index = 0; queue_index < ARRAY_COUNT(queues); ++queue_index) { + struct worker_job_queue *queue = queues[queue_index]; + struct worker_job *tmp = queue->first; + while (!job && tmp) { + struct sys_lock job_lock = sys_mutex_lock_e(tmp->mutex); + { + i32 tmp_id = tmp->num_dispatched; + i32 tmp_count = tmp->count; + i32 tmp_pinned_worker = tmp->pinned_worker_id; + b32 tmp_is_pinned_to_worker = tmp_pinned_worker == worker_id; + if (tmp_id < tmp_count) { + /* Pick job */ + ++tmp->num_workers; + ++tmp->num_dispatched; + job = tmp; + job_id = tmp_id; + job_count = tmp_count; + job_is_pinned_to_worker = tmp->pinned_worker_id == worker_id; + job_queue = tmp->queue; + } + } + sys_mutex_unlock(&job_lock); + tmp = tmp->next; + } + } + } + sys_mutex_unlock(&queue_lock); + } + + /* Remove job from queue */ + if (job_id == (job_count - 1)) { + __profscope(Dequeue job); + queue_lock = sys_mutex_lock_e(G.queued_jobs_mutex); + { + struct worker_job *prev = job->prev; + struct worker_job *next = job->next; + if (prev) { + prev->next = next; + } else { + job_queue->first = next; + } + if (next) { + next->prev = prev; + } else { + job_queue->last = prev; + } + } + sys_mutex_unlock(&queue_lock); + } + + /* Execute job */ + if (job) { + __profscope(Execute job); + ctx->pin_depth += job_is_pinned_to_worker; + atomic_i32_eval_add(&G.num_idle_worker_threads, -1); + struct job_data data = ZI; + data.sig = job->sig; + job_func *func = job->func; + b32 should_release = false; + while (job_id < job_count) { + /* Run */ + { + data.id = job_id; + func(data); + } + /* Grab new ID */ + { + struct sys_lock job_lock = sys_mutex_lock_e(job->mutex); + job_id = job->num_dispatched++; + if (job_id >= job_count) { + i32 num_workers = --job->num_workers; + if (num_workers == 0) { + atomic_u64_eval_add_u64(&job->gen, 1); + should_release = true; + sys_condition_variable_signal(job->gen_cv, U32_MAX); + } + } + sys_mutex_unlock(&job_lock); + } + } + if (should_release) { + __profscope(Release job); + struct sys_lock fj_lock = sys_mutex_lock_e(G.free_jobs_mutex); + { + job->next_free = G.first_free_job; + G.first_free_job = job; + } + sys_mutex_unlock(&fj_lock); + } + atomic_i32_eval_add(&G.num_idle_worker_threads, 1); + ctx->pin_depth -= job_is_pinned_to_worker; + } + + queue_lock = sys_mutex_lock_s(G.queued_jobs_mutex); + if (!G.workers_shutdown && G.queue_submit_gen == seen_queue_submit_gen) { + //__profscope(Worker sleep); + sys_condition_variable_wait(G.workers_wake_cv, &queue_lock); } } } diff --git a/src/job.h b/src/job.h index 4a8d0a7e..116c33ee 100644 --- a/src/job.h +++ b/src/job.h @@ -8,7 +8,7 @@ * Startup * ========================== */ -void job_startup(i32 num_workers); +void job_startup(i32 num_workers, struct string *worker_names); /* ========================== * * Job @@ -27,8 +27,9 @@ struct job_handle { #define JOB_DEF(job_name, arg_name) void job_name(struct job_data arg_name) typedef JOB_DEF(job_func, job_data); -struct job_handle job_dispatch_async(u32 count, job_func *job_func, void *sig); -void job_dispatch_wait(u32 count, job_func *job_func, void *sig); +struct job_handle job_dispatch_async(i32 count, job_func *job_func, void *sig); +void job_dispatch_wait(i32 count, job_func *job_func, void *sig); +void job_dispatch_pinned(i32 worker_id, job_func *job_func, void *sig); void job_wait(struct job_handle handle); #endif diff --git a/src/playback_wasapi.c b/src/playback_wasapi.c index 44d07010..fe19caba 100644 --- a/src/playback_wasapi.c +++ b/src/playback_wasapi.c @@ -11,6 +11,7 @@ #include "mixer.h" #include "atomic.h" #include "app.h" +#include "job.h" #define COBJMACROS #define WIN32_LEAN_AND_MEAN @@ -37,8 +38,7 @@ struct wasapi_buffer { }; GLOBAL struct { - struct atomic_i32 playback_thread_shutdown; - struct sys_thread *playback_thread; + struct atomic_i32 shutdown; IAudioClient *client; HANDLE event; IAudioRenderClient *playback; @@ -52,14 +52,14 @@ GLOBAL struct { INTERNAL void wasapi_initialize(void); INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(playback_shutdown); -INTERNAL SYS_THREAD_DEF(playback_thread_entry_point, arg); +INTERNAL JOB_DEF(playback_job, _); struct playback_startup_receipt playback_startup(struct mixer_startup_receipt *mixer_sr) { (UNUSED)mixer_sr; wasapi_initialize(); - G.playback_thread = sys_thread_alloc(&playback_thread_entry_point, NULL, LIT("[P7] Audio thread")); + job_dispatch_pinned(APP_DEDICATED_WORKER_ID_AUDIO, playback_job, NULL); app_register_exit_callback(&playback_shutdown); return (struct playback_startup_receipt) { 0 }; @@ -68,8 +68,7 @@ struct playback_startup_receipt playback_startup(struct mixer_startup_receipt *m INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(playback_shutdown) { __prof; - atomic_i32_eval_exchange(&G.playback_thread_shutdown, true); - sys_thread_wait_release(G.playback_thread); + atomic_i32_eval_exchange(&G.shutdown, true); } /* ========================== * @@ -228,10 +227,10 @@ INTERNAL void wasapi_update_end(struct wasapi_buffer *wspbuf, struct mixed_pcm_f * Playback thread entry * ========================== */ -INTERNAL SYS_THREAD_DEF(playback_thread_entry_point, arg) +INTERNAL JOB_DEF(playback_job, _) { + (UNUSED)_; struct arena_temp scratch = scratch_begin_no_conflict(); - (UNUSED)arg; /* https://learn.microsoft.com/en-us/windows/win32/procthread/multimedia-class-scheduler-service#registry-settings */ DWORD task = 0; @@ -240,7 +239,7 @@ INTERNAL SYS_THREAD_DEF(playback_thread_entry_point, arg) /* FIXME: If playback fails at any point and mixer stops advancing, we * need to halt mixer to prevent memory leak when sounds are played. */ - while (!atomic_i32_eval(&G.playback_thread_shutdown)) { + while (!atomic_i32_eval(&G.shutdown)) { struct arena_temp temp = arena_temp_begin(scratch.arena); struct wasapi_buffer wspbuf = wasapi_update_begin(); struct mixed_pcm_f32 pcm = mixer_update(temp.arena, wspbuf.frames_count); diff --git a/src/sys_win32.c b/src/sys_win32.c index b55ea886..fda76878 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -1622,7 +1622,6 @@ struct sys_mutex *sys_mutex_alloc(void) void sys_mutex_release(struct sys_mutex *mutex) { __prof; - struct win32_mutex *m = (struct win32_mutex *)mutex; ASSERT(atomic_i64_eval(&m->count) == 0); /* Mutex should be unlocked */ { @@ -1635,7 +1634,6 @@ void sys_mutex_release(struct sys_mutex *mutex) struct sys_lock sys_mutex_lock_e(struct sys_mutex *mutex) { - __prof; struct win32_mutex *m = (struct win32_mutex *)mutex; __proflock_before_exclusive_lock(m->profiling_ctx); AcquireSRWLockExclusive((SRWLOCK *)&m->srwlock); @@ -1652,7 +1650,6 @@ struct sys_lock sys_mutex_lock_e(struct sys_mutex *mutex) struct sys_lock sys_mutex_lock_s(struct sys_mutex *mutex) { - __prof; struct win32_mutex *m = (struct win32_mutex *)mutex; __proflock_before_shared_lock(m->profiling_ctx); AcquireSRWLockShared((SRWLOCK *)&m->srwlock); @@ -1667,7 +1664,6 @@ struct sys_lock sys_mutex_lock_s(struct sys_mutex *mutex) void sys_mutex_unlock(struct sys_lock *lock) { - __prof; struct win32_mutex *m = (struct win32_mutex *)lock->mutex; #if RTC atomic_i64_eval_add(&m->count, -1); @@ -1747,7 +1743,6 @@ void sys_condition_variable_release(struct sys_condition_variable *sys_cv) void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct sys_lock *lock) { - __prof; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; struct win32_mutex *m = (struct win32_mutex *)lock->mutex; b32 exclusive = lock->exclusive; @@ -1785,7 +1780,6 @@ void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct s void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, struct sys_lock *lock, f64 seconds) { - __prof; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; struct win32_mutex *m = (struct win32_mutex *)lock->mutex; b32 exclusive = lock->exclusive; @@ -1824,7 +1818,6 @@ void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, str void sys_condition_variable_signal(struct sys_condition_variable *sys_cv, u32 count) { - __prof; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; /* Windows will wake all waiters if many single-wakes occur anyway, so we * might as well wake all ourselves. @@ -1840,7 +1833,6 @@ void sys_condition_variable_signal(struct sys_condition_variable *sys_cv, u32 co void sys_condition_variable_broadcast(struct sys_condition_variable *sys_cv) { - __prof; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; WakeAllConditionVariable(&cv->condition_variable); } diff --git a/src/user.c b/src/user.c index 92c0604c..3eb76175 100644 --- a/src/user.c +++ b/src/user.c @@ -49,11 +49,8 @@ struct console_log { }; GLOBAL struct { - struct atomic_i32 user_thread_shutdown; - struct sys_thread *user_thread; + struct atomic_i32 shutdown; - struct atomic_i32 local_sim_thread_shutdown; - struct sys_thread *local_sim_thread; struct sim_ctx *local_sim_ctx; struct arena *arena; @@ -197,8 +194,8 @@ GLOBAL READONLY enum user_bind_kind g_binds[SYS_BTN_COUNT] = { INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(user_shutdown); INTERNAL LOG_EVENT_CALLBACK_FUNC_DEF(debug_console_log_callback, log); -INTERNAL SYS_THREAD_DEF(user_thread_entry_point, arg); -INTERNAL SYS_THREAD_DEF(user_local_sim_thread_entry_point, arg); +INTERNAL JOB_DEF(user_job, _); +INTERNAL JOB_DEF(local_sim_job , _); INTERNAL SYS_WINDOW_EVENT_CALLBACK_FUNC_DEF(window_event_callback, event); struct user_startup_receipt user_startup(struct gp_startup_receipt *gp_sr, @@ -263,11 +260,9 @@ struct user_startup_receipt user_startup(struct gp_startup_receipt *gp_sr, G.window = window; sys_window_register_event_callback(G.window, &window_event_callback); - G.local_sim_thread = sys_thread_alloc(&user_local_sim_thread_entry_point, G.local_sim_ctx, LIT("[P8] Local sim thread")); - - //G.debug_draw = true; - - G.user_thread = sys_thread_alloc(&user_thread_entry_point, NULL, LIT("[P9] User thread")); + /* Start jobs */ + job_dispatch_pinned(APP_DEDICATED_WORKER_ID_SIM, local_sim_job, NULL); + job_dispatch_pinned(APP_DEDICATED_WORKER_ID_USER, user_job, NULL); app_register_exit_callback(&user_shutdown); return (struct user_startup_receipt) { 0 }; @@ -278,20 +273,7 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(user_shutdown) __prof; sys_window_unregister_event_callback(G.window, &window_event_callback); - - atomic_i32_eval_exchange(&G.user_thread_shutdown, true); - sys_thread_wait_release(G.user_thread); - -#if 0 - if (G.local_sim_ctx) { - atomic_i32_eval_exchange(&G.local_sim_thread_shutdown, true); - sys_thread_wait_release(&G.local_sim_thread); - sim_ctx_release(G.local_sim_ctx); - } -#else - atomic_i32_eval_exchange(&G.local_sim_thread_shutdown, true); - sys_thread_wait_release(G.local_sim_thread); -#endif + atomic_i32_eval_exchange(&G.shutdown, true); } /* ========================== * @@ -2125,13 +2107,13 @@ INTERNAL void user_update(void) * User thread * ========================== */ -INTERNAL SYS_THREAD_DEF(user_thread_entry_point, arg) +INTERNAL JOB_DEF(user_job, _) { - (UNUSED)arg; + (UNUSED)_; i64 last_frame_ns = 0; i64 target_dt_ns = NS_FROM_SECONDS(USER_FPS_LIMIT > (0) ? (1.0 / USER_FPS_LIMIT) : 0); - while (!atomic_i32_eval(&G.user_thread_shutdown)) { + while (!atomic_i32_eval(&G.shutdown)) { { __profscope(User sleep); sleep_frame(last_frame_ns, target_dt_ns); @@ -2214,8 +2196,10 @@ struct sim_decode_queue { }; -INTERNAL SYS_THREAD_DEF(user_local_sim_thread_entry_point, arg) +INTERNAL JOB_DEF(local_sim_job, _) { + (UNUSED)_; + #if 0 struct host_listen_address local_listen_addr = host_listen_address_from_local_name(LIT("LOCAL_SIM")); struct host_listen_address net_listen_addr = host_listen_address_from_net_port(12345); @@ -2224,7 +2208,6 @@ INTERNAL SYS_THREAD_DEF(user_local_sim_thread_entry_point, arg) //host_listen(host, local_listen_addr); //host_listen(host, net_listen_addr); #endif - (UNUSED)arg; b32 is_master = false; struct host *host; @@ -2290,7 +2273,7 @@ INTERNAL SYS_THREAD_DEF(user_local_sim_thread_entry_point, arg) i64 real_dt_ns = 0; i64 step_dt_ns = NS_FROM_SECONDS(1) / SIM_TICKS_PER_SECOND; f64 compute_timescale = 1.0; - while (!atomic_i32_eval(&G.local_sim_thread_shutdown)) { + while (!atomic_i32_eval(&G.shutdown)) { struct arena_temp scratch = scratch_begin_no_conflict(); { __profscope(Sim sleep);