From 1a19a9e693c0022d9891b0258a411e00b8ed23f2 Mon Sep 17 00:00:00 2001 From: jacob Date: Tue, 1 Jul 2025 02:37:46 -0500 Subject: [PATCH] atomic lock for jobs --- src/app.c | 6 +-- src/job.c | 101 +++++++++++++++++++++++++++--------------------- src/sys_win32.c | 7 ---- 3 files changed, 61 insertions(+), 53 deletions(-) diff --git a/src/app.c b/src/app.c index debc03a3..fedc242b 100644 --- a/src/app.c +++ b/src/app.c @@ -235,14 +235,14 @@ void app_entry_point(struct string args_str) i32 worker_count; { - /* FIXME: Switch this on to utilize all cores. Only decreasing worker count for testing purposes. */ -#if !PROFILING && !RTC || 1 + /* FIXME: Switch this on to utilize more cores. Only decreasing worker count for testing purposes. */ +#if !PROFILING && !RTC i32 max_worker_count = JOB_MAX_WORKERS; i32 min_worker_count = clamp_i32(NUM_APP_DEDICATED_WORKERS + 2, JOB_MIN_WORKERS, max_worker_count); i32 target_worker_count = (i32)sys_num_logical_processors() * 0.75; worker_count = clamp_i32(target_worker_count, min_worker_count, max_worker_count); #else - worker_count = 5; + worker_count = 8; #endif } diff --git a/src/job.c b/src/job.c index 6e93fca3..05dd648c 100644 --- a/src/job.c +++ b/src/job.c @@ -5,6 +5,7 @@ #include "string.h" #include "scratch.h" #include "app.h" +#include "intrinsics.h" #if 0 /* FIXME: Remove this (replace with sys_ wrappers) */ @@ -47,18 +48,21 @@ struct worker_info { GLOBAL struct { struct arena *arena; + + struct atomic_i32 atomic_lock; + struct worker_job *first_free_job; - struct sys_mutex *mutex; struct worker_job_queue global_queue; struct worker_job_queue pinned_queues[JOB_MAX_WORKERS]; - u64 queue_submit_gen; + + u64 workers_wake_gen; + struct sys_condition_variable *workers_wake_cv; + struct sys_mutex *workers_wake_mutex; struct atomic_i32 num_idle_worker_threads; i32 num_worker_threads; b32 workers_shutdown; - struct sys_condition_variable *workers_wake_cv; - struct sys_mutex *workers_wait_mutex; struct sys_thread *worker_threads[JOB_MAX_WORKERS]; } G = ZI, DEBUG_ALIAS(G, G_job); @@ -75,9 +79,9 @@ void job_startup(i32 num_workers, struct string *worker_names) struct arena_temp scratch = scratch_begin_no_conflict(); G.arena = arena_alloc(GIGABYTE(64)); - G.mutex = sys_mutex_alloc(); G.workers_wake_cv = sys_condition_variable_alloc(); + G.workers_wake_mutex = sys_mutex_alloc(); if (num_workers < JOB_MIN_WORKERS || num_workers > JOB_MAX_WORKERS) { /* Invalid worker count */ @@ -99,8 +103,9 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(job_shutdown) { __prof; { - struct sys_lock lock = sys_mutex_lock_e(G.mutex); + struct sys_lock lock = sys_mutex_lock_e(G.workers_wake_mutex); G.workers_shutdown = true; + ++G.workers_wake_gen; sys_condition_variable_broadcast(G.workers_wake_cv); sys_mutex_unlock(&lock); } @@ -110,6 +115,22 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(job_shutdown) } } +/* ========================== * + * Atomic lock + * ========================== */ + +INTERNAL void atomic_lock(void) +{ + while (atomic_i32_eval_compare_exchange(&G.atomic_lock, 0, 1) != 0) { + ix_pause(); + } +} + +INTERNAL void atomic_unlock(void) +{ + atomic_i32_eval_exchange(&G.atomic_lock, 0); +} + /* ========================== * * Worker TLS * ========================== */ @@ -182,7 +203,7 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc) struct sys_mutex *old_cv_mutex = NULL; struct sys_condition_variable *old_cv = NULL; { - struct sys_lock lock = sys_mutex_lock_e(G.mutex); + atomic_lock(); if (G.first_free_job) { job = G.first_free_job; G.first_free_job = job->next_free; @@ -193,7 +214,7 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc) job = arena_push_no_zero(G.arena, struct worker_job); gen = 1; } - sys_mutex_unlock(&lock); + atomic_unlock(); } atomic_u64_eval_exchange(&job->gen, 0); MEMZERO_STRUCT(job); @@ -237,15 +258,10 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc) /* Queue job */ if (job_queue) { __profscope(Queue job); - struct sys_lock lock = sys_mutex_lock_e(G.mutex); + atomic_lock(); { /* Push to queue */ { - if (job->prev == job || job->next == job) { - DEBUGBREAKABLE; - } - - if (job_queue->last) { job_queue->last->next = job; } else { @@ -253,21 +269,21 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc) } job->prev = job_queue->last; job_queue->last = job; - - if (job->prev == job || job->next == job) { - DEBUGBREAKABLE; - } } - ++G.queue_submit_gen; /* Signal workers */ - i32 num_signals = job_count - !!do_work; - if (is_pinned) { - num_signals = G.num_worker_threads; + struct sys_lock wake_lock = sys_mutex_lock_e(G.workers_wake_mutex); + { + ++G.workers_wake_gen; + i32 num_signals = job_count - !!do_work; + if (is_pinned) { + num_signals = G.num_worker_threads; + } + sys_condition_variable_signal(G.workers_wake_cv, num_signals); } - sys_condition_variable_signal(G.workers_wake_cv, num_signals); + sys_mutex_unlock(&wake_lock); } - sys_mutex_unlock(&lock); + atomic_unlock(); } /* Execute job */ @@ -287,7 +303,7 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc) } /* Grab new ID or exit job */ { - struct sys_lock queue_lock = sys_mutex_lock_e(G.mutex); + atomic_lock(); { if (job->num_dispatched < job_count && (wait || job->num_workers <= 1)) { job_id = job->num_dispatched++; @@ -327,7 +343,7 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc) } } } - sys_mutex_unlock(&queue_lock); + atomic_unlock(); } } ctx->pin_depth -= is_pinned_to_caller; @@ -398,14 +414,14 @@ void job_wait(struct job_handle handle) i32 job_count = 0; struct worker_job_queue *job_queue = NULL; { - struct sys_lock queue_lock = sys_mutex_lock_e(G.mutex); + atomic_lock(); job_id = job->num_dispatched++; job_count = job->count; job_queue = job->queue; if (job_id < job_count) { ++job->num_workers; } - sys_mutex_unlock(&queue_lock); + atomic_unlock(); } /* Execute job */ @@ -422,7 +438,7 @@ void job_wait(struct job_handle handle) } /* Grab new ID or exit job */ { - struct sys_lock queue_lock = sys_mutex_lock_e(G.mutex); + atomic_lock(); { job_id = job->num_dispatched++; if (job_id >= job_count) { @@ -457,7 +473,7 @@ void job_wait(struct job_handle handle) } } } - sys_mutex_unlock(&queue_lock); + atomic_unlock(); } } } @@ -490,11 +506,12 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg) ctx->worker_id = worker_id; struct worker_job_queue *queues[] = { &G.pinned_queues[worker_id], &G.global_queue }; - u64 seen_queue_submit_gen = 0; + u64 last_wake_gen = 0; - struct sys_lock queue_lock = sys_mutex_lock_s(G.mutex); + struct sys_lock wake_lock = sys_mutex_lock_s(G.workers_wake_mutex); while (!G.workers_shutdown) { - sys_mutex_unlock(&queue_lock); + last_wake_gen = G.workers_wake_gen; + sys_mutex_unlock(&wake_lock); /* Try to pick job from queue */ i32 job_id = 0; @@ -504,9 +521,8 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg) struct worker_job *job = NULL; { __profscope(Pick job); - queue_lock = sys_mutex_lock_e(G.mutex); + atomic_lock(); { - seen_queue_submit_gen = G.queue_submit_gen; for (i32 queue_index = 0; queue_index < (i32)ARRAY_COUNT(queues); ++queue_index) { struct worker_job_queue *queue = queues[queue_index]; struct worker_job *tmp = queue->first; @@ -527,7 +543,7 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg) } } } - sys_mutex_unlock(&queue_lock); + atomic_unlock(); } /* Execute job */ @@ -547,7 +563,7 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg) } /* Grab new ID or stop */ { - queue_lock = sys_mutex_lock_e(G.mutex); + atomic_lock(); job_id = job->num_dispatched++; if (job_id >= job_count) { stop = true; @@ -583,18 +599,17 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg) } } } - sys_mutex_unlock(&queue_lock); + atomic_unlock(); } } atomic_i32_eval_add(&G.num_idle_worker_threads, 1); ctx->pin_depth -= job_is_pinned_to_worker; } - queue_lock = sys_mutex_lock_s(G.mutex); - if (!G.workers_shutdown && G.queue_submit_gen == seen_queue_submit_gen) { - //__profscope(Worker sleep); - sys_condition_variable_wait(G.workers_wake_cv, &queue_lock); + wake_lock = sys_mutex_lock_s(G.workers_wake_mutex); + if (!G.workers_shutdown && G.workers_wake_gen == last_wake_gen) { + sys_condition_variable_wait(G.workers_wake_cv, &wake_lock); } } - sys_mutex_unlock(&queue_lock); + sys_mutex_unlock(&wake_lock); } diff --git a/src/sys_win32.c b/src/sys_win32.c index 4ff7ff42..9fa25e8a 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -1636,7 +1636,6 @@ void sys_mutex_release(struct sys_mutex *mutex) struct sys_lock sys_mutex_lock_e(struct sys_mutex *mutex) { - __prof; struct win32_mutex *m = (struct win32_mutex *)mutex; __proflock_before_exclusive_lock(m->profiling_ctx); AcquireSRWLockExclusive((SRWLOCK *)&m->srwlock); @@ -1653,7 +1652,6 @@ struct sys_lock sys_mutex_lock_e(struct sys_mutex *mutex) struct sys_lock sys_mutex_lock_s(struct sys_mutex *mutex) { - __prof; struct win32_mutex *m = (struct win32_mutex *)mutex; __proflock_before_shared_lock(m->profiling_ctx); AcquireSRWLockShared((SRWLOCK *)&m->srwlock); @@ -1668,7 +1666,6 @@ struct sys_lock sys_mutex_lock_s(struct sys_mutex *mutex) void sys_mutex_unlock(struct sys_lock *lock) { - __prof; struct win32_mutex *m = (struct win32_mutex *)lock->mutex; #if RTC atomic_i64_eval_add(&m->count, -1); @@ -1748,7 +1745,6 @@ void sys_condition_variable_release(struct sys_condition_variable *sys_cv) void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct sys_lock *lock) { - __prof; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; struct win32_mutex *m = (struct win32_mutex *)lock->mutex; b32 exclusive = lock->exclusive; @@ -1786,7 +1782,6 @@ void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct s void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, struct sys_lock *lock, f64 seconds) { - __prof; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; struct win32_mutex *m = (struct win32_mutex *)lock->mutex; b32 exclusive = lock->exclusive; @@ -1825,7 +1820,6 @@ void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, str void sys_condition_variable_signal(struct sys_condition_variable *sys_cv, u32 count) { - __prof; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; /* Windows will wake all waiters if many single-wakes occur anyway, so we * might as well wake all ourselves. @@ -1841,7 +1835,6 @@ void sys_condition_variable_signal(struct sys_condition_variable *sys_cv, u32 co void sys_condition_variable_broadcast(struct sys_condition_variable *sys_cv) { - __prof; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; WakeAllConditionVariable(&cv->condition_variable); }