From 925ef5a48219c8493f2812852f15c6cfaa8517ab Mon Sep 17 00:00:00 2001 From: jacob Date: Wed, 24 Apr 2024 20:03:07 -0500 Subject: [PATCH] replace semaphore usage w/ condition variable --- src/sheet.c | 4 +-- src/sys.h | 22 ++------------- src/sys_win32.c | 62 +++++++++++------------------------------- src/util.h | 2 +- src/work.c | 71 +++++++++++++++++++++++++++++-------------------- 5 files changed, 62 insertions(+), 99 deletions(-) diff --git a/src/sheet.c b/src/sheet.c index 24940114..001d96cb 100644 --- a/src/sheet.c +++ b/src/sheet.c @@ -257,7 +257,7 @@ INTERNAL struct sheet init_sheet_from_ase_result(struct arena *arena, struct ase return sheet; } -INTERNAL void node_load(struct cache_node *n, struct sheet_tag tag) +INTERNAL void sheet_load(struct cache_node *n, struct sheet_tag tag) { __prof; struct temp_arena scratch = scratch_begin_no_conflict(); @@ -428,7 +428,7 @@ INTERNAL struct sheet *sheet_from_tag_internal(struct sheet_scope *scope, struct } else if (state == CACHE_NODE_STATE_NONE) { if (atomic_u32_eval_compare_exchange(&n->state, CACHE_NODE_STATE_NONE, CACHE_NODE_STATE_QUEUED) == CACHE_NODE_STATE_NONE) { if (await) { - node_load(n, tag); + sheet_load(n, tag); res = &n->sheet; } else { /* TODO */ diff --git a/src/sys.h b/src/sys.h index 6262f6e7..bf43aaec 100644 --- a/src/sys.h +++ b/src/sys.h @@ -302,11 +302,6 @@ struct sys_mutex { u64 handle; #if RTC u64 owner_tid; -# if OS_WINDOWS - wchar_t *owner_name; -# else - char *owner_name; -# endif #endif }; @@ -358,7 +353,7 @@ void sys_rw_mutex_assert_locked_exclusive(struct sys_rw_mutex *mutex); struct sys_condition_variable { u64 handle; #if RTC - struct atomic_i64 num_sleepers; + struct atomic_i64 num_waiters; #endif }; @@ -367,20 +362,7 @@ void sys_condition_variable_release(struct sys_condition_variable *cv); void sys_condition_variable_wait(struct sys_condition_variable *cv, struct sys_mutex *mutex); void sys_condition_variable_wait_time(struct sys_condition_variable *cv, struct sys_mutex *mutex, f64 seconds); void sys_condition_variable_signal(struct sys_condition_variable *cv); - -/* ========================== * - * Semaphore - * ========================== */ - -struct sys_semaphore { - u64 handle; -}; - -struct sys_semaphore sys_semaphore_alloc(u32 max_count); -void sys_semaphore_release(struct sys_semaphore *semaphore); -void sys_semaphore_wait(struct sys_semaphore *semaphore); -void sys_semaphore_wait_timed(struct sys_semaphore *semaphore, f64 seconds); -void sys_semaphore_signal(struct sys_semaphore *semaphore, u32 count); +void sys_condition_variable_broadcast(struct sys_condition_variable *cv); /* ========================== * * Thread local storage diff --git a/src/sys_win32.c b/src/sys_win32.c index 11fe4ce8..1b44c0a9 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -1251,7 +1251,6 @@ void sys_mutex_lock(struct sys_mutex *mutex) AcquireSRWLockExclusive((SRWLOCK *)&mutex->handle); #if RTC mutex->owner_tid = (u64)GetCurrentThreadId(); - GetThreadDescription(GetCurrentThread(), &mutex->owner_name); #endif } @@ -1259,7 +1258,6 @@ void sys_mutex_unlock(struct sys_mutex *mutex) { __prof; #if RTC - mutex->owner_name = L"None"; mutex->owner_tid = 0; #endif ReleaseSRWLockExclusive((SRWLOCK *)&mutex->handle); @@ -1393,7 +1391,7 @@ void sys_condition_variable_release(struct sys_condition_variable *cv) { __prof; /* Condition variable must not have any sleepers (signal before releasing) */ - ASSERT(atomic_i64_eval(&cv->num_sleepers) == 0); + ASSERT(atomic_i64_eval(&cv->num_waiters) == 0); win32_condition_variable_release((struct win32_condition_variable *)cv->handle); } @@ -1401,11 +1399,13 @@ void sys_condition_variable_wait(struct sys_condition_variable *cv, struct sys_m { __prof; #if RTC - atomic_i64_inc_eval(&cv->num_sleepers); + atomic_i64_inc_eval(&cv->num_waiters); + mutex->owner_tid = 0; #endif SleepConditionVariableSRW((PCONDITION_VARIABLE)cv->handle, (SRWLOCK *)&mutex->handle, INFINITE, 0); #if RTC - atomic_i64_dec_eval(&cv->num_sleepers); + mutex->owner_tid = (u64)GetCurrentThreadId(); + atomic_i64_dec_eval(&cv->num_waiters); #endif } @@ -1413,61 +1413,29 @@ void sys_condition_variable_wait_time(struct sys_condition_variable *cv, struct { __prof; #if RTC - atomic_i64_inc_eval(&cv->num_sleepers); + atomic_i64_inc_eval(&cv->num_waiters); + mutex->owner_tid = 0; #endif u32 ms = (u32)math_round_to_int((f32)seconds * 1000.f); SleepConditionVariableSRW((PCONDITION_VARIABLE)cv->handle, (SRWLOCK *)&mutex->handle, ms, 0); #if RTC - atomic_i64_dec_eval(&cv->num_sleepers); + mutex->owner_tid = (u64)GetCurrentThreadId(); + atomic_i64_dec_eval(&cv->num_waiters); #endif } void sys_condition_variable_signal(struct sys_condition_variable *cv) +{ + __prof; + WakeConditionVariable((PCONDITION_VARIABLE)cv->handle); +} + +void sys_condition_variable_broadcast(struct sys_condition_variable *cv) { __prof; WakeAllConditionVariable((PCONDITION_VARIABLE)cv->handle); } -/* ========================== * - * Semaphore - * ========================== */ - -/* TODO: Use similar allocation scheme to mutex & condition var for speed? */ - -struct sys_semaphore sys_semaphore_alloc(u32 max_count) -{ - struct sys_semaphore semaphore = { - .handle = (u64)CreateSemaphoreEx(0, 0, max_count, NULL, 0, SEMAPHORE_ALL_ACCESS) - }; - return semaphore; -} - -void sys_semaphore_release(struct sys_semaphore *semaphore) -{ - __prof; - CloseHandle((HANDLE)semaphore->handle); -} - -void sys_semaphore_wait(struct sys_semaphore *semaphore) -{ - __prof; - WaitForSingleObjectEx((HANDLE)semaphore->handle, INFINITE, FALSE); -} - -void sys_semaphore_wait_timed(struct sys_semaphore *semaphore, f64 seconds) -{ - __prof; - u32 ms = max_u32(1, math_round_to_int((f32)(seconds * 1000.0))); - WaitForSingleObjectEx((HANDLE)semaphore->handle, ms, FALSE); -} - -void sys_semaphore_signal(struct sys_semaphore *semaphore, u32 count) -{ - __prof; - /* FIXME: Implicit mutex release means mutex debug owner info becomes invalid? */ - ReleaseSemaphore((HANDLE)semaphore->handle, count, NULL); -} - /* ========================== * * Thread local storage * ========================== */ diff --git a/src/util.h b/src/util.h index 76cb08ba..312f9094 100644 --- a/src/util.h +++ b/src/util.h @@ -153,7 +153,7 @@ INLINE void sync_flag_set(struct sync_flag *sf) { __prof; if (atomic_i32_eval_compare_exchange(&sf->flag, 0, 1) == 0) { - sys_condition_variable_signal(&sf->cv); + sys_condition_variable_broadcast(&sf->cv); } } diff --git a/src/work.c b/src/work.c index 446d5100..77e149cd 100644 --- a/src/work.c +++ b/src/work.c @@ -69,10 +69,10 @@ struct work_task { GLOBAL struct { struct arena arena; + b32 workers_shutdown; struct sys_mutex mutex; - struct sys_semaphore semaphore; + struct sys_condition_variable cv; - struct atomic_i32 workers_shutdown; u32 worker_count; u32 idle_worker_count; struct worker *worker_head; @@ -116,33 +116,31 @@ struct work_startup_receipt work_startup(u32 num_worker_threads) G.arena = arena_alloc(GIGABYTE(64)); G.mutex = sys_mutex_alloc(); - G.semaphore = sys_semaphore_alloc(num_worker_threads); + G.cv = sys_condition_variable_alloc(); G.worker_count = num_worker_threads; G.idle_worker_count = num_worker_threads; app_register_exit_callback(&work_shutdown); /* Initialize threads */ + sys_mutex_lock(&G.mutex); { - sys_mutex_lock(&G.mutex); - { - struct worker *prev = NULL; - for (u32 i = 0; i < num_worker_threads; ++i) { - struct string thread_name = string_format(scratch.arena, - STR("[P0] Worker %F"), - FMT_UINT(i)); + struct worker *prev = NULL; + for (u32 i = 0; i < num_worker_threads; ++i) { + struct string thread_name = string_format(scratch.arena, + STR("[P0] Worker %F"), + FMT_UINT(i)); - struct worker *worker = arena_push_zero(&G.arena, struct worker); - worker->thread = sys_thread_alloc(&worker_thread_entry_point, NULL, thread_name); - if (prev) { - prev->next = worker; - } else { - G.worker_head = worker; - } - prev = worker; + struct worker *worker = arena_push_zero(&G.arena, struct worker); + worker->thread = sys_thread_alloc(&worker_thread_entry_point, NULL, thread_name); + if (prev) { + prev->next = worker; + } else { + G.worker_head = worker; } + prev = worker; } - sys_mutex_unlock(&G.mutex); } + sys_mutex_unlock(&G.mutex); scratch_end(scratch); @@ -152,8 +150,12 @@ struct work_startup_receipt work_startup(u32 num_worker_threads) INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(work_shutdown) { __prof; - atomic_i32_eval_exchange(&G.workers_shutdown, true); - sys_semaphore_signal(&G.semaphore, G.worker_count); + sys_mutex_lock(&G.mutex); + { + G.workers_shutdown = true; + sys_condition_variable_broadcast(&G.cv); + } + sys_mutex_unlock(&G.mutex); for (struct worker *worker = G.worker_head; (worker = worker->next);) { sys_thread_wait_release(&worker->thread); } @@ -271,7 +273,9 @@ INTERNAL void work_schedule_assume_locked(struct work *work) G.scheduled_work_priority_tails[priority] = work; WRITE_BARRIER(); - sys_semaphore_signal(&G.semaphore, min_u32(work->tasks_incomplete, G.worker_count)); + for (u64 i = 0; i < work->tasks_incomplete; ++i) { + sys_condition_variable_signal(&G.cv); + } } INTERNAL void work_unschedule_assume_locked(struct work *work) @@ -352,7 +356,7 @@ INTERNAL b32 work_exec_single_task_maybe_release_assume_locked(struct work *work if (work->tasks_incomplete == 0) { /* Signal finished */ work->status = WORK_STATUS_DONE; - sys_condition_variable_signal(&work->condition_variable_finished); + sys_condition_variable_broadcast(&work->condition_variable_finished); /* Release */ work_release_assume_locked(work); @@ -387,12 +391,20 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(worker_thread_entry_point, thread_data) .is_worker = true }; - while (!atomic_i32_eval(&G.workers_shutdown)) { - sys_semaphore_wait_timed(&G.semaphore, 0.500); /* Timed to poll G.shutdown */ - while (G.scheduled_work_head) { + while (true) { + sys_mutex_lock(&G.mutex); + { + /* Check for exit */ + if (G.workers_shutdown) { + sys_mutex_unlock(&G.mutex); + break; + } + + /* Wait */ + sys_condition_variable_wait(&G.cv, &G.mutex); + /* Do work from top */ - sys_mutex_lock(&G.mutex); - { + while (G.scheduled_work_head) { struct work *work = G.scheduled_work_head; if (work) { __profscope(work_pool_task); @@ -401,8 +413,9 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(worker_thread_entry_point, thread_data) ++G.idle_worker_count; } } - sys_mutex_unlock(&G.mutex); } + + sys_mutex_unlock(&G.mutex); } }