Replace semaphore usage with a condition variable

This commit is contained in:
jacob 2024-04-24 20:03:07 -05:00
parent e3830fccae
commit 925ef5a482
5 changed files with 62 additions and 99 deletions

View File

@ -257,7 +257,7 @@ INTERNAL struct sheet init_sheet_from_ase_result(struct arena *arena, struct ase
return sheet;
}
INTERNAL void node_load(struct cache_node *n, struct sheet_tag tag)
INTERNAL void sheet_load(struct cache_node *n, struct sheet_tag tag)
{
__prof;
struct temp_arena scratch = scratch_begin_no_conflict();
@ -428,7 +428,7 @@ INTERNAL struct sheet *sheet_from_tag_internal(struct sheet_scope *scope, struct
} else if (state == CACHE_NODE_STATE_NONE) {
if (atomic_u32_eval_compare_exchange(&n->state, CACHE_NODE_STATE_NONE, CACHE_NODE_STATE_QUEUED) == CACHE_NODE_STATE_NONE) {
if (await) {
node_load(n, tag);
sheet_load(n, tag);
res = &n->sheet;
} else {
/* TODO */

View File

@ -302,11 +302,6 @@ struct sys_mutex {
u64 handle;
#if RTC
u64 owner_tid;
# if OS_WINDOWS
wchar_t *owner_name;
# else
char *owner_name;
# endif
#endif
};
@ -358,7 +353,7 @@ void sys_rw_mutex_assert_locked_exclusive(struct sys_rw_mutex *mutex);
struct sys_condition_variable {
u64 handle;
#if RTC
struct atomic_i64 num_sleepers;
struct atomic_i64 num_waiters;
#endif
};
@ -367,20 +362,7 @@ void sys_condition_variable_release(struct sys_condition_variable *cv);
void sys_condition_variable_wait(struct sys_condition_variable *cv, struct sys_mutex *mutex);
void sys_condition_variable_wait_time(struct sys_condition_variable *cv, struct sys_mutex *mutex, f64 seconds);
void sys_condition_variable_signal(struct sys_condition_variable *cv);
/* ========================== *
* Semaphore
* ========================== */
struct sys_semaphore {
u64 handle;
};
struct sys_semaphore sys_semaphore_alloc(u32 max_count);
void sys_semaphore_release(struct sys_semaphore *semaphore);
void sys_semaphore_wait(struct sys_semaphore *semaphore);
void sys_semaphore_wait_timed(struct sys_semaphore *semaphore, f64 seconds);
void sys_semaphore_signal(struct sys_semaphore *semaphore, u32 count);
void sys_condition_variable_broadcast(struct sys_condition_variable *cv);
/* ========================== *
* Thread local storage

View File

@ -1251,7 +1251,6 @@ void sys_mutex_lock(struct sys_mutex *mutex)
AcquireSRWLockExclusive((SRWLOCK *)&mutex->handle);
#if RTC
mutex->owner_tid = (u64)GetCurrentThreadId();
GetThreadDescription(GetCurrentThread(), &mutex->owner_name);
#endif
}
@ -1259,7 +1258,6 @@ void sys_mutex_unlock(struct sys_mutex *mutex)
{
__prof;
#if RTC
mutex->owner_name = L"None";
mutex->owner_tid = 0;
#endif
ReleaseSRWLockExclusive((SRWLOCK *)&mutex->handle);
@ -1393,7 +1391,7 @@ void sys_condition_variable_release(struct sys_condition_variable *cv)
{
__prof;
/* Condition variable must not have any sleepers (signal before releasing) */
ASSERT(atomic_i64_eval(&cv->num_sleepers) == 0);
ASSERT(atomic_i64_eval(&cv->num_waiters) == 0);
win32_condition_variable_release((struct win32_condition_variable *)cv->handle);
}
@ -1401,11 +1399,13 @@ void sys_condition_variable_wait(struct sys_condition_variable *cv, struct sys_m
{
__prof;
#if RTC
atomic_i64_inc_eval(&cv->num_sleepers);
atomic_i64_inc_eval(&cv->num_waiters);
mutex->owner_tid = 0;
#endif
SleepConditionVariableSRW((PCONDITION_VARIABLE)cv->handle, (SRWLOCK *)&mutex->handle, INFINITE, 0);
#if RTC
atomic_i64_dec_eval(&cv->num_sleepers);
mutex->owner_tid = (u64)GetCurrentThreadId();
atomic_i64_dec_eval(&cv->num_waiters);
#endif
}
@ -1413,61 +1413,29 @@ void sys_condition_variable_wait_time(struct sys_condition_variable *cv, struct
{
__prof;
#if RTC
atomic_i64_inc_eval(&cv->num_sleepers);
atomic_i64_inc_eval(&cv->num_waiters);
mutex->owner_tid = 0;
#endif
u32 ms = (u32)math_round_to_int((f32)seconds * 1000.f);
SleepConditionVariableSRW((PCONDITION_VARIABLE)cv->handle, (SRWLOCK *)&mutex->handle, ms, 0);
#if RTC
atomic_i64_dec_eval(&cv->num_sleepers);
mutex->owner_tid = (u64)GetCurrentThreadId();
atomic_i64_dec_eval(&cv->num_waiters);
#endif
}
/* Wake exactly one thread blocked in sys_condition_variable_wait[_time] on cv.
 * Thin wrapper over Win32 WakeConditionVariable; safe to call with no waiters. */
void sys_condition_variable_signal(struct sys_condition_variable *cv)
{
__prof;
WakeConditionVariable((PCONDITION_VARIABLE)cv->handle);
}
/* Wake ALL threads currently blocked on cv (Win32 WakeAllConditionVariable).
 * Used where every waiter must re-check its predicate, e.g. shutdown paths. */
void sys_condition_variable_broadcast(struct sys_condition_variable *cv)
{
__prof;
WakeAllConditionVariable((PCONDITION_VARIABLE)cv->handle);
}
/* ========================== *
* Semaphore
* ========================== */
/* TODO: Use similar allocation scheme to mutex & condition var for speed? */
/* Allocate a counting semaphore with initial count 0 and ceiling max_count.
 * The Win32 HANDLE is stored in the u64 handle field; pair with
 * sys_semaphore_release to avoid leaking the kernel object.
 * NOTE(review): CreateSemaphoreEx failure (NULL handle) is not checked here —
 * callers appear to assume allocation succeeds. */
struct sys_semaphore sys_semaphore_alloc(u32 max_count)
{
struct sys_semaphore semaphore = {
.handle = (u64)CreateSemaphoreEx(0, 0, max_count, NULL, 0, SEMAPHORE_ALL_ACCESS)
};
return semaphore;
}
/* Destroy the semaphore's kernel object. The struct must not be used again
 * afterwards; no threads may be blocked on it when this is called. */
void sys_semaphore_release(struct sys_semaphore *semaphore)
{
__prof;
CloseHandle((HANDLE)semaphore->handle);
}
/* Block until the semaphore count is positive, then decrement it.
 * Waits forever (INFINITE); the non-alertable wait (FALSE) means queued
 * user APCs will not interrupt this call. */
void sys_semaphore_wait(struct sys_semaphore *semaphore)
{
__prof;
WaitForSingleObjectEx((HANDLE)semaphore->handle, INFINITE, FALSE);
}
/* Like sys_semaphore_wait, but give up after roughly `seconds`.
 * The timeout is rounded to whole milliseconds and clamped to at least 1 ms,
 * so a zero/near-zero request still performs a real (non-polling) wait.
 * NOTE(review): the timed-out vs. acquired result of WaitForSingleObjectEx
 * is discarded — callers cannot distinguish the two cases. */
void sys_semaphore_wait_timed(struct sys_semaphore *semaphore, f64 seconds)
{
__prof;
u32 ms = max_u32(1, math_round_to_int((f32)(seconds * 1000.0)));
WaitForSingleObjectEx((HANDLE)semaphore->handle, ms, FALSE);
}
/* Increment the semaphore count by `count`, releasing up to that many waiters.
 * Silently fails (return value ignored) if the increment would exceed the
 * max_count given at allocation time. */
void sys_semaphore_signal(struct sys_semaphore *semaphore, u32 count)
{
__prof;
/* FIXME: Implicit mutex release means mutex debug owner info becomes invalid? */
ReleaseSemaphore((HANDLE)semaphore->handle, count, NULL);
}
/* ========================== *
* Thread local storage
* ========================== */

View File

@ -153,7 +153,7 @@ INLINE void sync_flag_set(struct sync_flag *sf)
{
__prof;
if (atomic_i32_eval_compare_exchange(&sf->flag, 0, 1) == 0) {
sys_condition_variable_signal(&sf->cv);
sys_condition_variable_broadcast(&sf->cv);
}
}

View File

@ -69,10 +69,10 @@ struct work_task {
GLOBAL struct {
struct arena arena;
b32 workers_shutdown;
struct sys_mutex mutex;
struct sys_semaphore semaphore;
struct sys_condition_variable cv;
struct atomic_i32 workers_shutdown;
u32 worker_count;
u32 idle_worker_count;
struct worker *worker_head;
@ -116,33 +116,31 @@ struct work_startup_receipt work_startup(u32 num_worker_threads)
G.arena = arena_alloc(GIGABYTE(64));
G.mutex = sys_mutex_alloc();
G.semaphore = sys_semaphore_alloc(num_worker_threads);
G.cv = sys_condition_variable_alloc();
G.worker_count = num_worker_threads;
G.idle_worker_count = num_worker_threads;
app_register_exit_callback(&work_shutdown);
/* Initialize threads */
sys_mutex_lock(&G.mutex);
{
sys_mutex_lock(&G.mutex);
{
struct worker *prev = NULL;
for (u32 i = 0; i < num_worker_threads; ++i) {
struct string thread_name = string_format(scratch.arena,
STR("[P0] Worker %F"),
FMT_UINT(i));
struct worker *prev = NULL;
for (u32 i = 0; i < num_worker_threads; ++i) {
struct string thread_name = string_format(scratch.arena,
STR("[P0] Worker %F"),
FMT_UINT(i));
struct worker *worker = arena_push_zero(&G.arena, struct worker);
worker->thread = sys_thread_alloc(&worker_thread_entry_point, NULL, thread_name);
if (prev) {
prev->next = worker;
} else {
G.worker_head = worker;
}
prev = worker;
struct worker *worker = arena_push_zero(&G.arena, struct worker);
worker->thread = sys_thread_alloc(&worker_thread_entry_point, NULL, thread_name);
if (prev) {
prev->next = worker;
} else {
G.worker_head = worker;
}
prev = worker;
}
sys_mutex_unlock(&G.mutex);
}
sys_mutex_unlock(&G.mutex);
scratch_end(scratch);
@ -152,8 +150,12 @@ struct work_startup_receipt work_startup(u32 num_worker_threads)
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(work_shutdown)
{
__prof;
atomic_i32_eval_exchange(&G.workers_shutdown, true);
sys_semaphore_signal(&G.semaphore, G.worker_count);
sys_mutex_lock(&G.mutex);
{
G.workers_shutdown = true;
sys_condition_variable_broadcast(&G.cv);
}
sys_mutex_unlock(&G.mutex);
for (struct worker *worker = G.worker_head; (worker = worker->next);) {
sys_thread_wait_release(&worker->thread);
}
@ -271,7 +273,9 @@ INTERNAL void work_schedule_assume_locked(struct work *work)
G.scheduled_work_priority_tails[priority] = work;
WRITE_BARRIER();
sys_semaphore_signal(&G.semaphore, min_u32(work->tasks_incomplete, G.worker_count));
for (u64 i = 0; i < work->tasks_incomplete; ++i) {
sys_condition_variable_signal(&G.cv);
}
}
INTERNAL void work_unschedule_assume_locked(struct work *work)
@ -352,7 +356,7 @@ INTERNAL b32 work_exec_single_task_maybe_release_assume_locked(struct work *work
if (work->tasks_incomplete == 0) {
/* Signal finished */
work->status = WORK_STATUS_DONE;
sys_condition_variable_signal(&work->condition_variable_finished);
sys_condition_variable_broadcast(&work->condition_variable_finished);
/* Release */
work_release_assume_locked(work);
@ -387,12 +391,20 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(worker_thread_entry_point, thread_data)
.is_worker = true
};
while (!atomic_i32_eval(&G.workers_shutdown)) {
sys_semaphore_wait_timed(&G.semaphore, 0.500); /* Timed to poll G.shutdown */
while (G.scheduled_work_head) {
while (true) {
sys_mutex_lock(&G.mutex);
{
/* Check for exit */
if (G.workers_shutdown) {
sys_mutex_unlock(&G.mutex);
break;
}
/* Wait */
sys_condition_variable_wait(&G.cv, &G.mutex);
/* Do work from top */
sys_mutex_lock(&G.mutex);
{
while (G.scheduled_work_head) {
struct work *work = G.scheduled_work_head;
if (work) {
__profscope(work_pool_task);
@ -401,8 +413,9 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(worker_thread_entry_point, thread_data)
++G.idle_worker_count;
}
}
sys_mutex_unlock(&G.mutex);
}
sys_mutex_unlock(&G.mutex);
}
}