sys job pools

This commit is contained in:
jacob 2025-07-11 01:05:07 -05:00
parent 13e1860656
commit 25e20ea3bd
10 changed files with 207 additions and 140 deletions

View File

@ -685,8 +685,11 @@ INLINE f64 clamp_f64(f64 v, f64 min, f64 max) { return v < min ? min : v > max ?
#include "prof_tracy.h"
#define PROF_THREAD_GROUP_WORKERS -MEBI(8)
#define PROF_THREAD_GROUP_FIBERS -MEBI(7)
#define PROF_THREAD_GROUP_WORKERS_DEDICATED -MEBI(10)
#define PROF_THREAD_GROUP_FIBERS -MEBI(9)
#define PROF_THREAD_GROUP_WORKERS_BACKGROUND -MEBI(8)
#define PROF_THREAD_GROUP_WORKERS_BLOCKING -MEBI(7)
#define PROF_THREAD_GROUP_SCHEDULER -MEBI(6)
#define PROF_THREAD_GROUP_IO -MEBI(5)
#define PROF_THREAD_GROUP_WINDOW -MEBI(4)

View File

@ -183,7 +183,7 @@ struct asset *font_load_asset(struct string path, f32 point_size, b32 wait)
/* Push task */
asset_cache_mark_loading(asset);
sys_run(1, font_load_asset_job, params, SYS_PRIORITY_BACKGROUND, 0);
sys_run(1, font_load_asset_job, params, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, 0);
if (wait) {
asset_cache_wait(asset);
}

View File

@ -925,7 +925,7 @@ INTERNAL SYS_JOB_DEF(pipeline_init_job, job)
struct shader_compile_job_param *params[] = { &vs, &ps };
struct shader_compile_job_sig comp_sig = { .params = params };
struct snc_counter counter = ZI;
sys_run(countof(params), shader_compile_job, &comp_sig, SYS_PRIORITY_INHERIT, &counter);
sys_run(countof(params), shader_compile_job, &comp_sig, SYS_POOL_INHERIT, SYS_PRIORITY_INHERIT, &counter);
snc_counter_wait(&counter);
success = vs.success && ps.success;
}
@ -1119,7 +1119,7 @@ INTERNAL void pipeline_alloc(u64 num_pipelines, struct pipeline_desc *descs_in,
__prof;
struct pipeline_init_job_sig sig = { .descs_in = descs_in, .pipelines_out = pipelines_out };
struct snc_counter counter = ZI;
sys_run(num_pipelines, pipeline_init_job, &sig, SYS_PRIORITY_INHERIT, &counter);
sys_run(num_pipelines, pipeline_init_job, &sig, SYS_POOL_INHERIT, SYS_PRIORITY_INHERIT, &counter);
snc_counter_wait(&counter);
}

View File

@ -256,7 +256,7 @@ INTERNAL SYS_THREAD_DEF(playback_scheduler_entry, _)
{
__profn("Run mix job & wait");
struct snc_counter counter = ZI;
sys_run(1, playback_mix_job, 0, SYS_PRIORITY_CRITICAL, &counter);
sys_run(1, playback_mix_job, 0, SYS_POOL_DEDICATED, SYS_PRIORITY_CRITICAL, &counter);
snc_counter_wait(&counter);
}
}

View File

@ -290,7 +290,7 @@ INTERNAL SYS_THREAD_DEF(resource_watch_dispatcher_thread_entry_point, _)
sig.name = info->name;
sig.callbacks = callbacks;
struct snc_counter counter = ZI;
sys_run(num_callbacks, resource_watch_callback_job, &sig, SYS_PRIORITY_BACKGROUND, &counter);
sys_run(num_callbacks, resource_watch_callback_job, &sig, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, &counter);
snc_counter_wait(&counter);
}
}

View File

@ -177,7 +177,7 @@ struct asset *sound_load_asset(struct string path, u32 flags, b32 wait)
/* Push task */
asset_cache_mark_loading(asset);
sys_run(1, sound_load_asset_job, params, SYS_PRIORITY_BACKGROUND, &asset->counter);
sys_run(1, sound_load_asset_job, params, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, &asset->counter);
if (wait) {
asset_cache_wait(asset);
}

View File

@ -245,7 +245,7 @@ struct sprite_startup_receipt sprite_startup(void)
G.scopes_arena = arena_alloc(GIBI(64));
sys_run(1, sprite_evictor_job, 0, SYS_PRIORITY_BACKGROUND, &G.shutdown_counter);
sys_run(1, sprite_evictor_job, 0, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, &G.shutdown_counter);
sys_on_exit(&sprite_shutdown);
resource_register_watch_callback(&sprite_resource_watch_callback);
@ -325,7 +325,7 @@ INTERNAL void push_load_job(struct cache_ref ref, struct sprite_tag tag)
}
/* Push work */
sys_run(1, sprite_load_job, cmd, SYS_PRIORITY_BACKGROUND, 0);
sys_run(1, sprite_load_job, cmd, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, 0);
}
INTERNAL void cache_entry_load_texture(struct cache_ref ref, struct sprite_tag tag)

View File

@ -47,12 +47,32 @@ i16 sys_current_fiber_id(void);
* Job
* ========================== */
/* Work pools contain their own worker threads with their own thread priority/affinity based on the intended context of the pool. */
enum sys_pool {
SYS_POOL_INHERIT = -1,
/* This pool contains workers that rougly map to dedicated performance cores on the CPU.
* Time critical jobs should be pushed here. */
SYS_POOL_DEDICATED = 0,
/* This pool contains workers that that will not interfere with the dedicated pool.
* Asynchronous jobs (e.g. as asset streaming) should be pushed here. */
SYS_POOL_BACKGROUND = 1,
/* This pool contains a large number of floating low priority worker threads with the intent that these threads will only block and do no actual work.
* Blocking operations (e.g. opening a file) should be isolated to jobs that get pushed to this pool. They can then be yielded on by jobs in other pools that actually do work. */
SYS_POOL_BLOCKING = 2,
NUM_SYS_POOLS
};
/* Job execution order within a pool is based on priority. */
enum sys_priority {
SYS_PRIORITY_INHERIT = -1,
SYS_PRIORITY_CRITICAL = 0,
SYS_PRIORITY_HIGH = 1,
SYS_PRIORITY_NORMAL = 2,
SYS_PRIORITY_BACKGROUND = 3,
SYS_PRIORITY_LOW = 3,
NUM_SYS_PRIORITIES
};
@ -66,7 +86,7 @@ struct sys_job_data {
typedef SYS_JOB_DEF(sys_job_func, job_data);
struct snc_counter;
void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priority, struct snc_counter *counter);
void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_pool pool_kind, enum sys_priority priority, struct snc_counter *counter);
/* ========================== *
* Scratch context

View File

@ -192,7 +192,8 @@ struct alignas(64) fiber {
void *job_sig; /* 08 bytes */
/* ==================================================== */
i32 job_id; /* 04 bytes */
i32 job_priority; /* 04 bytes */
i16 job_pool; /* 02 bytes */
i16 job_priority; /* 02 bytes */
/* ==================================================== */
struct snc_counter *job_counter; /* 08 bytes */
/* ==================================================== */
@ -206,6 +207,7 @@ STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */
STATIC_ASSERT(SYS_MAX_FIBERS < I16_MAX); /* Max fibers should fit in fiber id */
struct alignas(64) worker_ctx {
enum sys_pool pool_kind;
i32 id;
};
@ -222,18 +224,7 @@ struct job_info {
struct job_info *next;
};
enum job_queue_kind {
JOB_QUEUE_KIND_CRITICAL_PRIORITY,
JOB_QUEUE_KIND_HIGH_PRIORITY,
JOB_QUEUE_KIND_NORMAL_PRIORITY,
JOB_QUEUE_KIND_BACKGROUND,
NUM_JOB_QUEUE_KINDS
};
struct alignas(64) job_queue {
enum job_queue_kind kind;
struct ticket_mutex lock;
struct arena *arena;
@ -243,6 +234,24 @@ struct alignas(64) job_queue {
struct job_info *first_free;
};
struct alignas(64) job_pool {
/* Jobs */
struct job_queue job_queues[NUM_SYS_PRIORITIES];
/* Workers */
struct atomic_i32_padded workers_shutdown;
struct atomic_i64_padded num_jobs_in_queue;
struct snc_mutex workers_wake_mutex;
struct snc_cv workers_wake_cv;
i32 num_worker_threads;
i32 thread_priority;
u64 thread_affinity_mask;
struct arena *worker_threads_arena;
struct sys_thread **worker_threads;
struct worker_ctx *worker_contexts;
};
/* ========================== *
* Global state
* ========================== */
@ -252,6 +261,7 @@ GLOBAL struct {
i64 timer_start_qpc;
i64 ns_per_qpc;
u32 main_thread_id;
struct atomic_i32 shutdown;
wchar_t cmdline_args_wstr[8192];
@ -291,6 +301,13 @@ GLOBAL struct {
struct atomic_i64_padded current_scheduler_cycle;
struct atomic_i64_padded current_scheduler_cycle_period_ns;
/* Fibers */
i16 num_fibers;
i16 first_free_fiber_id;
struct arena *fiber_names_arena;
struct ticket_mutex fibers_lock;
struct fiber fibers[SYS_MAX_FIBERS];
/* Wait lists */
struct atomic_u64_padded waiter_wake_gen;
struct ticket_mutex wait_lists_arena_lock;
@ -300,27 +317,8 @@ GLOBAL struct {
struct wait_bin wait_addr_bins[NUM_WAIT_ADDR_BINS];
struct wait_bin wait_time_bins[NUM_WAIT_TIME_BINS];
/* Fibers */
i16 num_fibers;
i16 first_free_fiber_id;
struct arena *fiber_names_arena;
struct ticket_mutex fibers_lock;
struct fiber fibers[SYS_MAX_FIBERS];
/* Jobs */
struct job_queue job_queues[NUM_JOB_QUEUE_KINDS];
/* Workers */
struct atomic_i32_padded workers_shutdown;
struct atomic_i64_padded num_jobs_in_queue;
struct snc_mutex workers_wake_mutex;
struct snc_cv workers_wake_cv;
i32 num_worker_threads;
struct arena *worker_threads_arena;
struct sys_thread **worker_threads;
struct worker_ctx *worker_contexts;
/* Job pools */
struct job_pool job_pools[NUM_SYS_POOLS];
} G = ZI, DEBUG_ALIAS(G, G_sys_win32);
@ -328,8 +326,6 @@ GLOBAL struct {
INTERNAL struct fiber *fiber_from_id(i16 id);
INTERNAL void job_fiber_yield(struct fiber *fiber, struct fiber *parent_fiber);
INTERNAL enum job_queue_kind job_queue_kind_from_priority(enum sys_priority priority);
INTERNAL enum sys_priority job_priority_from_queue_kind(enum job_queue_kind queue_kind);
@ -539,10 +535,13 @@ INTERNAL void wake_fibers_locked(i32 num_fibers, struct fiber **fibers)
/* Resume jobs */
/* TODO: Batch submit waiters based on queue kind rather than one at a time */
i32 num_workers_to_wake[NUM_SYS_POOLS] = ZI;
for (i32 i = 0; i < num_fibers; ++i) {
struct fiber *fiber = fibers[i];
enum job_queue_kind queue_kind = job_queue_kind_from_priority(fiber->job_priority);
struct job_queue *queue = &G.job_queues[queue_kind];
enum sys_pool pool_kind = fiber->job_pool;
++num_workers_to_wake[pool_kind];
struct job_pool *pool = &G.job_pools[pool_kind];
struct job_queue *queue = &pool->job_queues[fiber->job_priority];
tm_lock(&queue->lock);
{
struct job_info *info = 0;
@ -572,13 +571,19 @@ INTERNAL void wake_fibers_locked(i32 num_fibers, struct fiber **fibers)
/* Wake workers */
if (num_fibers > 0) {
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
for (enum sys_pool pool_kind = 0; pool_kind < (i32)countof(num_workers_to_wake); ++pool_kind) {
i32 wake_count = num_workers_to_wake[pool_kind];
if (wake_count > 0) {
struct job_pool *pool = &G.job_pools[pool_kind];
struct snc_lock lock = snc_lock_e(&pool->workers_wake_mutex);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_fibers);
snc_cv_signal(&G.workers_wake_cv, num_fibers);
atomic_i64_fetch_add(&pool->num_jobs_in_queue.v, wake_count);
snc_cv_signal(&pool->workers_wake_cv, wake_count);
}
snc_unlock(&lock);
}
}
}
}
INTERNAL void wake_address(void *addr, i32 count)
@ -761,6 +766,7 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind)
fiber->job_func = 0;
fiber->job_sig = 0;
fiber->job_id = 0;
fiber->job_pool = 0;
fiber->job_priority = 0;
fiber->job_counter = 0;
fiber->yield_param = 0;
@ -796,16 +802,18 @@ i16 sys_current_fiber_id(void)
return (i16)(i64)GetFiberData();
}
void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priority, struct snc_counter *counter)
void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_pool pool_kind, enum sys_priority priority, struct snc_counter *counter)
{
if (count > 0) {
if (counter) {
snc_counter_add(counter, count);
}
struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
priority = clamp_i32(priority, fiber->job_priority, SYS_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */
enum job_queue_kind queue_kind = job_queue_kind_from_priority(priority);
struct job_queue *queue = &G.job_queues[queue_kind];
/* A job cannot create a job with a pool / priority than itself */
pool_kind = clamp_i32(pool_kind, fiber->job_pool, NUM_SYS_POOLS - 1);
priority = clamp_i32(priority, fiber->job_priority, NUM_SYS_PRIORITIES - 1);
struct job_pool *pool = &G.job_pools[pool_kind];
struct job_queue *queue = &pool->job_queues[priority];
tm_lock(&queue->lock);
{
struct job_info *info = 0;
@ -828,14 +836,15 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit
queue->last = info;
}
tm_unlock(&queue->lock);
}
/* Wake workers */
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
struct snc_lock lock = snc_lock_e(&pool->workers_wake_mutex);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, count);
snc_cv_signal(&G.workers_wake_cv, count);
atomic_i64_fetch_add(&pool->num_jobs_in_queue.v, count);
snc_cv_signal(&pool->workers_wake_cv, count);
}
snc_unlock(&lock);
}
}
/* ========================== *
@ -901,48 +910,33 @@ INTERNAL void job_fiber_entry(void *id_ptr)
* Job worker thread
* ========================== */
INTERNAL enum job_queue_kind job_queue_kind_from_priority(enum sys_priority priority)
{
STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
return (enum job_queue_kind)priority;
}
INTERNAL enum sys_priority job_priority_from_queue_kind(enum job_queue_kind queue_kind)
{
STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
return (enum sys_priority)queue_kind;
}
INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
{
struct worker_ctx *ctx = worker_ctx_arg;
enum sys_pool pool_kind = ctx->pool_kind;
struct job_pool *pool = &G.job_pools[pool_kind];
(UNUSED)ctx;
{
/* TODO: Heuristic pinning */
/* TODO: Pin non-worker threads to other cores */
HANDLE thread_handle = GetCurrentThread();
b32 success = 0;
if (pool->thread_priority) {
b32 success = SetThreadPriority(thread_handle, pool->thread_priority) != 0;
ASSERT(success);
(UNUSED)success;
}
i32 priority = THREAD_PRIORITY_TIME_CRITICAL;
success = SetThreadPriority(thread_handle, priority);
if (pool->thread_affinity_mask) {
b32 success = SetThreadAffinityMask(thread_handle, pool->thread_affinity_mask) != 0;
ASSERT(success);
#if 0
u64 affinity_mask = (u64)1 << (ctx->id * 2);
success = SetThreadAffinityMask(thread_handle, affinity_mask) != 0;
ASSERT(success);
#endif
(UNUSED)success;
}
}
i32 worker_fiber_id = sys_current_fiber_id();
struct job_queue *queues[countof(G.job_queues)] = ZI;
for (u32 i = 0; i < countof(G.job_queues); ++i) {
queues[i] = &G.job_queues[i];
}
struct fiber *job_fiber = 0;
b32 shutdown = 0;
while (!shutdown) {
@ -955,14 +949,12 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
struct snc_counter *job_counter = 0;
{
//__profnc("Pull job", RGB32_F(0.75, 0.75, 0));
for (u32 queue_index = 0; queue_index < countof(queues) && !job_func; ++queue_index) {
struct job_queue *queue = queues[queue_index];
for (enum sys_priority priority = 0; priority < (i32)countof(pool->job_queues) && !job_func; ++priority) {
struct job_queue *queue = &pool->job_queues[priority];
if (queue) {
tm_lock(&queue->lock);
{
struct job_info *info = queue->first;
job_priority = (enum sys_priority)queue->kind;
job_priority = job_priority_from_queue_kind(queue->kind);
while (info && !job_func) {
struct job_info *next = info->next;
b32 dequeue = 0;
@ -970,7 +962,8 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
job_id = info->num_dispatched++;
if (job_id < info->count) {
/* Pick job */
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, -1);
atomic_i64_fetch_add(&pool->num_jobs_in_queue.v, -1);
job_priority = priority;
job_func = info->func;
job_sig = info->sig;
job_counter = info->counter;
@ -981,8 +974,9 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
}
} else {
/* This job is to be resumed from a yield */
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, -1);
atomic_i64_fetch_add(&pool->num_jobs_in_queue.v, -1);
job_fiber_id = info->fiber_id;
job_priority = priority;
job_id = info->num_dispatched;
job_func = info->func;
job_sig = info->sig;
@ -1026,6 +1020,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
job_fiber->job_func = job_func;
job_fiber->job_sig = job_sig;
job_fiber->job_id = job_id;
job_fiber->job_pool = pool_kind;
job_fiber->job_priority = job_priority;
job_fiber->job_counter = job_counter;
job_fiber->parent_id = worker_fiber_id;
@ -1184,13 +1179,13 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
}
/* Wait */
struct snc_lock wake_lock = snc_lock_s(&G.workers_wake_mutex);
struct snc_lock wake_lock = snc_lock_s(&pool->workers_wake_mutex);
{
shutdown = atomic_i32_fetch(&G.workers_shutdown.v);
while (atomic_i64_fetch(&G.num_jobs_in_queue.v) <= 0 && !shutdown) {
shutdown = atomic_i32_fetch(&pool->workers_shutdown.v);
while (atomic_i64_fetch(&pool->num_jobs_in_queue.v) <= 0 && !shutdown) {
//__profnc("Wait for job", RGB32_F(0.75, 0.75, 0));
snc_cv_wait(&G.workers_wake_cv, &wake_lock);
shutdown = atomic_i32_fetch(&G.workers_shutdown.v);
snc_cv_wait(&pool->workers_wake_cv, &wake_lock);
shutdown = atomic_i32_fetch(&pool->workers_shutdown.v);
}
}
snc_unlock(&wake_lock);
@ -1228,7 +1223,7 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
}
i64 last_cycle_ns = 0;
while (!atomic_i32_fetch(&G.workers_shutdown.v)) {
while (!atomic_i32_fetch(&G.shutdown)) {
__profn("Job scheduler cycle");
{
__profn("Job scheduler wait");
@ -1283,23 +1278,61 @@ INTERNAL SYS_THREAD_DEF(test_entry, _)
struct sys_thread *scheduler_thread = sys_thread_alloc(job_scheduler_entry, 0, LIT("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER);
/* Start workers */
//G.num_worker_threads = 1;
G.num_worker_threads = 4;
G.worker_threads_arena = arena_alloc(GIBI(64));
G.worker_threads = arena_push_array(G.worker_threads_arena, struct sys_thread *, G.num_worker_threads);
G.worker_contexts = arena_push_array(G.worker_threads_arena, struct worker_ctx, G.num_worker_threads);
for (i32 i = 0; i < G.num_worker_threads; ++i) {
struct worker_ctx *ctx = &G.worker_contexts[i];
/* TODO: Heuristic worker count & priorities */
for (enum sys_pool pool_kind = 0; pool_kind < (i32)countof(G.job_pools); ++pool_kind) {
struct job_pool *pool = &G.job_pools[pool_kind];
struct string name_fmt = ZI;
i32 prof_group = 0;
switch (pool_kind) {
default: ASSERT(0); break;
case SYS_POOL_DEDICATED:
{
name_fmt = LIT("Dedicated worker #%F");
prof_group = PROF_THREAD_GROUP_WORKERS_DEDICATED;
pool->thread_affinity_mask = 0xFFFFFFFFFFFFFFFFULL;
pool->thread_priority = THREAD_PRIORITY_TIME_CRITICAL;
pool->num_worker_threads = 4;
} break;
case SYS_POOL_BACKGROUND:
{
name_fmt = LIT("Background worker #%F");
prof_group = PROF_THREAD_GROUP_WORKERS_BACKGROUND;
pool->thread_affinity_mask = 0xFFFFFFFFFFFFFFFFULL;
pool->thread_priority = 0;
pool->num_worker_threads = 4;
} break;
case SYS_POOL_BLOCKING:
{
name_fmt = LIT("Blocking worker #%F");
prof_group = PROF_THREAD_GROUP_WORKERS_BLOCKING;
pool->thread_affinity_mask = 0xFFFFFFFFFFFFFFFFULL;
pool->thread_priority = 0;
pool->num_worker_threads = 4;
} break;
}
pool->worker_threads_arena = arena_alloc(GIBI(64));
pool->worker_threads = arena_push_array(pool->worker_threads_arena, struct sys_thread *, pool->num_worker_threads);
pool->worker_contexts = arena_push_array(pool->worker_threads_arena, struct worker_ctx, pool->num_worker_threads);
for (i32 i = 0; i < pool->num_worker_threads; ++i) {
struct worker_ctx *ctx = &pool->worker_contexts[i];
ctx->pool_kind = pool_kind;
ctx->id = i;
struct string name = string_format(scratch.arena, LIT("Worker #%F"), FMT_SINT(i));
G.worker_threads[i] = sys_thread_alloc(job_worker_entry, ctx, name, PROF_THREAD_GROUP_WORKERS + i);
struct string name = string_format(scratch.arena, name_fmt, FMT_SINT(i));
pool->worker_threads[i] = sys_thread_alloc(job_worker_entry, ctx, name, prof_group + i);
}
}
/* Wait on workers */
for (i32 i = 0; i < G.num_worker_threads; ++i) {
struct sys_thread *worker_thread = G.worker_threads[i];
for (enum sys_pool pool_kind = 0; pool_kind < (i32)countof(G.job_pools); ++pool_kind) {
struct job_pool *pool = &G.job_pools[pool_kind];
for (i32 i = 0; i < pool->num_worker_threads; ++i) {
struct sys_thread *worker_thread = pool->worker_threads[i];
sys_thread_wait_release(worker_thread);
}
}
/* Wait on scheduler */
sys_thread_wait_release(scheduler_thread);
@ -2254,7 +2287,7 @@ INTERNAL SYS_THREAD_DEF(window_present_thread_entry_point, arg)
struct sys_window_present_job_sig sig = ZI;
sig.window = (struct sys_window *)window;
sig.events = events;
sys_run(1, window->present_job, &sig, SYS_PRIORITY_HIGH, &counter);
sys_run(1, window->present_job, &sig, SYS_POOL_DEDICATED, SYS_PRIORITY_HIGH, &counter);
snc_counter_wait(&counter);
}
arena_reset(events_arena);
@ -3189,24 +3222,27 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
G.timer_start_qpc = qpc.QuadPart;
}
/* Init wait lists */
G.wait_lists_arena = arena_alloc(GIBI(64));
/* Init fibers */
G.num_fibers = 1; /* Fiber at index 0 always nil */
G.fiber_names_arena = arena_alloc(GIBI(64));
/* Init job queues */
for (u32 i = 0; i < countof(G.job_queues); ++i) {
struct job_queue *queue = &G.job_queues[i];
queue->kind = (enum job_queue_kind)i;
queue->arena = arena_alloc(GIBI(64));
}
/* Init wait lists */
G.wait_lists_arena = arena_alloc(GIBI(64));
/* Convert main thread to fiber */
fiber_alloc(FIBER_KIND_CONVERTED_THREAD);
/* Init job pools */
for (enum sys_pool pool_kind = 0; pool_kind < (i32)countof(G.job_pools); ++pool_kind) {
struct job_pool *pool = &G.job_pools[pool_kind];
/* Init queues */
for (enum sys_priority priority = 0; priority < (i32)countof(pool->job_queues); ++priority) {
struct job_queue *queue = &pool->job_queues[priority];
queue->arena = arena_alloc(GIBI(64));
}
}
u64 cmdline_len = wstr_len(cmdline_wstr, countof(G.cmdline_args_wstr) - 1);
MEMCPY(G.cmdline_args_wstr, cmdline_wstr, cmdline_len * sizeof(*cmdline_wstr));
G.cmdline_args_wstr[cmdline_len] = 0;
@ -3322,13 +3358,21 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
}
}
/* Signal shutdown */
atomic_i32_fetch_set(&G.shutdown, 1);
/* Shutdown test thread */
{
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
atomic_i32_fetch_set(&G.workers_shutdown.v, 1);
snc_cv_signal(&G.workers_wake_cv, I32_MAX);
for (enum sys_pool pool_kind = 0; pool_kind < (i32)countof(G.job_pools); ++pool_kind) {
struct job_pool *pool = &G.job_pools[pool_kind];
struct snc_lock lock = snc_lock_e(&pool->workers_wake_mutex);
{
atomic_i32_fetch_set(&pool->workers_shutdown.v, 1);
snc_cv_signal(&pool->workers_wake_cv, I32_MAX);
}
snc_unlock(&lock);
}
}
sys_thread_wait_release(test_thread);
/* Find any dangling threads that haven't exited gracefully by now */

View File

@ -242,8 +242,8 @@ struct user_startup_receipt user_startup(struct font_startup_receipt *font_sr,
//log_register_callback(debug_console_log_callback, LOG_LEVEL_SUCCESS);
log_register_callback(debug_console_log_callback, LOG_LEVEL_DEBUG);
/* Start jobs */
sys_run(1, local_sim_job, 0, SYS_PRIORITY_HIGH, &G.shutdown_job_counters);
/* Start sim job */
sys_run(1, local_sim_job, 0, SYS_POOL_DEDICATED, SYS_PRIORITY_HIGH, &G.shutdown_job_counters);
sys_on_exit(&user_shutdown);
G.window = sys_window_alloc(user_update_job);