use global lock for jobs

This commit is contained in:
jacob 2025-07-01 01:41:53 -05:00
parent 892daa5ed2
commit 15d8fb19d2
3 changed files with 164 additions and 204 deletions

View File

@ -236,7 +236,7 @@ void app_entry_point(struct string args_str)
i32 worker_count; i32 worker_count;
{ {
/* FIXME: Switch this on to utilize all cores. Only decreasing worker count for testing purposes. */ /* FIXME: Switch this on to utilize all cores. Only decreasing worker count for testing purposes. */
#if !PROFILING && !RTC #if !PROFILING && !RTC || 1
i32 max_worker_count = JOB_MAX_WORKERS; i32 max_worker_count = JOB_MAX_WORKERS;
i32 min_worker_count = clamp_i32(NUM_APP_DEDICATED_WORKERS + 2, JOB_MIN_WORKERS, max_worker_count); i32 min_worker_count = clamp_i32(NUM_APP_DEDICATED_WORKERS + 2, JOB_MIN_WORKERS, max_worker_count);
i32 target_worker_count = (i32)sys_num_logical_processors() * 0.75; i32 target_worker_count = (i32)sys_num_logical_processors() * 0.75;

362
src/job.c
View File

@ -13,7 +13,6 @@
struct worker_job { struct worker_job {
struct worker_job_queue *queue; struct worker_job_queue *queue;
struct sys_mutex *mutex;
i32 num_workers; i32 num_workers;
i32 num_dispatched; i32 num_dispatched;
@ -23,6 +22,7 @@ struct worker_job {
void *sig; void *sig;
struct atomic_u64 gen; struct atomic_u64 gen;
struct sys_mutex *gen_cv_mutex;
struct sys_condition_variable *gen_cv; struct sys_condition_variable *gen_cv;
@ -46,11 +46,10 @@ struct worker_info {
}; };
GLOBAL struct { GLOBAL struct {
struct sys_mutex *free_jobs_mutex; struct arena *arena;
struct arena *free_jobs_arena;
struct worker_job *first_free_job; struct worker_job *first_free_job;
struct sys_mutex *queued_jobs_mutex; struct sys_mutex *mutex;
struct worker_job_queue global_queue; struct worker_job_queue global_queue;
struct worker_job_queue pinned_queues[JOB_MAX_WORKERS]; struct worker_job_queue pinned_queues[JOB_MAX_WORKERS];
u64 queue_submit_gen; u64 queue_submit_gen;
@ -59,6 +58,7 @@ GLOBAL struct {
i32 num_worker_threads; i32 num_worker_threads;
b32 workers_shutdown; b32 workers_shutdown;
struct sys_condition_variable *workers_wake_cv; struct sys_condition_variable *workers_wake_cv;
struct sys_mutex *workers_wait_mutex;
struct sys_thread *worker_threads[JOB_MAX_WORKERS]; struct sys_thread *worker_threads[JOB_MAX_WORKERS];
} G = ZI, DEBUG_ALIAS(G, G_job); } G = ZI, DEBUG_ALIAS(G, G_job);
@ -74,10 +74,8 @@ void job_startup(i32 num_workers, struct string *worker_names)
__prof; __prof;
struct arena_temp scratch = scratch_begin_no_conflict(); struct arena_temp scratch = scratch_begin_no_conflict();
G.free_jobs_mutex = sys_mutex_alloc(); G.arena = arena_alloc(GIGABYTE(64));
G.free_jobs_arena = arena_alloc(GIGABYTE(64)); G.mutex = sys_mutex_alloc();
G.queued_jobs_mutex = sys_mutex_alloc();
G.workers_wake_cv = sys_condition_variable_alloc(); G.workers_wake_cv = sys_condition_variable_alloc();
@ -101,9 +99,9 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(job_shutdown)
{ {
__prof; __prof;
{ {
struct sys_lock lock = sys_mutex_lock_e(G.queued_jobs_mutex); struct sys_lock lock = sys_mutex_lock_e(G.mutex);
G.workers_shutdown = true; G.workers_shutdown = true;
sys_condition_variable_signal(G.workers_wake_cv, U32_MAX); sys_condition_variable_broadcast(G.workers_wake_cv);
sys_mutex_unlock(&lock); sys_mutex_unlock(&lock);
} }
for (i32 i = 0; i < G.num_worker_threads; ++i) { for (i32 i = 0; i < G.num_worker_threads; ++i) {
@ -181,41 +179,43 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc)
u64 gen = 0; u64 gen = 0;
struct worker_job *job = NULL; struct worker_job *job = NULL;
{ {
struct sys_mutex *old_mutex = NULL; struct sys_mutex *old_cv_mutex = NULL;
struct sys_condition_variable *old_cv = NULL; struct sys_condition_variable *old_cv = NULL;
{ {
struct sys_lock lock = sys_mutex_lock_e(G.free_jobs_mutex); struct sys_lock lock = sys_mutex_lock_e(G.mutex);
if (G.first_free_job) { if (G.first_free_job) {
job = G.first_free_job; job = G.first_free_job;
G.first_free_job = job->next_free; G.first_free_job = job->next_free;
old_mutex = job->mutex; old_cv_mutex = job->gen_cv_mutex;
old_cv = job->gen_cv; old_cv = job->gen_cv;
gen = atomic_u64_eval(&job->gen) + 1; gen = atomic_u64_eval(&job->gen) + 1;
} else { } else {
job = arena_push_no_zero(G.free_jobs_arena, struct worker_job); job = arena_push_no_zero(G.arena, struct worker_job);
gen = 1; gen = 1;
} }
sys_mutex_unlock(&lock); sys_mutex_unlock(&lock);
} }
struct sys_lock lock = old_mutex ? sys_mutex_lock_e(old_mutex) : (struct sys_lock)ZI; atomic_u64_eval_exchange(&job->gen, 0);
{ MEMZERO_STRUCT(job);
atomic_u64_eval_exchange(&job->gen, 0); if (old_cv_mutex) {
MEMZERO_STRUCT(job); job->gen_cv_mutex = old_cv_mutex;
if (old_mutex) { job->gen_cv = old_cv;
job->mutex = old_mutex; } else {
job->gen_cv = old_cv; job->gen_cv_mutex = sys_mutex_alloc();
} else { job->gen_cv = sys_condition_variable_alloc();
job->mutex = sys_mutex_alloc();
job->gen_cv = sys_condition_variable_alloc();
}
job->pinned_worker_id = pinned_worker_id;
job->count = job_count;
job->func = job_func;
job->sig = sig;
atomic_u64_eval_exchange(&job->gen, gen);
} }
if (lock.mutex) { job->pinned_worker_id = pinned_worker_id;
sys_mutex_unlock(&lock); job->count = job_count;
job->func = job_func;
job->sig = sig;
atomic_u64_eval_exchange(&job->gen, gen);
{
/* Signal mutex change */
if (old_cv_mutex) {
struct sys_lock lock = sys_mutex_lock_e(job->gen_cv_mutex);
sys_condition_variable_broadcast(job->gen_cv);
sys_mutex_unlock(&lock);
}
} }
} }
handle.job = job; handle.job = job;
@ -237,7 +237,7 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc)
/* Queue job */ /* Queue job */
if (job_queue) { if (job_queue) {
__profscope(Queue job); __profscope(Queue job);
struct sys_lock lock = sys_mutex_lock_e(G.queued_jobs_mutex); struct sys_lock lock = sys_mutex_lock_e(G.mutex);
{ {
/* Push to queue */ /* Push to queue */
{ {
@ -277,35 +277,8 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc)
ctx->pin_depth += is_pinned_to_caller; ctx->pin_depth += is_pinned_to_caller;
struct job_data data = ZI; struct job_data data = ZI;
data.sig = sig; data.sig = sig;
b32 should_release = false;
b32 stop = false; b32 stop = false;
while (!stop) { while (!stop) {
/* Remove job from queue */
if (job_queue && job_id == (job_count - 1)) {
__profscope(Dequeue job);
struct sys_lock queue_lock = sys_mutex_lock_e(G.queued_jobs_mutex);
{
if (job->prev == job || job->next == job) {
DEBUGBREAKABLE;
}
struct worker_job *prev = job->prev;
struct worker_job *next = job->next;
if (prev) {
prev->next = next;
} else {
job_queue->first = next;
}
if (next) {
next->prev = prev;
} else {
job_queue->last = prev;
}
if (job->prev == job || job->next == job) {
DEBUGBREAKABLE;
}
}
sys_mutex_unlock(&queue_lock);
}
/* Run */ /* Run */
{ {
__profscope(Run job); __profscope(Run job);
@ -314,47 +287,62 @@ INTERNAL struct job_handle job_dispatch_ex(struct job_desc desc)
} }
/* Grab new ID or exit job */ /* Grab new ID or exit job */
{ {
struct sys_lock job_lock = sys_mutex_lock_e(job->mutex); struct sys_lock queue_lock = sys_mutex_lock_e(G.mutex);
if (job->num_dispatched < job_count && (wait || job->num_workers <= 1)) { {
job_id = job->num_dispatched++; if (job->num_dispatched < job_count && (wait || job->num_workers <= 1)) {
} else { job_id = job->num_dispatched++;
stop = true; } else {
i32 num_workers = --job->num_workers; stop = true;
if (num_workers == 0) { i32 num_workers = --job->num_workers;
is_done = true; if (num_workers == 0) {
atomic_u64_eval_add_u64(&job->gen, 1); __profscope(Release job);
should_release = true; is_done = true;
sys_condition_variable_signal(job->gen_cv, U32_MAX); /* Dequeue */
{
struct worker_job *prev = job->prev;
struct worker_job *next = job->next;
if (prev) {
prev->next = next;
} else {
job_queue->first = next;
}
if (next) {
next->prev = prev;
} else {
job_queue->last = prev;
}
}
/* Add to free list */
{
job->next_free = G.first_free_job;
G.first_free_job = job;
}
/* Signal waiters */
atomic_u64_eval_add_u64(&job->gen, 1);
{
struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex);
sys_condition_variable_broadcast(job->gen_cv);
sys_mutex_unlock(&cv_lock);
}
}
} }
} }
sys_mutex_unlock(&job_lock); sys_mutex_unlock(&queue_lock);
} }
} }
if (should_release) {
__profscope(Release job);
struct sys_lock fj_lock = sys_mutex_lock_e(G.free_jobs_mutex);
{
if (job->queue && (job->queue->first == job || job->queue->last == job)) {
DEBUGBREAK;
}
job->next_free = G.first_free_job;
G.first_free_job = job;
}
sys_mutex_unlock(&fj_lock);
}
ctx->pin_depth -= is_pinned_to_caller; ctx->pin_depth -= is_pinned_to_caller;
} }
/* Wait for job completion */ /* Wait for job completion */
if (wait && !is_done) { if (wait && !is_done) {
__profscope(Wait for job); __profscope(Wait for job);
struct sys_lock lock = sys_mutex_lock_e(job->mutex); struct sys_lock cv_lock = sys_mutex_lock_s(job->gen_cv_mutex);
is_done = atomic_u64_eval(&job->gen) != handle.gen; is_done = atomic_u64_eval(&job->gen) != handle.gen;
while (!is_done) { while (!is_done) {
sys_condition_variable_wait(job->gen_cv, &lock); sys_condition_variable_wait(job->gen_cv, &cv_lock);
is_done = atomic_u64_eval(&job->gen) != handle.gen; is_done = atomic_u64_eval(&job->gen) != handle.gen;
} }
sys_mutex_unlock(&lock); sys_mutex_unlock(&cv_lock);
} }
} }
return handle; return handle;
@ -408,16 +396,16 @@ void job_wait(struct job_handle handle)
if (worker_id >= 0 && (job_pinned_worker < 0 || job_pinned_worker == worker_id)) { if (worker_id >= 0 && (job_pinned_worker < 0 || job_pinned_worker == worker_id)) {
i32 job_id = 0; i32 job_id = 0;
i32 job_count = 0; i32 job_count = 0;
struct worker_job_queue *job_queue = job->queue; struct worker_job_queue *job_queue = NULL;
{ {
struct sys_lock lock = sys_mutex_lock_e(job->mutex); struct sys_lock queue_lock = sys_mutex_lock_e(G.mutex);
job_id = job->num_dispatched++; job_id = job->num_dispatched++;
job_count = job->count; job_count = job->count;
job_queue = job->queue; job_queue = job->queue;
if (job_id < job_count) { if (job_id < job_count) {
++job->num_workers; ++job->num_workers;
} }
sys_mutex_unlock(&lock); sys_mutex_unlock(&queue_lock);
} }
/* Execute job */ /* Execute job */
@ -426,34 +414,7 @@ void job_wait(struct job_handle handle)
struct job_data data = ZI; struct job_data data = ZI;
data.sig = job->sig; data.sig = job->sig;
job_func *func = job->func; job_func *func = job->func;
b32 should_release = false;
while (job_id < job_count) { while (job_id < job_count) {
/* Remove job from queue */
if (job_id == (job_count - 1)) {
__profscope(Dequeue job);
struct sys_lock queue_lock = sys_mutex_lock_e(G.queued_jobs_mutex);
{
if (job->prev == job || job->next == job) {
DEBUGBREAKABLE;
}
struct worker_job *prev = job->prev;
struct worker_job *next = job->next;
if (prev) {
prev->next = next;
} else {
job_queue->first = next;
}
if (next) {
next->prev = prev;
} else {
job_queue->last = prev;
}
if (job->prev == job || job->next == job) {
DEBUGBREAKABLE;
}
}
sys_mutex_unlock(&queue_lock);
}
/* Run */ /* Run */
{ {
data.id = job_id; data.id = job_id;
@ -461,45 +422,57 @@ void job_wait(struct job_handle handle)
} }
/* Grab new ID or exit job */ /* Grab new ID or exit job */
{ {
struct sys_lock job_lock = sys_mutex_lock_e(job->mutex); struct sys_lock queue_lock = sys_mutex_lock_e(G.mutex);
job_id = job->num_dispatched++; {
if (job_id >= job_count) { job_id = job->num_dispatched++;
i32 num_workers = --job->num_workers; if (job_id >= job_count) {
if (num_workers == 0) { __profscope(Release job);
is_done = true; is_done = true;
/* Dequeue */
{
struct worker_job *prev = job->prev;
struct worker_job *next = job->next;
if (prev) {
prev->next = next;
} else {
job_queue->first = next;
}
if (next) {
next->prev = prev;
} else {
job_queue->last = prev;
}
}
/* Add to free list */
{
job->next_free = G.first_free_job;
G.first_free_job = job;
}
/* Signal waiters */
atomic_u64_eval_add_u64(&job->gen, 1); atomic_u64_eval_add_u64(&job->gen, 1);
should_release = true; {
sys_condition_variable_signal(job->gen_cv, U32_MAX); struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex);
sys_condition_variable_broadcast(job->gen_cv);
sys_mutex_unlock(&cv_lock);
}
} }
} }
sys_mutex_unlock(&job_lock); sys_mutex_unlock(&queue_lock);
} }
} }
if (should_release) {
__profscope(Release job);
struct sys_lock fj_lock = sys_mutex_lock_e(G.free_jobs_mutex);
{
if (job->queue && (job->queue->first == job || job->queue->last == job)) {
DEBUGBREAK;
}
job->next_free = G.first_free_job;
G.first_free_job = job;
}
sys_mutex_unlock(&fj_lock);
}
} }
} }
/* Wait for job completion */ /* Wait for job completion */
if (!is_done) { if (!is_done) {
__profscope(Wait for job); __profscope(Wait for job);
struct sys_lock lock = sys_mutex_lock_e(job->mutex); struct sys_lock cv_lock = sys_mutex_lock_s(job->gen_cv_mutex);
is_done = atomic_u64_eval(&job->gen) != handle.gen; is_done = atomic_u64_eval(&job->gen) != handle.gen;
while (!is_done) { while (!is_done) {
sys_condition_variable_wait(job->gen_cv, &lock); sys_condition_variable_wait(job->gen_cv, &cv_lock);
is_done = atomic_u64_eval(&job->gen) != handle.gen; is_done = atomic_u64_eval(&job->gen) != handle.gen;
} }
sys_mutex_unlock(&lock); sys_mutex_unlock(&cv_lock);
} }
} }
} }
@ -519,7 +492,7 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg)
struct worker_job_queue *queues[] = { &G.pinned_queues[worker_id], &G.global_queue }; struct worker_job_queue *queues[] = { &G.pinned_queues[worker_id], &G.global_queue };
u64 seen_queue_submit_gen = 0; u64 seen_queue_submit_gen = 0;
struct sys_lock queue_lock = sys_mutex_lock_s(G.queued_jobs_mutex); struct sys_lock queue_lock = sys_mutex_lock_s(G.mutex);
while (!G.workers_shutdown) { while (!G.workers_shutdown) {
sys_mutex_unlock(&queue_lock); sys_mutex_unlock(&queue_lock);
@ -531,29 +504,25 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg)
struct worker_job *job = NULL; struct worker_job *job = NULL;
{ {
__profscope(Pick job); __profscope(Pick job);
queue_lock = sys_mutex_lock_s(G.queued_jobs_mutex); queue_lock = sys_mutex_lock_e(G.mutex);
{ {
seen_queue_submit_gen = G.queue_submit_gen; seen_queue_submit_gen = G.queue_submit_gen;
for (i32 queue_index = 0; queue_index < (i32)ARRAY_COUNT(queues); ++queue_index) { for (i32 queue_index = 0; queue_index < (i32)ARRAY_COUNT(queues); ++queue_index) {
struct worker_job_queue *queue = queues[queue_index]; struct worker_job_queue *queue = queues[queue_index];
struct worker_job *tmp = queue->first; struct worker_job *tmp = queue->first;
while (!job && tmp) { while (!job && tmp) {
struct sys_lock job_lock = sys_mutex_lock_e(tmp->mutex); i32 tmp_id = tmp->num_dispatched;
{ i32 tmp_count = tmp->count;
i32 tmp_id = tmp->num_dispatched; if (tmp_id < tmp_count) {
i32 tmp_count = tmp->count; /* Pick job */
if (tmp_id < tmp_count) { ++tmp->num_workers;
/* Pick job */ ++tmp->num_dispatched;
++tmp->num_workers; job = tmp;
++tmp->num_dispatched; job_id = tmp_id;
job = tmp; job_count = tmp_count;
job_id = tmp_id; job_is_pinned_to_worker = tmp->pinned_worker_id == worker_id;
job_count = tmp_count; job_queue = tmp->queue;
job_is_pinned_to_worker = tmp->pinned_worker_id == worker_id;
job_queue = tmp->queue;
}
} }
sys_mutex_unlock(&job_lock);
tmp = tmp->next; tmp = tmp->next;
} }
} }
@ -561,33 +530,6 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg)
sys_mutex_unlock(&queue_lock); sys_mutex_unlock(&queue_lock);
} }
/* Remove job from queue */
if (job_id == (job_count - 1)) {
__profscope(Dequeue job);
queue_lock = sys_mutex_lock_e(G.queued_jobs_mutex);
{
if (job->prev == job || job->next == job) {
DEBUGBREAKABLE;
}
struct worker_job *prev = job->prev;
struct worker_job *next = job->next;
if (prev) {
prev->next = next;
} else {
job_queue->first = next;
}
if (next) {
next->prev = prev;
} else {
job_queue->last = prev;
}
if (job->prev == job || job->next == job) {
DEBUGBREAKABLE;
}
}
sys_mutex_unlock(&queue_lock);
}
/* Execute job */ /* Execute job */
if (job) { if (job) {
__profscope(Execute job); __profscope(Execute job);
@ -596,45 +538,63 @@ INTERNAL SYS_THREAD_DEF(worker_thread_entry_point, thread_arg)
struct job_data data = ZI; struct job_data data = ZI;
data.sig = job->sig; data.sig = job->sig;
job_func *func = job->func; job_func *func = job->func;
b32 should_release = false; b32 stop = job_id >= job_count;
while (job_id < job_count) { while (!stop) {
/* Run */ /* Run */
{ {
data.id = job_id; data.id = job_id;
func(data); func(data);
} }
/* Grab new ID */ /* Grab new ID or stop */
{ {
struct sys_lock job_lock = sys_mutex_lock_e(job->mutex); queue_lock = sys_mutex_lock_e(G.mutex);
job_id = job->num_dispatched++; job_id = job->num_dispatched++;
if (job_id >= job_count) { if (job_id >= job_count) {
stop = true;
i32 num_workers = --job->num_workers; i32 num_workers = --job->num_workers;
if (num_workers == 0) { if (num_workers == 0) {
__profscope(Release job);
/* Dequeue */
{
struct worker_job *prev = job->prev;
struct worker_job *next = job->next;
if (prev) {
prev->next = next;
} else {
job_queue->first = next;
}
if (next) {
next->prev = prev;
} else {
job_queue->last = prev;
}
}
/* Add to free list */
{
job->next_free = G.first_free_job;
G.first_free_job = job;
}
/* Signal waiters */
atomic_u64_eval_add_u64(&job->gen, 1); atomic_u64_eval_add_u64(&job->gen, 1);
should_release = true; {
sys_condition_variable_signal(job->gen_cv, U32_MAX); struct sys_lock cv_lock = sys_mutex_lock_e(job->gen_cv_mutex);
sys_condition_variable_broadcast(job->gen_cv);
sys_mutex_unlock(&cv_lock);
}
} }
} }
sys_mutex_unlock(&job_lock); sys_mutex_unlock(&queue_lock);
} }
} }
if (should_release) {
__profscope(Release job);
struct sys_lock fj_lock = sys_mutex_lock_e(G.free_jobs_mutex);
{
job->next_free = G.first_free_job;
G.first_free_job = job;
}
sys_mutex_unlock(&fj_lock);
}
atomic_i32_eval_add(&G.num_idle_worker_threads, 1); atomic_i32_eval_add(&G.num_idle_worker_threads, 1);
ctx->pin_depth -= job_is_pinned_to_worker; ctx->pin_depth -= job_is_pinned_to_worker;
} }
queue_lock = sys_mutex_lock_s(G.queued_jobs_mutex); queue_lock = sys_mutex_lock_s(G.mutex);
if (!G.workers_shutdown && G.queue_submit_gen == seen_queue_submit_gen) { if (!G.workers_shutdown && G.queue_submit_gen == seen_queue_submit_gen) {
//__profscope(Worker sleep); //__profscope(Worker sleep);
sys_condition_variable_wait(G.workers_wake_cv, &queue_lock); sys_condition_variable_wait(G.workers_wake_cv, &queue_lock);
} }
} }
sys_mutex_unlock(&queue_lock);
} }

View File

@ -14,7 +14,7 @@
# define TRACY_MANUAL_LIFETIME # define TRACY_MANUAL_LIFETIME
# define TRACY_DELAYED_INIT # define TRACY_DELAYED_INIT
#endif #endif
#if 0 #if 1
/* Disable system tracing (very slow) */ /* Disable system tracing (very slow) */
# define TRACY_NO_CALLSTACK # define TRACY_NO_CALLSTACK
# define TRACY_NO_SYSTEM_TRACING # define TRACY_NO_SYSTEM_TRACING
@ -22,7 +22,7 @@
#include STRINGIZE(TRACY_INCLUDE_PATH) #include STRINGIZE(TRACY_INCLUDE_PATH)
#define PROFILING_CAPTURE_FRAME_IMAGE 0 #define PROFILING_CAPTURE_FRAME_IMAGE 0
#define PROFILING_LOCKS 1 #define PROFILING_LOCKS 0
#define PROFILING_D3D 1 #define PROFILING_D3D 1
#define PROFILING_CMD_WSTR L"tracy-profiler.exe -a 127.0.0.1" #define PROFILING_CMD_WSTR L"tracy-profiler.exe -a 127.0.0.1"