From 25e20ea3bdd9d0b681acf0c7b17abb48104ef42c Mon Sep 17 00:00:00 2001 From: jacob Date: Fri, 11 Jul 2025 01:05:07 -0500 Subject: [PATCH] sys job pools --- src/common.h | 19 +-- src/font.c | 2 +- src/gp_dx12.c | 4 +- src/playback_wasapi.c | 2 +- src/resource.c | 2 +- src/sound.c | 2 +- src/sprite.c | 4 +- src/sys.h | 24 +++- src/sys_win32.c | 284 ++++++++++++++++++++++++------------------ src/user.c | 4 +- 10 files changed, 207 insertions(+), 140 deletions(-) diff --git a/src/common.h b/src/common.h index 8c634a63..b75bdcab 100644 --- a/src/common.h +++ b/src/common.h @@ -685,14 +685,17 @@ INLINE f64 clamp_f64(f64 v, f64 min, f64 max) { return v < min ? min : v > max ? #include "prof_tracy.h" -#define PROF_THREAD_GROUP_WORKERS -MEBI(8) -#define PROF_THREAD_GROUP_FIBERS -MEBI(7) -#define PROF_THREAD_GROUP_SCHEDULER -MEBI(6) -#define PROF_THREAD_GROUP_IO -MEBI(5) -#define PROF_THREAD_GROUP_WINDOW -MEBI(4) -#define PROF_THREAD_GROUP_EVICTORS -MEBI(3) -#define PROF_THREAD_GROUP_APP -MEBI(2) -#define PROF_THREAD_GROUP_MAIN -MEBI(1) + +#define PROF_THREAD_GROUP_WORKERS_DEDICATED -MEBI(10) +#define PROF_THREAD_GROUP_FIBERS -MEBI(9) +#define PROF_THREAD_GROUP_WORKERS_BACKGROUND -MEBI(8) +#define PROF_THREAD_GROUP_WORKERS_BLOCKING -MEBI(7) +#define PROF_THREAD_GROUP_SCHEDULER -MEBI(6) +#define PROF_THREAD_GROUP_IO -MEBI(5) +#define PROF_THREAD_GROUP_WINDOW -MEBI(4) +#define PROF_THREAD_GROUP_EVICTORS -MEBI(3) +#define PROF_THREAD_GROUP_APP -MEBI(2) +#define PROF_THREAD_GROUP_MAIN -MEBI(1) #ifdef __cplusplus } diff --git a/src/font.c b/src/font.c index d2b6ddd7..114cd0d5 100644 --- a/src/font.c +++ b/src/font.c @@ -183,7 +183,7 @@ struct asset *font_load_asset(struct string path, f32 point_size, b32 wait) /* Push task */ asset_cache_mark_loading(asset); - sys_run(1, font_load_asset_job, params, SYS_PRIORITY_BACKGROUND, 0); + sys_run(1, font_load_asset_job, params, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, 0); if (wait) { asset_cache_wait(asset); } diff --git a/src/gp_dx12.c 
b/src/gp_dx12.c index 7ba061e6..4de6b8dd 100644 --- a/src/gp_dx12.c +++ b/src/gp_dx12.c @@ -925,7 +925,7 @@ INTERNAL SYS_JOB_DEF(pipeline_init_job, job) struct shader_compile_job_param *params[] = { &vs, &ps }; struct shader_compile_job_sig comp_sig = { .params = params }; struct snc_counter counter = ZI; - sys_run(countof(params), shader_compile_job, &comp_sig, SYS_PRIORITY_INHERIT, &counter); + sys_run(countof(params), shader_compile_job, &comp_sig, SYS_POOL_INHERIT, SYS_PRIORITY_INHERIT, &counter); snc_counter_wait(&counter); success = vs.success && ps.success; } @@ -1119,7 +1119,7 @@ INTERNAL void pipeline_alloc(u64 num_pipelines, struct pipeline_desc *descs_in, __prof; struct pipeline_init_job_sig sig = { .descs_in = descs_in, .pipelines_out = pipelines_out }; struct snc_counter counter = ZI; - sys_run(num_pipelines, pipeline_init_job, &sig, SYS_PRIORITY_INHERIT, &counter); + sys_run(num_pipelines, pipeline_init_job, &sig, SYS_POOL_INHERIT, SYS_PRIORITY_INHERIT, &counter); snc_counter_wait(&counter); } diff --git a/src/playback_wasapi.c b/src/playback_wasapi.c index d8e9ba4d..03efd59a 100644 --- a/src/playback_wasapi.c +++ b/src/playback_wasapi.c @@ -256,7 +256,7 @@ INTERNAL SYS_THREAD_DEF(playback_scheduler_entry, _) { __profn("Run mix job & wait"); struct snc_counter counter = ZI; - sys_run(1, playback_mix_job, 0, SYS_PRIORITY_CRITICAL, &counter); + sys_run(1, playback_mix_job, 0, SYS_POOL_DEDICATED, SYS_PRIORITY_CRITICAL, &counter); snc_counter_wait(&counter); } } diff --git a/src/resource.c b/src/resource.c index b0e81eb2..a93ec1f4 100644 --- a/src/resource.c +++ b/src/resource.c @@ -290,7 +290,7 @@ INTERNAL SYS_THREAD_DEF(resource_watch_dispatcher_thread_entry_point, _) sig.name = info->name; sig.callbacks = callbacks; struct snc_counter counter = ZI; - sys_run(num_callbacks, resource_watch_callback_job, &sig, SYS_PRIORITY_BACKGROUND, &counter); + sys_run(num_callbacks, resource_watch_callback_job, &sig, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, &counter); 
snc_counter_wait(&counter); } } diff --git a/src/sound.c b/src/sound.c index 07021d04..3f79269d 100644 --- a/src/sound.c +++ b/src/sound.c @@ -177,7 +177,7 @@ struct asset *sound_load_asset(struct string path, u32 flags, b32 wait) /* Push task */ asset_cache_mark_loading(asset); - sys_run(1, sound_load_asset_job, params, SYS_PRIORITY_BACKGROUND, &asset->counter); + sys_run(1, sound_load_asset_job, params, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, &asset->counter); if (wait) { asset_cache_wait(asset); } diff --git a/src/sprite.c b/src/sprite.c index 0e9e832e..80353501 100644 --- a/src/sprite.c +++ b/src/sprite.c @@ -245,7 +245,7 @@ struct sprite_startup_receipt sprite_startup(void) G.scopes_arena = arena_alloc(GIBI(64)); - sys_run(1, sprite_evictor_job, 0, SYS_PRIORITY_BACKGROUND, &G.shutdown_counter); + sys_run(1, sprite_evictor_job, 0, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, &G.shutdown_counter); sys_on_exit(&sprite_shutdown); resource_register_watch_callback(&sprite_resource_watch_callback); @@ -325,7 +325,7 @@ INTERNAL void push_load_job(struct cache_ref ref, struct sprite_tag tag) } /* Push work */ - sys_run(1, sprite_load_job, cmd, SYS_PRIORITY_BACKGROUND, 0); + sys_run(1, sprite_load_job, cmd, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, 0); } INTERNAL void cache_entry_load_texture(struct cache_ref ref, struct sprite_tag tag) diff --git a/src/sys.h b/src/sys.h index 43de5908..3fcb3454 100644 --- a/src/sys.h +++ b/src/sys.h @@ -47,12 +47,32 @@ i16 sys_current_fiber_id(void); * Job * ========================== */ +/* Work pools contain their own worker threads with their own thread priority/affinity based on the intended context of the pool. */ +enum sys_pool { + SYS_POOL_INHERIT = -1, + + /* This pool contains workers that roughly map to dedicated performance cores on the CPU. + * Time critical jobs should be pushed here. */ + SYS_POOL_DEDICATED = 0, + + /* This pool contains workers that will not interfere with the dedicated pool. + * Asynchronous jobs (e.g. 
asset streaming) should be pushed here. */ + SYS_POOL_BACKGROUND = 1, + + /* This pool contains a large number of floating low priority worker threads with the intent that these threads will only block and do no actual work. + * Blocking operations (e.g. opening a file) should be isolated to jobs that get pushed to this pool. They can then be yielded on by jobs in other pools that actually do work. */ + SYS_POOL_BLOCKING = 2, + + NUM_SYS_POOLS +}; + +/* Job execution order within a pool is based on priority. */ enum sys_priority { SYS_PRIORITY_INHERIT = -1, SYS_PRIORITY_CRITICAL = 0, SYS_PRIORITY_HIGH = 1, SYS_PRIORITY_NORMAL = 2, - SYS_PRIORITY_BACKGROUND = 3, + SYS_PRIORITY_LOW = 3, NUM_SYS_PRIORITIES }; @@ -66,7 +86,7 @@ struct sys_job_data { typedef SYS_JOB_DEF(sys_job_func, job_data); struct snc_counter; -void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priority, struct snc_counter *counter); +void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_pool pool_kind, enum sys_priority priority, struct snc_counter *counter); /* ========================== * * Scratch context diff --git a/src/sys_win32.c b/src/sys_win32.c index 5c9b0de8..6b429922 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -192,7 +192,8 @@ struct alignas(64) fiber { void *job_sig; /* 08 bytes */ /* ==================================================== */ i32 job_id; /* 04 bytes */ - i32 job_priority; /* 04 bytes */ + i16 job_pool; /* 02 bytes */ + i16 job_priority; /* 02 bytes */ /* ==================================================== */ struct snc_counter *job_counter; /* 08 bytes */ /* ==================================================== */ @@ -206,6 +207,7 @@ STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */ STATIC_ASSERT(SYS_MAX_FIBERS < I16_MAX); /* Max fibers should fit in fiber id */ struct alignas(64) worker_ctx { + enum sys_pool pool_kind; i32 id; }; @@ -222,18 +224,7 @@ struct job_info { struct job_info *next; }; -enum 
job_queue_kind { - JOB_QUEUE_KIND_CRITICAL_PRIORITY, - JOB_QUEUE_KIND_HIGH_PRIORITY, - JOB_QUEUE_KIND_NORMAL_PRIORITY, - JOB_QUEUE_KIND_BACKGROUND, - - NUM_JOB_QUEUE_KINDS -}; - struct alignas(64) job_queue { - enum job_queue_kind kind; - struct ticket_mutex lock; struct arena *arena; @@ -243,6 +234,24 @@ struct alignas(64) job_queue { struct job_info *first_free; }; +struct alignas(64) job_pool { + /* Jobs */ + struct job_queue job_queues[NUM_SYS_PRIORITIES]; + + /* Workers */ + struct atomic_i32_padded workers_shutdown; + struct atomic_i64_padded num_jobs_in_queue; + struct snc_mutex workers_wake_mutex; + struct snc_cv workers_wake_cv; + + i32 num_worker_threads; + i32 thread_priority; + u64 thread_affinity_mask; + struct arena *worker_threads_arena; + struct sys_thread **worker_threads; + struct worker_ctx *worker_contexts; +}; + /* ========================== * * Global state * ========================== */ @@ -252,6 +261,7 @@ GLOBAL struct { i64 timer_start_qpc; i64 ns_per_qpc; u32 main_thread_id; + struct atomic_i32 shutdown; wchar_t cmdline_args_wstr[8192]; @@ -291,6 +301,13 @@ GLOBAL struct { struct atomic_i64_padded current_scheduler_cycle; struct atomic_i64_padded current_scheduler_cycle_period_ns; + /* Fibers */ + i16 num_fibers; + i16 first_free_fiber_id; + struct arena *fiber_names_arena; + struct ticket_mutex fibers_lock; + struct fiber fibers[SYS_MAX_FIBERS]; + /* Wait lists */ struct atomic_u64_padded waiter_wake_gen; struct ticket_mutex wait_lists_arena_lock; @@ -300,27 +317,8 @@ GLOBAL struct { struct wait_bin wait_addr_bins[NUM_WAIT_ADDR_BINS]; struct wait_bin wait_time_bins[NUM_WAIT_TIME_BINS]; - /* Fibers */ - i16 num_fibers; - i16 first_free_fiber_id; - struct arena *fiber_names_arena; - struct ticket_mutex fibers_lock; - struct fiber fibers[SYS_MAX_FIBERS]; - - /* Jobs */ - struct job_queue job_queues[NUM_JOB_QUEUE_KINDS]; - - /* Workers */ - struct atomic_i32_padded workers_shutdown; - struct atomic_i64_padded num_jobs_in_queue; - struct 
snc_mutex workers_wake_mutex; - struct snc_cv workers_wake_cv; - - i32 num_worker_threads; - struct arena *worker_threads_arena; - struct sys_thread **worker_threads; - struct worker_ctx *worker_contexts; - + /* Job pools */ + struct job_pool job_pools[NUM_SYS_POOLS]; } G = ZI, DEBUG_ALIAS(G, G_sys_win32); @@ -328,8 +326,6 @@ GLOBAL struct { INTERNAL struct fiber *fiber_from_id(i16 id); INTERNAL void job_fiber_yield(struct fiber *fiber, struct fiber *parent_fiber); -INTERNAL enum job_queue_kind job_queue_kind_from_priority(enum sys_priority priority); -INTERNAL enum sys_priority job_priority_from_queue_kind(enum job_queue_kind queue_kind); @@ -539,10 +535,13 @@ INTERNAL void wake_fibers_locked(i32 num_fibers, struct fiber **fibers) /* Resume jobs */ /* TODO: Batch submit waiters based on queue kind rather than one at a time */ + i32 num_workers_to_wake[NUM_SYS_POOLS] = ZI; for (i32 i = 0; i < num_fibers; ++i) { struct fiber *fiber = fibers[i]; - enum job_queue_kind queue_kind = job_queue_kind_from_priority(fiber->job_priority); - struct job_queue *queue = &G.job_queues[queue_kind]; + enum sys_pool pool_kind = fiber->job_pool; + ++num_workers_to_wake[pool_kind]; + struct job_pool *pool = &G.job_pools[pool_kind]; + struct job_queue *queue = &pool->job_queues[fiber->job_priority]; tm_lock(&queue->lock); { struct job_info *info = 0; @@ -572,12 +571,18 @@ INTERNAL void wake_fibers_locked(i32 num_fibers, struct fiber **fibers) /* Wake workers */ if (num_fibers > 0) { - struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); - { - atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_fibers); - snc_cv_signal(&G.workers_wake_cv, num_fibers); + for (enum sys_pool pool_kind = 0; pool_kind < (i32)countof(num_workers_to_wake); ++pool_kind) { + i32 wake_count = num_workers_to_wake[pool_kind]; + if (wake_count > 0) { + struct job_pool *pool = &G.job_pools[pool_kind]; + struct snc_lock lock = snc_lock_e(&pool->workers_wake_mutex); + { + 
atomic_i64_fetch_add(&pool->num_jobs_in_queue.v, wake_count); + snc_cv_signal(&pool->workers_wake_cv, wake_count); + } + snc_unlock(&lock); + } } - snc_unlock(&lock); } } @@ -761,6 +766,7 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind) fiber->job_func = 0; fiber->job_sig = 0; fiber->job_id = 0; + fiber->job_pool = 0; fiber->job_priority = 0; fiber->job_counter = 0; fiber->yield_param = 0; @@ -796,16 +802,18 @@ i16 sys_current_fiber_id(void) return (i16)(i64)GetFiberData(); } -void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priority, struct snc_counter *counter) +void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_pool pool_kind, enum sys_priority priority, struct snc_counter *counter) { if (count > 0) { if (counter) { snc_counter_add(counter, count); } struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); - priority = clamp_i32(priority, fiber->job_priority, SYS_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */ - enum job_queue_kind queue_kind = job_queue_kind_from_priority(priority); - struct job_queue *queue = &G.job_queues[queue_kind]; + /* A job cannot create a job with a higher pool / priority than itself */ + pool_kind = clamp_i32(pool_kind, fiber->job_pool, NUM_SYS_POOLS - 1); + priority = clamp_i32(priority, fiber->job_priority, NUM_SYS_PRIORITIES - 1); + struct job_pool *pool = &G.job_pools[pool_kind]; + struct job_queue *queue = &pool->job_queues[priority]; tm_lock(&queue->lock); { struct job_info *info = 0; @@ -828,14 +836,15 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit queue->last = info; } tm_unlock(&queue->lock); + + /* Wake workers */ + struct snc_lock lock = snc_lock_e(&pool->workers_wake_mutex); + { + atomic_i64_fetch_add(&pool->num_jobs_in_queue.v, count); + snc_cv_signal(&pool->workers_wake_cv, count); + } + snc_unlock(&lock); + } - /* Wake workers */ - struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); - { - 
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, count); - snc_cv_signal(&G.workers_wake_cv, count); - } - snc_unlock(&lock); } /* ========================== * @@ -901,48 +910,33 @@ INTERNAL void job_fiber_entry(void *id_ptr) * Job worker thread * ========================== */ -INTERNAL enum job_queue_kind job_queue_kind_from_priority(enum sys_priority priority) -{ - STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ - return (enum job_queue_kind)priority; -} - -INTERNAL enum sys_priority job_priority_from_queue_kind(enum job_queue_kind queue_kind) -{ - STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ - return (enum sys_priority)queue_kind; -} - INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) { struct worker_ctx *ctx = worker_ctx_arg; + enum sys_pool pool_kind = ctx->pool_kind; + struct job_pool *pool = &G.job_pools[pool_kind]; (UNUSED)ctx; { /* TODO: Heuristic pinning */ /* TODO: Pin non-worker threads to other cores */ HANDLE thread_handle = GetCurrentThread(); - b32 success = 0; - (UNUSED)success; - i32 priority = THREAD_PRIORITY_TIME_CRITICAL; - success = SetThreadPriority(thread_handle, priority); - ASSERT(success); + if (pool->thread_priority) { + b32 success = SetThreadPriority(thread_handle, pool->thread_priority) != 0; + ASSERT(success); + (UNUSED)success; + } -#if 0 - u64 affinity_mask = (u64)1 << (ctx->id * 2); - success = SetThreadAffinityMask(thread_handle, affinity_mask) != 0; - ASSERT(success); -#endif + if (pool->thread_affinity_mask) { + b32 success = SetThreadAffinityMask(thread_handle, pool->thread_affinity_mask) != 0; + ASSERT(success); + (UNUSED)success; + } } i32 worker_fiber_id = sys_current_fiber_id(); - struct job_queue *queues[countof(G.job_queues)] = ZI; - for (u32 i = 0; i < countof(G.job_queues); ++i) { - queues[i] = &G.job_queues[i]; - } - struct fiber *job_fiber = 0; b32 shutdown = 
0; while (!shutdown) { @@ -955,14 +949,12 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) struct snc_counter *job_counter = 0; { //__profnc("Pull job", RGB32_F(0.75, 0.75, 0)); - for (u32 queue_index = 0; queue_index < countof(queues) && !job_func; ++queue_index) { - struct job_queue *queue = queues[queue_index]; + for (enum sys_priority priority = 0; priority < (i32)countof(pool->job_queues) && !job_func; ++priority) { + struct job_queue *queue = &pool->job_queues[priority]; if (queue) { tm_lock(&queue->lock); { struct job_info *info = queue->first; - job_priority = (enum sys_priority)queue->kind; - job_priority = job_priority_from_queue_kind(queue->kind); while (info && !job_func) { struct job_info *next = info->next; b32 dequeue = 0; @@ -970,7 +962,8 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) job_id = info->num_dispatched++; if (job_id < info->count) { /* Pick job */ - atomic_i64_fetch_add(&G.num_jobs_in_queue.v, -1); + atomic_i64_fetch_add(&pool->num_jobs_in_queue.v, -1); + job_priority = priority; job_func = info->func; job_sig = info->sig; job_counter = info->counter; @@ -981,8 +974,9 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) } } else { /* This job is to be resumed from a yield */ - atomic_i64_fetch_add(&G.num_jobs_in_queue.v, -1); + atomic_i64_fetch_add(&pool->num_jobs_in_queue.v, -1); job_fiber_id = info->fiber_id; + job_priority = priority; job_id = info->num_dispatched; job_func = info->func; job_sig = info->sig; @@ -1026,6 +1020,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) job_fiber->job_func = job_func; job_fiber->job_sig = job_sig; job_fiber->job_id = job_id; + job_fiber->job_pool = pool_kind; job_fiber->job_priority = job_priority; job_fiber->job_counter = job_counter; job_fiber->parent_id = worker_fiber_id; @@ -1184,13 +1179,13 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) } /* Wait */ - struct snc_lock wake_lock = snc_lock_s(&G.workers_wake_mutex); + struct snc_lock 
wake_lock = snc_lock_s(&pool->workers_wake_mutex); { - shutdown = atomic_i32_fetch(&G.workers_shutdown.v); - while (atomic_i64_fetch(&G.num_jobs_in_queue.v) <= 0 && !shutdown) { + shutdown = atomic_i32_fetch(&pool->workers_shutdown.v); + while (atomic_i64_fetch(&pool->num_jobs_in_queue.v) <= 0 && !shutdown) { //__profnc("Wait for job", RGB32_F(0.75, 0.75, 0)); - snc_cv_wait(&G.workers_wake_cv, &wake_lock); - shutdown = atomic_i32_fetch(&G.workers_shutdown.v); + snc_cv_wait(&pool->workers_wake_cv, &wake_lock); + shutdown = atomic_i32_fetch(&pool->workers_shutdown.v); } } snc_unlock(&wake_lock); @@ -1228,7 +1223,7 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) } i64 last_cycle_ns = 0; - while (!atomic_i32_fetch(&G.workers_shutdown.v)) { + while (!atomic_i32_fetch(&G.shutdown)) { __profn("Job scheduler cycle"); { __profn("Job scheduler wait"); @@ -1283,22 +1278,60 @@ INTERNAL SYS_THREAD_DEF(test_entry, _) struct sys_thread *scheduler_thread = sys_thread_alloc(job_scheduler_entry, 0, LIT("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER); /* Start workers */ - //G.num_worker_threads = 1; - G.num_worker_threads = 4; - G.worker_threads_arena = arena_alloc(GIBI(64)); - G.worker_threads = arena_push_array(G.worker_threads_arena, struct sys_thread *, G.num_worker_threads); - G.worker_contexts = arena_push_array(G.worker_threads_arena, struct worker_ctx, G.num_worker_threads); - for (i32 i = 0; i < G.num_worker_threads; ++i) { - struct worker_ctx *ctx = &G.worker_contexts[i]; - ctx->id = i; - struct string name = string_format(scratch.arena, LIT("Worker #%F"), FMT_SINT(i)); - G.worker_threads[i] = sys_thread_alloc(job_worker_entry, ctx, name, PROF_THREAD_GROUP_WORKERS + i); + /* TODO: Heuristic worker count & priorities */ + for (enum sys_pool pool_kind = 0; pool_kind < (i32)countof(G.job_pools); ++pool_kind) { + struct job_pool *pool = &G.job_pools[pool_kind]; + struct string name_fmt = ZI; + i32 prof_group = 0; + switch (pool_kind) { + default: ASSERT(0); break; + + 
case SYS_POOL_DEDICATED: + { + name_fmt = LIT("Dedicated worker #%F"); + prof_group = PROF_THREAD_GROUP_WORKERS_DEDICATED; + pool->thread_affinity_mask = 0xFFFFFFFFFFFFFFFFULL; + pool->thread_priority = THREAD_PRIORITY_TIME_CRITICAL; + pool->num_worker_threads = 4; + } break; + + case SYS_POOL_BACKGROUND: + { + name_fmt = LIT("Background worker #%F"); + prof_group = PROF_THREAD_GROUP_WORKERS_BACKGROUND; + pool->thread_affinity_mask = 0xFFFFFFFFFFFFFFFFULL; + pool->thread_priority = 0; + pool->num_worker_threads = 4; + } break; + + case SYS_POOL_BLOCKING: + { + name_fmt = LIT("Blocking worker #%F"); + prof_group = PROF_THREAD_GROUP_WORKERS_BLOCKING; + pool->thread_affinity_mask = 0xFFFFFFFFFFFFFFFFULL; + pool->thread_priority = 0; + pool->num_worker_threads = 4; + } break; + } + pool->worker_threads_arena = arena_alloc(GIBI(64)); + pool->worker_threads = arena_push_array(pool->worker_threads_arena, struct sys_thread *, pool->num_worker_threads); + pool->worker_contexts = arena_push_array(pool->worker_threads_arena, struct worker_ctx, pool->num_worker_threads); + for (i32 i = 0; i < pool->num_worker_threads; ++i) { + struct worker_ctx *ctx = &pool->worker_contexts[i]; + ctx->pool_kind = pool_kind; + ctx->id = i; + struct string name = string_format(scratch.arena, name_fmt, FMT_SINT(i)); + pool->worker_threads[i] = sys_thread_alloc(job_worker_entry, ctx, name, prof_group + i); + } } /* Wait on workers */ - for (i32 i = 0; i < G.num_worker_threads; ++i) { - struct sys_thread *worker_thread = G.worker_threads[i]; - sys_thread_wait_release(worker_thread); + for (enum sys_pool pool_kind = 0; pool_kind < (i32)countof(G.job_pools); ++pool_kind) { + struct job_pool *pool = &G.job_pools[pool_kind]; + for (i32 i = 0; i < pool->num_worker_threads; ++i) { + struct sys_thread *worker_thread = pool->worker_threads[i]; + sys_thread_wait_release(worker_thread); + } } /* Wait on scheduler */ sys_thread_wait_release(scheduler_thread); @@ -2254,7 +2287,7 @@ INTERNAL 
SYS_THREAD_DEF(window_present_thread_entry_point, arg) struct sys_window_present_job_sig sig = ZI; sig.window = (struct sys_window *)window; sig.events = events; - sys_run(1, window->present_job, &sig, SYS_PRIORITY_HIGH, &counter); + sys_run(1, window->present_job, &sig, SYS_POOL_DEDICATED, SYS_PRIORITY_HIGH, &counter); snc_counter_wait(&counter); } arena_reset(events_arena); @@ -3189,24 +3222,27 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance, G.timer_start_qpc = qpc.QuadPart; } - - /* Init wait lists */ - G.wait_lists_arena = arena_alloc(GIBI(64)); - /* Init fibers */ G.num_fibers = 1; /* Fiber at index 0 always nil */ G.fiber_names_arena = arena_alloc(GIBI(64)); - /* Init job queues */ - for (u32 i = 0; i < countof(G.job_queues); ++i) { - struct job_queue *queue = &G.job_queues[i]; - queue->kind = (enum job_queue_kind)i; - queue->arena = arena_alloc(GIBI(64)); - } + /* Init wait lists */ + G.wait_lists_arena = arena_alloc(GIBI(64)); /* Convert main thread to fiber */ fiber_alloc(FIBER_KIND_CONVERTED_THREAD); + /* Init job pools */ + for (enum sys_pool pool_kind = 0; pool_kind < (i32)countof(G.job_pools); ++pool_kind) { + struct job_pool *pool = &G.job_pools[pool_kind]; + + /* Init queues */ + for (enum sys_priority priority = 0; priority < (i32)countof(pool->job_queues); ++priority) { + struct job_queue *queue = &pool->job_queues[priority]; + queue->arena = arena_alloc(GIBI(64)); + } + } + u64 cmdline_len = wstr_len(cmdline_wstr, countof(G.cmdline_args_wstr) - 1); MEMCPY(G.cmdline_args_wstr, cmdline_wstr, cmdline_len * sizeof(*cmdline_wstr)); G.cmdline_args_wstr[cmdline_len] = 0; @@ -3322,12 +3358,20 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance, } } + /* Signal shutdown */ + atomic_i32_fetch_set(&G.shutdown, 1); + /* Shutdown test thread */ { - struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); - atomic_i32_fetch_set(&G.workers_shutdown.v, 1); - snc_cv_signal(&G.workers_wake_cv, 
I32_MAX); - snc_unlock(&lock); + for (enum sys_pool pool_kind = 0; pool_kind < (i32)countof(G.job_pools); ++pool_kind) { + struct job_pool *pool = &G.job_pools[pool_kind]; + struct snc_lock lock = snc_lock_e(&pool->workers_wake_mutex); + { + atomic_i32_fetch_set(&pool->workers_shutdown.v, 1); + snc_cv_signal(&pool->workers_wake_cv, I32_MAX); + } + snc_unlock(&lock); + } } sys_thread_wait_release(test_thread); diff --git a/src/user.c b/src/user.c index d9d0aaea..f7335e28 100644 --- a/src/user.c +++ b/src/user.c @@ -242,8 +242,8 @@ struct user_startup_receipt user_startup(struct font_startup_receipt *font_sr, //log_register_callback(debug_console_log_callback, LOG_LEVEL_SUCCESS); log_register_callback(debug_console_log_callback, LOG_LEVEL_DEBUG); - /* Start jobs */ - sys_run(1, local_sim_job, 0, SYS_PRIORITY_HIGH, &G.shutdown_job_counters); + /* Start sim job */ + sys_run(1, local_sim_job, 0, SYS_POOL_DEDICATED, SYS_PRIORITY_HIGH, &G.shutdown_job_counters); sys_on_exit(&user_shutdown); G.window = sys_window_alloc(user_update_job);