From c3d0869707ae706271ee597c080dbec2ec0bbbfd Mon Sep 17 00:00:00 2001 From: jacob Date: Tue, 13 May 2025 09:25:48 -0500 Subject: [PATCH] fix dangling resource watch monitor thread --- src/resource.c | 60 ++++++++++++++++---------------- src/sprite.c | 82 ++++++++++++++++++++++++++------------------ src/sys.h | 7 ++-- src/sys_win32.c | 91 ++++++++++++++++++++++++++++++++++++++++--------- 4 files changed, 157 insertions(+), 83 deletions(-) diff --git a/src/resource.c b/src/resource.c index 4d882890..c6cc2643 100644 --- a/src/resource.c +++ b/src/resource.c @@ -26,11 +26,13 @@ GLOBAL struct { struct sys_thread resource_watch_monitor_thread; struct sys_thread resource_watch_dispatch_thread; + struct sys_watch watch; + struct atomic_i32 watch_shutdown; + struct sys_mutex watch_dispatcher_mutex; struct arena watch_dispatcher_info_arena; struct sys_watch_info_list watch_dispatcher_info_list; struct sys_condition_variable watch_dispatcher_cv; - b32 watch_dispatcher_shutdown; struct sys_mutex watch_callbacks_mutex; resource_watch_callback *watch_callbacks[64]; @@ -66,6 +68,7 @@ struct resource_startup_receipt resource_startup(void) #endif #if RESOURCE_RELOADING + G.watch = sys_watch_alloc(LIT("res")); G.watch_callbacks_mutex = sys_mutex_alloc(); G.watch_dispatcher_mutex = sys_mutex_alloc(); @@ -147,14 +150,13 @@ b32 resource_exists(struct string path) INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(resource_shutdown) { __prof; - /* Wait for dispatcher thread to finish before shutting down */ - struct sys_lock lock = sys_mutex_lock_e(&G.watch_dispatcher_mutex); - { - G.watch_dispatcher_shutdown = true; - } - sys_mutex_unlock(&lock); + { struct sys_lock lock = sys_mutex_lock_e(&G.watch_dispatcher_mutex); atomic_i32_eval_exchange(&G.watch_shutdown, 1); sys_mutex_unlock(&lock); } + sys_condition_variable_broadcast(&G.watch_dispatcher_cv); + sys_watch_wake(&G.watch); + sys_thread_wait_release(&G.resource_watch_dispatch_thread); + sys_thread_wait_release(&G.resource_watch_monitor_thread); } void resource_register_watch_callback(resource_watch_callback *callback) { @@ -174,30 +176,28 @@
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(resource_watch_monitor_thread_entry_poi { (UNUSED)_; struct temp_arena scratch = scratch_begin_no_conflict(); - struct sys_watch watch = sys_watch_alloc(LIT("res")); - /* NOTE: We let OS force-shutdown this thread */ - volatile i32 run = true; - while (run) { + while (!atomic_i32_eval(&G.watch_shutdown)) { struct temp_arena temp = arena_temp_begin(scratch.arena); - struct sys_watch_info_list res = sys_watch_wait(temp.arena, &watch); - struct sys_lock lock = sys_mutex_lock_e(&G.watch_dispatcher_mutex); - { - struct sys_watch_info_list list_part = sys_watch_info_copy(&G.watch_dispatcher_info_arena, res); - if (G.watch_dispatcher_info_list.last) { - G.watch_dispatcher_info_list.last->next = list_part.first; - list_part.first->prev = G.watch_dispatcher_info_list.last; - G.watch_dispatcher_info_list.last = list_part.last; - } else { - G.watch_dispatcher_info_list = list_part; + struct sys_watch_info_list res = sys_watch_wait(temp.arena, &G.watch); + if (res.first && !atomic_i32_eval(&G.watch_shutdown)) { + struct sys_lock lock = sys_mutex_lock_e(&G.watch_dispatcher_mutex); + { + struct sys_watch_info_list list_part = sys_watch_info_copy(&G.watch_dispatcher_info_arena, res); + if (G.watch_dispatcher_info_list.last) { + G.watch_dispatcher_info_list.last->next = list_part.first; + list_part.first->prev = G.watch_dispatcher_info_list.last; + G.watch_dispatcher_info_list.last = list_part.last; + } else { + G.watch_dispatcher_info_list = list_part; + } } + sys_mutex_unlock(&lock); + sys_condition_variable_broadcast(&G.watch_dispatcher_cv); } - sys_mutex_unlock(&lock); - sys_condition_variable_broadcast(&G.watch_dispatcher_cv); arena_temp_end(temp); } - sys_watch_release(&watch); scratch_end(scratch); } @@ -215,15 +215,16 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(resource_watch_dispatcher_thread_entry_ struct temp_arena scratch = scratch_begin_no_conflict(); struct sys_lock watch_dispatcher_lock = 
sys_mutex_lock_e(&G.watch_dispatcher_mutex); - while (!G.watch_dispatcher_shutdown) { - if (G.watch_dispatcher_info_arena.pos > 0) { + while (!atomic_i32_eval(&G.watch_shutdown)) { + sys_condition_variable_wait(&G.watch_dispatcher_cv, &watch_dispatcher_lock); + if (!atomic_i32_eval(&G.watch_shutdown) && G.watch_dispatcher_info_arena.pos > 0) { /* Unlock and sleep a bit so duplicate events pile up */ { sys_mutex_unlock(&watch_dispatcher_lock); sys_sleep(WATCH_DISPATCHER_DELAY_SECONDS); watch_dispatcher_lock = sys_mutex_lock_e(&G.watch_dispatcher_mutex); } - if (!G.watch_dispatcher_shutdown) { + if (!atomic_i32_eval(&G.watch_shutdown)) { struct temp_arena temp = arena_temp_begin(scratch.arena); /* Pull watch info from queue */ @@ -234,6 +235,7 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(resource_watch_dispatcher_thread_entry_ /* Unlock and run callbacks */ sys_mutex_unlock(&watch_dispatcher_lock); { + __profscope(run_resource_watch_callbacks); struct fixed_dict dedup_dict = fixed_dict_init(temp.arena, WATCH_DISPATCHER_DEDUP_DICT_BINS); for (struct sys_watch_info *info = watch_info_list.first; info; info = info->next) { /* Do not run callbacks for the same file more than once */ @@ -258,10 +260,6 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(resource_watch_dispatcher_thread_entry_ arena_temp_end(temp); } } - - if (!G.watch_dispatcher_shutdown) { - sys_condition_variable_wait(&G.watch_dispatcher_cv, &watch_dispatcher_lock); - } } scratch_end(scratch); diff --git a/src/sprite.c b/src/sprite.c index d2283ada..ae25fb48 100644 --- a/src/sprite.c +++ b/src/sprite.c @@ -58,6 +58,7 @@ enum cache_node_kind { enum cache_node_state { CACHE_NODE_STATE_NONE, + CACHE_NODE_STATE_QUEUEING, CACHE_NODE_STATE_QUEUED, CACHE_NODE_STATE_WORKING, CACHE_NODE_STATE_LOADED @@ -76,7 +77,7 @@ struct cache_node_hash { struct cache_node { enum cache_node_kind kind; struct cache_node_hash hash; - struct atomic_u32 state; + struct atomic_i32 state; struct atomic_u64 refcount_struct; /* Cast eval to 
`cache_node_refcount` */ /* Allocated data */ @@ -85,6 +86,9 @@ struct cache_node { struct sprite_texture *texture; struct sprite_sheet *sheet; + /* Work */ + struct work_handle work; + /* Hash list */ struct cache_node *next_in_bin; struct cache_node *prev_in_bin; @@ -350,7 +354,7 @@ INTERNAL void cache_node_load_texture(struct cache_node *n, struct sprite_tag ta __prof; struct temp_arena scratch = scratch_begin_no_conflict(); - atomic_u32_eval_exchange(&n->state, CACHE_NODE_STATE_WORKING); + atomic_i32_eval_exchange(&n->state, CACHE_NODE_STATE_WORKING); struct string path = tag.path; logf_info("Loading sprite texture [%F] \"%F\"", FMT_HEX(n->hash.v), FMT_STR(path)); @@ -395,7 +399,7 @@ INTERNAL void cache_node_load_texture(struct cache_node *n, struct sprite_tag ta FMT_FLOAT(elapsed), FMT_UINT(n->memory_usage)); - atomic_u32_eval_exchange(&n->state, CACHE_NODE_STATE_LOADED); + atomic_i32_eval_exchange(&n->state, CACHE_NODE_STATE_LOADED); scratch_end(scratch); } @@ -649,7 +653,7 @@ INTERNAL void cache_node_load_sheet(struct cache_node *n, struct sprite_tag tag) __prof; struct temp_arena scratch = scratch_begin_no_conflict(); - atomic_u32_eval_exchange(&n->state, CACHE_NODE_STATE_WORKING); + atomic_i32_eval_exchange(&n->state, CACHE_NODE_STATE_WORKING); struct string path = tag.path; logf_info("Loading sprite sheet [%F] \"%F\"", FMT_HEX(n->hash.v), FMT_STR(path)); @@ -688,7 +692,7 @@ INTERNAL void cache_node_load_sheet(struct cache_node *n, struct sprite_tag tag) FMT_UINT(n->memory_usage)); - atomic_u32_eval_exchange(&n->state, CACHE_NODE_STATE_LOADED); + atomic_i32_eval_exchange(&n->state, CACHE_NODE_STATE_LOADED); scratch_end(scratch); } @@ -884,7 +888,7 @@ INTERNAL void *data_from_tag_internal(struct sprite_scope *scope, struct sprite_ struct cache_node *n = node_lookup_touch(scope, tag, kind); - u32 state = atomic_u32_eval(&n->state); + enum cache_node_state state = atomic_i32_eval(&n->state); if (state == CACHE_NODE_STATE_LOADED) { switch (kind) { case 
CACHE_NODE_KIND_TEXTURE: { res = n->texture; } break; @@ -892,7 +896,7 @@ INTERNAL void *data_from_tag_internal(struct sprite_scope *scope, struct sprite_ default: { sys_panic(LIT("Unknown sprite cache node kind")); } break; } } else if (state == CACHE_NODE_STATE_NONE) { - if (atomic_u32_eval_compare_exchange(&n->state, CACHE_NODE_STATE_NONE, CACHE_NODE_STATE_QUEUED) == CACHE_NODE_STATE_NONE) { + if (atomic_i32_eval_compare_exchange(&n->state, CACHE_NODE_STATE_NONE, CACHE_NODE_STATE_QUEUEING) == CACHE_NODE_STATE_NONE) { /* Node is new, load texture */ if (await) { switch (kind) { @@ -907,39 +911,53 @@ INTERNAL void *data_from_tag_internal(struct sprite_scope *scope, struct sprite_ default: { sys_panic(LIT("Unknown sprite cache node kind")); } break; } } else { - struct sys_lock lock = sys_mutex_lock_e(&G.load_cmds_mutex); - /* Allocate cmd */ struct load_cmd *cmd = NULL; - if (G.first_free_load_cmd) { - cmd = G.first_free_load_cmd; - G.first_free_load_cmd = cmd->next_free; - MEMZERO_STRUCT(cmd); - } else { - cmd = arena_push_zero(&G.load_cmds_arena, struct load_cmd); - } - - /* Initialize cmd */ - cmd->cache_node = n; - cmd->tag = tag; + struct sys_lock lock = sys_mutex_lock_e(&G.load_cmds_mutex); { - u64 copy_len = min_u64(tag.path.len, ARRAY_COUNT(cmd->tag_path_buff)); - cmd->tag.path.text = cmd->tag_path_buff; - MEMCPY(cmd->tag.path.text, tag.path.text, copy_len); - } + if (G.first_free_load_cmd) { + cmd = G.first_free_load_cmd; + G.first_free_load_cmd = cmd->next_free; + MEMZERO_STRUCT(cmd); + } else { + cmd = arena_push_zero(&G.load_cmds_arena, struct load_cmd); + } - /* Cmd holds reference to node */ - node_refcount_add(n, 1); + /* Initialize cmd */ + cmd->cache_node = n; + cmd->tag = tag; + { + u64 copy_len = min_u64(tag.path.len, ARRAY_COUNT(cmd->tag_path_buff)); + cmd->tag.path.text = cmd->tag_path_buff; + MEMCPY(cmd->tag.path.text, tag.path.text, copy_len); + } + + /* Cmd holds reference to node */ + node_refcount_add(n, 1); + } + sys_mutex_unlock(&lock); 
/* Push work */ - work_push_task(&sprite_load_task, cmd, WORK_PRIORITY_NORMAL); - - sys_mutex_unlock(&lock); + n->work = work_push_task(&sprite_load_task, cmd, WORK_PRIORITY_NORMAL); + atomic_i32_eval_compare_exchange(&n->state, CACHE_NODE_STATE_QUEUEING, CACHE_NODE_STATE_QUEUED); } } } - /* FIXME: Spinlock until resource loaded if async */ + /* TODO: Spinlock */ + if (await && state != CACHE_NODE_STATE_LOADED) { + while (true) { + state = atomic_i32_eval(&n->state); + if (state == CACHE_NODE_STATE_LOADED) { + break; + } else if (state >= CACHE_NODE_STATE_QUEUED) { + work_wait(n->work); + } else { + /* Spinlock until work is ready to be waited on or sprite finishes loading */ + ix_pause(); + } + } + } return res; } @@ -1124,10 +1142,6 @@ INTERNAL SORT_COMPARE_FUNC_DEF(evict_sort, arg_a, arg_b, udata) i32 res = (cycle_b > cycle_a) - (cycle_a > cycle_b); res += ((a->force_evict > b->force_evict) - (a->force_evict < b->force_evict)) * 2; - if (a->force_evict != b->force_evict) { - DEBUGBREAKABLE; - } - return res; } diff --git a/src/sys.h b/src/sys.h index 4c7398dc..498a8f99 100644 --- a/src/sys.h +++ b/src/sys.h @@ -264,7 +264,9 @@ enum sys_watch_info_kind { SYS_WATCH_INFO_KIND_UNKNOWN, SYS_WATCH_INFO_KIND_ADDED, SYS_WATCH_INFO_KIND_REMOVED, - SYS_WATCH_INFO_KIND_MODIFIED + SYS_WATCH_INFO_KIND_MODIFIED, + SYS_WATCH_INFO_KIND_RENAMED_OLD, + SYS_WATCH_INFO_KIND_RENAMED_NEW }; @@ -287,6 +289,7 @@ struct sys_watch_info_list { struct sys_watch sys_watch_alloc(struct string path); void sys_watch_release(struct sys_watch *dw); struct sys_watch_info_list sys_watch_wait(struct arena *arena, struct sys_watch *dw); +void sys_watch_wake(struct sys_watch *dw); struct sys_watch_info_list sys_watch_info_copy(struct arena *arena, struct sys_watch_info_list src); /* ========================== * @@ -438,8 +441,8 @@ struct sys_thread sys_thread_alloc( struct string thread_name ); -/* Halt and wait for a thread to finish */ void sys_thread_wait_release(struct sys_thread *thread); +void 
sys_thread_force_release(struct sys_thread *thread); /* Gets the current executing thread's ID */ u32 sys_thread_id(void); diff --git a/src/sys_win32.c b/src/sys_win32.c index 48d7f963..5bb5b5f1 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -674,6 +674,7 @@ void sys_file_filter_end(struct sys_file_filter *filter) struct win32_watch { HANDLE dir_handle; + HANDLE wake_handle; struct win32_watch *next_free; u64 dir_path_len; @@ -724,10 +725,12 @@ struct sys_watch sys_watch_alloc(struct string dir_path) FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL | FILE_FLAG_BACKUP_SEMANTICS, + FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL ); + w32_watch->wake_handle = CreateEventW(NULL, FALSE, FALSE, NULL); + scratch_end(scratch); struct sys_watch watch = ZI; @@ -739,6 +742,7 @@ void sys_watch_release(struct sys_watch *dw) { struct win32_watch *w32_watch = (struct win32_watch *)dw->handle; CloseHandle(w32_watch->dir_handle); + CloseHandle(w32_watch->wake_handle); struct sys_lock lock = sys_mutex_lock_e(&G.watches_mutex); { @@ -756,25 +760,33 @@ struct sys_watch_info_list sys_watch_wait(struct arena *arena, struct sys_watch DWORD filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | - FILE_NOTIFY_CHANGE_ATTRIBUTES | - FILE_NOTIFY_CHANGE_SIZE | FILE_NOTIFY_CHANGE_LAST_WRITE | - FILE_NOTIFY_CHANGE_CREATION | - FILE_NOTIFY_CHANGE_SECURITY; + FILE_NOTIFY_CHANGE_CREATION; b32 done = false; while (!done) { - DWORD num_bytes_returned = 0; + OVERLAPPED ov = ZI; + ov.hEvent = CreateEventW(NULL, FALSE, FALSE, NULL); + ASSERT(ov.hEvent); + BOOL success = ReadDirectoryChangesW(w32_watch->dir_handle, w32_watch->results_buff, ARRAY_COUNT(w32_watch->results_buff), true, filter, - &num_bytes_returned, NULL, + &ov, NULL); + (UNUSED)success; + ASSERT(success); - if (success && num_bytes_returned > 0) { + HANDLE handles[] = { + ov.hEvent, + w32_watch->wake_handle + }; + DWORD wait_res = WaitForMultipleObjects(2, 
handles, FALSE, INFINITE); + + if (wait_res == WAIT_OBJECT_0) { CloseHandle(ov.hEvent); i64 offset = 0; while (!done) { FILE_NOTIFY_INFORMATION *res = (FILE_NOTIFY_INFORMATION *)(w32_watch->results_buff + offset); @@ -824,12 +836,12 @@ struct sys_watch_info_list sys_watch_wait(struct arena *arena, struct sys_watch case FILE_ACTION_RENAMED_NEW_NAME: { - info->kind = SYS_WATCH_INFO_KIND_ADDED; + info->kind = SYS_WATCH_INFO_KIND_RENAMED_NEW; } break; default: { info->kind = SYS_WATCH_INFO_KIND_UNKNOWN; } break; @@ -839,12 +851,22 @@ struct sys_watch_info_list sys_watch_wait(struct arena *arena, struct sys_watch offset += res->NextEntryOffset; } } + } else if (wait_res == WAIT_OBJECT_0 + 1) { + done = true; + } else { + ASSERT(false); } } return list; } +void sys_watch_wake(struct sys_watch *dw) +{ + struct win32_watch *w32_watch = (struct win32_watch *)dw->handle; + SetEvent(w32_watch->wake_handle); +} + struct sys_watch_info_list sys_watch_info_copy(struct arena *arena, struct sys_watch_info_list src_list) { struct sys_watch_info_list dst_list = ZI; @@ -1970,6 +1992,26 @@ void sys_thread_wait_release(struct sys_thread *thread) win32_thread_release(t); } +void sys_thread_force_release(struct sys_thread *thread) +{ + __prof; + struct win32_thread *t = (struct win32_thread *)thread->handle; + HANDLE handle = t->handle; + + /* Wait for thread to stop */ + if (handle) { + DWORD res = WaitForSingleObject(handle, INFINITE); + TerminateThread(handle, 0); + CloseHandle(handle); + + ASSERT(res != WAIT_FAILED); + (UNUSED)res; + } + + /* Release thread struct */ + win32_thread_release(t); +} + u32 sys_thread_id(void) { return GetCurrentThreadId(); } @@ -2388,21 +2430,38 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance, G.panic_event }; DWORD res = WaitForMultipleObjects(ARRAY_COUNT(wait_handles), wait_handles, false, INFINITE); + if (res == WAIT_OBJECT_0) { + sys_thread_force_release(&app_thread); + } ASSERT(res !=
WAIT_FAILED); (UNUSED)res; } } + /* Find any dangling threads that haven't exited gracefully by now */ + if (!atomic_i32_eval(&G.panicking)) { + struct sys_lock lock = sys_mutex_lock_s(&G.threads_mutex); + if (G.threads_first) { + struct temp_arena scratch = scratch_begin_no_conflict(); + u64 num_dangling_threads = 0; + struct string threads_msg = ZI; + threads_msg.text = arena_dry_push(scratch.arena, u8); + for (struct win32_thread *t = G.threads_first; t; t = t->next) { + struct string name = string_from_cstr(t->thread_name_cstr, ARRAY_COUNT(t->thread_name_cstr)); + threads_msg.len += string_format(scratch.arena, LIT(" \"%F\"\n"), FMT_STR(name)).len; + ++num_dangling_threads; + } + threads_msg = string_format(scratch.arena, LIT("%F dangling thread(s):\n%F"), FMT_UINT(num_dangling_threads), FMT_STR(threads_msg)); + sys_panic(threads_msg); + scratch_end(scratch); + } + sys_mutex_unlock(&lock); + } + /* Check if panicking */ if (atomic_i32_eval(&G.panicking)) { /* Wait for panic message to be ready */ WaitForSingleObject(G.panic_event, INFINITE); - /* Force stop threads */ - for (struct win32_thread *t = G.threads_last; t; t = t->prev) { - HANDLE handle = t->handle; - TerminateThread(handle, 0); - CloseHandle(handle); - } /* Set error and abort */ error_msg = G.panic_wstr; goto abort;