From 15d8fb19d2f6f1965ffec89af982790c8cf0bd3f Mon Sep 17 00:00:00 2001
From: jacob
Date: Tue, 1 Jul 2025 01:41:53 -0500
Subject: [PATCH] use global lock for jobs

---
 src/app.c        |   2 +-
 src/job.c        | 362 +++++++++++++++++++++--------------------
 src/prof_tracy.h |   4 +-
 3 files changed, 164 insertions(+), 204 deletions(-)

diff --git a/src/app.c b/src/app.c
index 325b49c1..debc03a3 100644
--- a/src/app.c
+++ b/src/app.c
@@ -236,7 +236,7 @@ void app_entry_point(struct string args_str)
   i32 worker_count;
   {
     /* FIXME: Switch this on to utilize all cores. Only decreasing worker count for testing purposes. */
-#if !PROFILING && !RTC
+#if !PROFILING && !RTC || 1
     i32 max_worker_count = JOB_MAX_WORKERS;
     i32 min_worker_count = clamp_i32(NUM_APP_DEDICATED_WORKERS + 2, JOB_MIN_WORKERS, max_worker_count);
     i32 target_worker_count = (i32)sys_num_logical_processors() * 0.75;
diff --git a/src/job.c b/src/job.c
index 0b112316..6e93fca3 100644
--- a/src/job.c
+++ b/src/job.c
@@ -13,7 +13,6 @@ struct worker_job {
   struct worker_job_queue *queue;
-  struct sys_mutex *mutex;
 
   i32 num_workers;
   i32 num_dispatched;
@@ -23,6 +22,7 @@
   void *sig;
 
   struct atomic_u64 gen;
+  struct sys_mutex *gen_cv_mutex;
   struct sys_condition_variable *gen_cv;
@@ -46,11 +46,10 @@ struct worker_info {
 };
 
 GLOBAL struct {
-  struct sys_mutex *free_jobs_mutex;
-  struct arena *free_jobs_arena;
+  struct arena *arena;
   struct worker_job *first_free_job;
 
-  struct sys_mutex *queued_jobs_mutex;
+  struct sys_mutex *mutex;
   struct worker_job_queue global_queue;
   struct worker_job_queue pinned_queues[JOB_MAX_WORKERS];
   u64 queue_submit_gen;
@@ -59,6 +58,7 @@
   i32 num_worker_threads;
   b32 workers_shutdown;
   struct sys_condition_variable *workers_wake_cv;
+  struct sys_mutex *workers_wait_mutex;
   struct sys_thread *worker_threads[JOB_MAX_WORKERS];
 } G = ZI, DEBUG_ALIAS(G, G_job);
@@ -74,10 +74,8 @@ void job_startup(i32 num_workers, struct string *worker_names)
 {
   __prof;
   struct arena_temp scratch = scratch_begin_no_conflict();
 
-  G.free_jobs_mutex = sys_mutex_alloc();
-  G.free_jobs_arena = arena_alloc(GIGABYTE(64));
-
-  G.queued_jobs_mutex = sys_mutex_alloc();
+  G.arena = arena_alloc(GIGABYTE(64));
+  G.mutex = sys_mutex_alloc();
 
   G.workers_wake_cv = sys_condition_variable_alloc();
@@ -101,9 +99,9 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(job_shutdown)
 {
   __prof;
   {
-    struct sys_lock lock = sys_mutex_lock_e(G.queued_jobs_mutex);
+    struct sys_lock lock = sys_mutex_lock_e(G.mutex);
     G.workers_shutdown = true;
-    sys_condition_variable_signal(G.workers_wake_cv, U32_MAX);
+    sys_condition_variable_broadcast(G.workers_wake_cv);
     sys_mutex_unlock(&lock);
   }
   for (i32 i = 0; i < G.num_worker_threads; ++i) {
@@ -181,41 +179,43 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc)
   u64 gen = 0;
   struct worker_job *job = NULL;
   {
-    struct sys_mutex *old_mutex = NULL;
+    struct sys_mutex *old_cv_mutex = NULL;
     struct sys_condition_variable *old_cv = NULL;
     {
-      struct sys_lock lock = sys_mutex_lock_e(G.free_jobs_mutex);
+      struct sys_lock lock = sys_mutex_lock_e(G.mutex);
       if (G.first_free_job) {
        job = G.first_free_job;
        G.first_free_job = job->next_free;
-        old_mutex = job->mutex;
+        old_cv_mutex = job->gen_cv_mutex;
        old_cv = job->gen_cv;
        gen = atomic_u64_eval(&job->gen) + 1;
      } else {
-        job = arena_push_no_zero(G.free_jobs_arena, struct worker_job);
+        job = arena_push_no_zero(G.arena, struct worker_job);
        gen = 1;
      }
      sys_mutex_unlock(&lock);
    }
-    struct sys_lock lock = old_mutex ? sys_mutex_lock_e(old_mutex) : (struct sys_lock)ZI;
-    {
-      atomic_u64_eval_exchange(&job->gen, 0);
-      MEMZERO_STRUCT(job);
-      if (old_mutex) {
-        job->mutex = old_mutex;
-        job->gen_cv = old_cv;
-      } else {
-        job->mutex = sys_mutex_alloc();
-        job->gen_cv = sys_condition_variable_alloc();
-      }
-      job->pinned_worker_id = pinned_worker_id;
-      job->count = job_count;
-      job->func = job_func;
-      job->sig = sig;
-      atomic_u64_eval_exchange(&job->gen, gen);
+    atomic_u64_eval_exchange(&job->gen, 0);
+    MEMZERO_STRUCT(job);
+    if (old_cv_mutex) {
+      job->gen_cv_mutex = old_cv_mutex;
+      job->gen_cv = old_cv;
+    } else {
+      job->gen_cv_mutex = sys_mutex_alloc();
+      job->gen_cv = sys_condition_variable_alloc();
     }
-    if (lock.mutex) {
-      sys_mutex_unlock(&lock);
+    job->pinned_worker_id = pinned_worker_id;
+    job->count = job_count;
+    job->func = job_func;
+    job->sig = sig;
+    atomic_u64_eval_exchange(&job->gen, gen);
+    {
+      /* Signal mutex change */
+      if (old_cv_mutex) {
+        struct sys_lock lock = sys_mutex_lock_e(job->gen_cv_mutex);
+        sys_condition_variable_broadcast(job->gen_cv);
+        sys_mutex_unlock(&lock);
+      }
     }
   }
   handle.job = job;
@@ -237,7 +237,7 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc)
   /* Queue job */
   if (job_queue) {
     __profscope(Queue job);
-    struct sys_lock lock = sys_mutex_lock_e(G.queued_jobs_mutex);
+    struct sys_lock lock = sys_mutex_lock_e(G.mutex);
     {
       /* Push to queue */
       {
@@ -277,35 +277,8 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc)
     ctx->pin_depth += is_pinned_to_caller;
     struct job_data data = ZI;
     data.sig = sig;
-    b32 should_release = false;
     b32 stop = false;
     while (!stop) {
-      /* Remove job from queue */
-      if (job_queue && job_id == (job_count - 1)) {
-        __profscope(Dequeue job);
-        struct sys_lock queue_lock = sys_mutex_lock_e(G.queued_jobs_mutex);
-        {
-          if (job->prev == job || job->next == job) {
-            DEBUGBREAKABLE;
-          }
-          struct worker_job *prev = job->prev;
-          struct worker_job *next = job->next;
-          if (prev) {
-            prev->next = next;
-          } else {
-            job_queue->first = next;
-          }
-          if (next) {
-            next->prev = prev;
-          } else {
-            job_queue->last = prev;
-          }
-          if (job->prev == job || job->next == job) {
-            DEBUGBREAKABLE;
-          }
-        }
-        sys_mutex_unlock(&queue_lock);
-      }
       /* Run */
       {
         __profscope(Run job);
@@ -314,47 +287,62 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc)
         data.id = job_id;
       }
       /* Grab new ID or exit job */
       {
-        struct sys_lock job_lock = sys_mutex_lock_e(job->mutex);
-        if (job->num_dispatched < job_count && (wait || job->num_workers <= 1)) {
-          job_id = job->num_dispatched++;
-        } else {
-          stop = true;
-          i32 num_workers = --job->num_workers;
-          if (num_workers == 0) {
-            is_done = true;
-            atomic_u64_eval_add_u64(&job->gen, 1);
-            should_release = true;
-            sys_condition_variable_signal(job->gen_cv, U32_MAX);
+        struct sys_lock queue_lock = sys_mutex_lock_e(G.mutex);
+        {
+          if (job->num_dispatched < job_count && (wait || job->num_workers <= 1)) {
+            job_id = job->num_dispatched++;
+          } else {
+            stop = true;
+            i32 num_workers = --job->num_workers;
+            if (num_workers == 0) {
+              __profscope(Release job);
+              is_done = true;
+              /* Dequeue */
+              {
+                struct worker_job *prev = job->prev;
+                struct worker_job *next = job->next;
+                if (prev) {
+                  prev->next = next;
+                } else {
+                  job_queue->first = next;
+                }
+                if (next) {
+                  next->prev = prev;
+                } else {
+                  job_queue->last = prev;
+                }
+              }
+              /* Add to free list */
+              {
+                job->next_free = G.first_free_job;
+                G.first_free_job = job;
+              }
+              /* Signal waiters */
+              atomic_u64_eval_add_u64(&job->gen, 1);
+              {
+                struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex);
+                sys_condition_variable_broadcast(job->gen_cv);
+                sys_mutex_unlock(&cv_lock);
+              }
+            }
+          }
          }
        }
-        sys_mutex_unlock(&job_lock);
+        sys_mutex_unlock(&queue_lock);
      }
    }
-    if (should_release) {
-      __profscope(Release job);
-      struct sys_lock fj_lock = sys_mutex_lock_e(G.free_jobs_mutex);
-      {
-        if (job->queue && (job->queue->first == job || job->queue->last == job)) {
-          DEBUGBREAK;
-        }
-        job->next_free = G.first_free_job;
-        G.first_free_job = job;
-      }
-      sys_mutex_unlock(&fj_lock);
-    }
    ctx->pin_depth -= is_pinned_to_caller;
  }
 
   /* Wait for job completion */
   if (wait && !is_done) {
     __profscope(Wait for job);
-    struct sys_lock lock = sys_mutex_lock_e(job->mutex);
+    struct sys_lock cv_lock = sys_mutex_lock_s(job->gen_cv_mutex);
     is_done = atomic_u64_eval(&job->gen) != handle.gen;
     while (!is_done) {
-      sys_condition_variable_wait(job->gen_cv, &lock);
+      sys_condition_variable_wait(job->gen_cv, &cv_lock);
       is_done = atomic_u64_eval(&job->gen) != handle.gen;
     }
-    sys_mutex_unlock(&lock);
+    sys_mutex_unlock(&cv_lock);
   }
  }
  return handle;
@@ -408,16 +396,16 @@ void job_wait(struct job_handle handle)
  if (worker_id >= 0 && (job_pinned_worker < 0 || job_pinned_worker == worker_id)) {
    i32 job_id = 0;
    i32 job_count = 0;
-    struct worker_job_queue *job_queue = job->queue;
+    struct worker_job_queue *job_queue = NULL;
    {
-      struct sys_lock lock = sys_mutex_lock_e(job->mutex);
+      struct sys_lock queue_lock = sys_mutex_lock_e(G.mutex);
      job_id = job->num_dispatched++;
      job_count = job->count;
      job_queue = job->queue;
      if (job_id < job_count) {
        ++job->num_workers;
      }
-      sys_mutex_unlock(&lock);
+      sys_mutex_unlock(&queue_lock);
    }
 
    /* Execute job */
@@ -426,34 +414,7 @@ void job_wait(struct job_handle handle)
      struct job_data data = ZI;
      data.sig = job->sig;
      job_func *func = job->func;
-      b32 should_release = false;
      while (job_id < job_count) {
-        /* Remove job from queue */
-        if (job_id == (job_count - 1)) {
-          __profscope(Dequeue job);
-          struct sys_lock queue_lock = sys_mutex_lock_e(G.queued_jobs_mutex);
-          {
-            if (job->prev == job || job->next == job) {
-              DEBUGBREAKABLE;
-            }
-            struct worker_job *prev = job->prev;
-            struct worker_job *next = job->next;
-            if (prev) {
-              prev->next = next;
-            } else {
-              job_queue->first = next;
-            }
-            if (next) {
-              next->prev = prev;
-            } else {
-              job_queue->last = prev;
-            }
-            if (job->prev == job || job->next == job) {
-              DEBUGBREAKABLE;
-            }
-          }
-          sys_mutex_unlock(&queue_lock);
-        }
        /* Run */
        {
          data.id = job_id;
@@ -461,45 +422,57 @@ void job_wait(struct job_handle handle)
        }
        /* Grab new ID or exit job */
        {
-          struct sys_lock job_lock = sys_mutex_lock_e(job->mutex);
-          job_id = job->num_dispatched++;
-          if (job_id >= job_count) {
-            i32 num_workers = --job->num_workers;
-            if (num_workers == 0) {
+          struct sys_lock queue_lock = sys_mutex_lock_e(G.mutex);
+          {
+            job_id = job->num_dispatched++;
+            if (job_id >= job_count) {
+              __profscope(Release job);
              is_done = true;
+              /* Dequeue */
+              {
+                struct worker_job *prev = job->prev;
+                struct worker_job *next = job->next;
+                if (prev) {
+                  prev->next = next;
+                } else {
+                  job_queue->first = next;
+                }
+                if (next) {
+                  next->prev = prev;
+                } else {
+                  job_queue->last = prev;
+                }
+              }
+              /* Add to free list */
+              {
+                job->next_free = G.first_free_job;
+                G.first_free_job = job;
+              }
+              /* Signal waiters */
              atomic_u64_eval_add_u64(&job->gen, 1);
-              should_release = true;
-              sys_condition_variable_signal(job->gen_cv, U32_MAX);
+              {
+                struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex);
+                sys_condition_variable_broadcast(job->gen_cv);
+                sys_mutex_unlock(&cv_lock);
+              }
            }
          }
-          sys_mutex_unlock(&job_lock);
+          sys_mutex_unlock(&queue_lock);
        }
      }
-      if (should_release) {
-        __profscope(Release job);
-        struct sys_lock fj_lock = sys_mutex_lock_e(G.free_jobs_mutex);
-        {
-          if (job->queue && (job->queue->first == job || job->queue->last == job)) {
-            DEBUGBREAK;
-          }
-          job->next_free = G.first_free_job;
-          G.first_free_job = job;
-        }
-        sys_mutex_unlock(&fj_lock);
-      }
    }
  }
 
   /* Wait for job completion */
   if (!is_done) {
     __profscope(Wait for job);
-    struct sys_lock lock = sys_mutex_lock_e(job->mutex);
+    struct sys_lock cv_lock = sys_mutex_lock_s(job->gen_cv_mutex);
     is_done = atomic_u64_eval(&job->gen) != handle.gen;
     while (!is_done) {
-      sys_condition_variable_wait(job->gen_cv, &lock);
+      sys_condition_variable_wait(job->gen_cv, &cv_lock);
       is_done = atomic_u64_eval(&job->gen) != handle.gen;
     }
-    sys_mutex_unlock(&lock);
+    sys_mutex_unlock(&cv_lock);
   }
  }
 }
@@ -519,7 +492,7 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg)
   struct worker_job_queue *queues[] = { &G.pinned_queues[worker_id], &G.global_queue };
   u64 seen_queue_submit_gen = 0;
 
-  struct sys_lock queue_lock = sys_mutex_lock_s(G.queued_jobs_mutex);
+  struct sys_lock queue_lock = sys_mutex_lock_s(G.mutex);
   while (!G.workers_shutdown) {
     sys_mutex_unlock(&queue_lock);
@@ -531,29 +504,25 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg)
    struct worker_job *job = NULL;
    {
      __profscope(Pick job);
-      queue_lock = sys_mutex_lock_s(G.queued_jobs_mutex);
+      queue_lock = sys_mutex_lock_e(G.mutex);
      {
        seen_queue_submit_gen = G.queue_submit_gen;
        for (i32 queue_index = 0; queue_index < (i32)ARRAY_COUNT(queues); ++queue_index) {
          struct worker_job_queue *queue = queues[queue_index];
          struct worker_job *tmp = queue->first;
          while (!job && tmp) {
-            struct sys_lock job_lock = sys_mutex_lock_e(tmp->mutex);
-            {
-              i32 tmp_id = tmp->num_dispatched;
-              i32 tmp_count = tmp->count;
-              if (tmp_id < tmp_count) {
-                /* Pick job */
-                ++tmp->num_workers;
-                ++tmp->num_dispatched;
-                job = tmp;
-                job_id = tmp_id;
-                job_count = tmp_count;
-                job_is_pinned_to_worker = tmp->pinned_worker_id == worker_id;
-                job_queue = tmp->queue;
-              }
+            i32 tmp_id = tmp->num_dispatched;
+            i32 tmp_count = tmp->count;
+            if (tmp_id < tmp_count) {
+              /* Pick job */
+              ++tmp->num_workers;
+              ++tmp->num_dispatched;
+              job = tmp;
+              job_id = tmp_id;
+              job_count = tmp_count;
+              job_is_pinned_to_worker = tmp->pinned_worker_id == worker_id;
+              job_queue = tmp->queue;
            }
-            sys_mutex_unlock(&job_lock);
            tmp = tmp->next;
          }
        }
@@ -561,33 +530,6 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg)
      sys_mutex_unlock(&queue_lock);
    }
 
-    /* Remove job from queue */
-    if (job_id == (job_count - 1)) {
-      __profscope(Dequeue job);
-      queue_lock = sys_mutex_lock_e(G.queued_jobs_mutex);
-      {
-        if (job->prev == job || job->next == job) {
-          DEBUGBREAKABLE;
-        }
-        struct worker_job *prev = job->prev;
-        struct worker_job *next = job->next;
-        if (prev) {
-          prev->next = next;
-        } else {
-          job_queue->first = next;
-        }
-        if (next) {
-          next->prev = prev;
-        } else {
-          job_queue->last = prev;
-        }
-        if (job->prev == job || job->next == job) {
-          DEBUGBREAKABLE;
-        }
-      }
-      sys_mutex_unlock(&queue_lock);
-    }
-
    /* Execute job */
    if (job) {
      __profscope(Execute job);
@@ -596,45 +538,63 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg)
      struct job_data data = ZI;
      data.sig = job->sig;
      job_func *func = job->func;
-      b32 should_release = false;
-      while (job_id < job_count) {
+      b32 stop = job_id >= job_count;
+      while (!stop) {
        /* Run */
        {
          data.id = job_id;
          func(data);
        }
-        /* Grab new ID */
+        /* Grab new ID or stop */
        {
-          struct sys_lock job_lock = sys_mutex_lock_e(job->mutex);
+          queue_lock = sys_mutex_lock_e(G.mutex);
          job_id = job->num_dispatched++;
          if (job_id >= job_count) {
+            stop = true;
            i32 num_workers = --job->num_workers;
            if (num_workers == 0) {
+              __profscope(Release job);
+              /* Dequeue */
+              {
+                struct worker_job *prev = job->prev;
+                struct worker_job *next = job->next;
+                if (prev) {
+                  prev->next = next;
+                } else {
+                  job_queue->first = next;
+                }
+                if (next) {
+                  next->prev = prev;
+                } else {
+                  job_queue->last = prev;
+                }
+              }
+              /* Add to free list */
+              {
+                job->next_free = G.first_free_job;
+                G.first_free_job = job;
+              }
+              /* Signal waiters */
              atomic_u64_eval_add_u64(&job->gen, 1);
-              should_release = true;
-              sys_condition_variable_signal(job->gen_cv, U32_MAX);
+              {
+                struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex);
+                sys_condition_variable_broadcast(job->gen_cv);
+                sys_mutex_unlock(&cv_lock);
+              }
            }
          }
-          sys_mutex_unlock(&job_lock);
+          sys_mutex_unlock(&queue_lock);
        }
      }
-      if (should_release) {
-        __profscope(Release job);
-        struct sys_lock fj_lock = sys_mutex_lock_e(G.free_jobs_mutex);
-        {
-          job->next_free = G.first_free_job;
-          G.first_free_job = job;
-        }
-        sys_mutex_unlock(&fj_lock);
-      }
      atomic_i32_eval_add(&G.num_idle_worker_threads, 1);
      ctx->pin_depth -= job_is_pinned_to_worker;
    }
 
-    queue_lock = sys_mutex_lock_s(G.queued_jobs_mutex);
+    queue_lock = sys_mutex_lock_s(G.mutex);
    if (!G.workers_shutdown && G.queue_submit_gen == seen_queue_submit_gen) {
      //__profscope(Worker sleep);
      sys_condition_variable_wait(G.workers_wake_cv, &queue_lock);
    }
  }
+  sys_mutex_unlock(&queue_lock);
 }
diff --git a/src/prof_tracy.h b/src/prof_tracy.h
index ac8e89c9..366e8f5e 100644
--- a/src/prof_tracy.h
+++ b/src/prof_tracy.h
@@ -14,7 +14,7 @@
 # define TRACY_MANUAL_LIFETIME
 # define TRACY_DELAYED_INIT
 #endif
-#if 0
+#if 1
 /* Disable system tracing (very slow) */
 # define TRACY_NO_CALLSTACK
 # define TRACY_NO_SYSTEM_TRACING
@@ -22,7 +22,7 @@
 #include STRINGIZE(TRACY_INCLUDE_PATH)
 
 #define PROFILING_CAPTURE_FRAME_IMAGE 0
-#define PROFILING_LOCKS 1
+#define PROFILING_LOCKS 0
 #define PROFILING_D3D 1
 #define PROFILING_CMD_WSTR L"tracy-profiler.exe -a 127.0.0.1"