fix dangling resource watch monitor thread

This commit is contained in:
jacob 2025-05-13 09:25:48 -05:00
parent bca9c68022
commit c3d0869707
4 changed files with 157 additions and 83 deletions

View File

@ -26,11 +26,13 @@ GLOBAL struct {
struct sys_thread resource_watch_monitor_thread;
struct sys_thread resource_watch_dispatch_thread;
struct sys_watch watch;
struct atomic_i32 watch_shutdown;
struct sys_mutex watch_dispatcher_mutex;
struct arena watch_dispatcher_info_arena;
struct sys_watch_info_list watch_dispatcher_info_list;
struct sys_condition_variable watch_dispatcher_cv;
b32 watch_dispatcher_shutdown;
struct sys_mutex watch_callbacks_mutex;
resource_watch_callback *watch_callbacks[64];
@ -66,6 +68,7 @@ struct resource_startup_receipt resource_startup(void)
#endif
#if RESOURCE_RELOADING
G.watch = sys_watch_alloc(LIT("res"));
G.watch_callbacks_mutex = sys_mutex_alloc();
G.watch_dispatcher_mutex = sys_mutex_alloc();
@ -147,14 +150,13 @@ b32 resource_exists(struct string path)
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(resource_shutdown)
{
__prof;
/* Wait for dispatcher thread to finish before shutting down */
struct sys_lock lock = sys_mutex_lock_e(&G.watch_dispatcher_mutex);
{
G.watch_dispatcher_shutdown = true;
}
sys_mutex_unlock(&lock);
atomic_i32_eval_exchange(&G.watch_shutdown, 1);
sys_condition_variable_broadcast(&G.watch_dispatcher_cv);
sys_watch_wake(&G.watch);
sys_thread_wait_release(&G.resource_watch_dispatch_thread);
sys_thread_wait_release(&G.resource_watch_monitor_thread);
}
void resource_register_watch_callback(resource_watch_callback *callback)
@ -174,13 +176,11 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(resource_watch_monitor_thread_entry_poi
{
(UNUSED)_;
struct temp_arena scratch = scratch_begin_no_conflict();
struct sys_watch watch = sys_watch_alloc(LIT("res"));
/* NOTE: We let OS force-shutdown this thread */
volatile i32 run = true;
while (run) {
while (!atomic_i32_eval(&G.watch_shutdown)) {
struct temp_arena temp = arena_temp_begin(scratch.arena);
struct sys_watch_info_list res = sys_watch_wait(temp.arena, &watch);
struct sys_watch_info_list res = sys_watch_wait(temp.arena, &G.watch);
if (res.first && !atomic_i32_eval(&G.watch_shutdown)) {
struct sys_lock lock = sys_mutex_lock_e(&G.watch_dispatcher_mutex);
{
struct sys_watch_info_list list_part = sys_watch_info_copy(&G.watch_dispatcher_info_arena, res);
@ -194,10 +194,10 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(resource_watch_monitor_thread_entry_poi
}
sys_mutex_unlock(&lock);
sys_condition_variable_broadcast(&G.watch_dispatcher_cv);
}
arena_temp_end(temp);
}
sys_watch_release(&watch);
scratch_end(scratch);
}
@ -215,15 +215,16 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(resource_watch_dispatcher_thread_entry_
struct temp_arena scratch = scratch_begin_no_conflict();
struct sys_lock watch_dispatcher_lock = sys_mutex_lock_e(&G.watch_dispatcher_mutex);
while (!G.watch_dispatcher_shutdown) {
if (G.watch_dispatcher_info_arena.pos > 0) {
while (!atomic_i32_eval(&G.watch_shutdown)) {
sys_condition_variable_wait(&G.watch_dispatcher_cv, &watch_dispatcher_lock);
if (!atomic_i32_eval(&G.watch_shutdown) && G.watch_dispatcher_info_arena.pos > 0) {
/* Unlock and sleep a bit so duplicate events pile up */
{
sys_mutex_unlock(&watch_dispatcher_lock);
sys_sleep(WATCH_DISPATCHER_DELAY_SECONDS);
watch_dispatcher_lock = sys_mutex_lock_e(&G.watch_dispatcher_mutex);
}
if (!G.watch_dispatcher_shutdown) {
if (!atomic_i32_eval(&G.watch_shutdown)) {
struct temp_arena temp = arena_temp_begin(scratch.arena);
/* Pull watch info from queue */
@ -234,6 +235,7 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(resource_watch_dispatcher_thread_entry_
/* Unlock and run callbacks */
sys_mutex_unlock(&watch_dispatcher_lock);
{
__profscope(run_resource_watch_callbacks);
struct fixed_dict dedup_dict = fixed_dict_init(temp.arena, WATCH_DISPATCHER_DEDUP_DICT_BINS);
for (struct sys_watch_info *info = watch_info_list.first; info; info = info->next) {
/* Do not run callbacks for the same file more than once */
@ -258,10 +260,6 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(resource_watch_dispatcher_thread_entry_
arena_temp_end(temp);
}
}
if (!G.watch_dispatcher_shutdown) {
sys_condition_variable_wait(&G.watch_dispatcher_cv, &watch_dispatcher_lock);
}
}
scratch_end(scratch);

View File

@ -58,6 +58,7 @@ enum cache_node_kind {
enum cache_node_state {
CACHE_NODE_STATE_NONE,
CACHE_NODE_STATE_QUEUEING,
CACHE_NODE_STATE_QUEUED,
CACHE_NODE_STATE_WORKING,
CACHE_NODE_STATE_LOADED
@ -76,7 +77,7 @@ struct cache_node_hash {
struct cache_node {
enum cache_node_kind kind;
struct cache_node_hash hash;
struct atomic_u32 state;
struct atomic_i32 state;
struct atomic_u64 refcount_struct; /* Cast eval to `cache_node_refcount` */
/* Allocated data */
@ -85,6 +86,9 @@ struct cache_node {
struct sprite_texture *texture;
struct sprite_sheet *sheet;
/* Work */
struct work_handle work;
/* Hash list */
struct cache_node *next_in_bin;
struct cache_node *prev_in_bin;
@ -350,7 +354,7 @@ INTERNAL void cache_node_load_texture(struct cache_node *n, struct sprite_tag ta
__prof;
struct temp_arena scratch = scratch_begin_no_conflict();
atomic_u32_eval_exchange(&n->state, CACHE_NODE_STATE_WORKING);
atomic_i32_eval_exchange(&n->state, CACHE_NODE_STATE_WORKING);
struct string path = tag.path;
logf_info("Loading sprite texture [%F] \"%F\"", FMT_HEX(n->hash.v), FMT_STR(path));
@ -395,7 +399,7 @@ INTERNAL void cache_node_load_texture(struct cache_node *n, struct sprite_tag ta
FMT_FLOAT(elapsed),
FMT_UINT(n->memory_usage));
atomic_u32_eval_exchange(&n->state, CACHE_NODE_STATE_LOADED);
atomic_i32_eval_exchange(&n->state, CACHE_NODE_STATE_LOADED);
scratch_end(scratch);
}
@ -649,7 +653,7 @@ INTERNAL void cache_node_load_sheet(struct cache_node *n, struct sprite_tag tag)
__prof;
struct temp_arena scratch = scratch_begin_no_conflict();
atomic_u32_eval_exchange(&n->state, CACHE_NODE_STATE_WORKING);
atomic_i32_eval_exchange(&n->state, CACHE_NODE_STATE_WORKING);
struct string path = tag.path;
logf_info("Loading sprite sheet [%F] \"%F\"", FMT_HEX(n->hash.v), FMT_STR(path));
@ -688,7 +692,7 @@ INTERNAL void cache_node_load_sheet(struct cache_node *n, struct sprite_tag tag)
FMT_UINT(n->memory_usage));
atomic_u32_eval_exchange(&n->state, CACHE_NODE_STATE_LOADED);
atomic_i32_eval_exchange(&n->state, CACHE_NODE_STATE_LOADED);
scratch_end(scratch);
}
@ -884,7 +888,7 @@ INTERNAL void *data_from_tag_internal(struct sprite_scope *scope, struct sprite_
struct cache_node *n = node_lookup_touch(scope, tag, kind);
u32 state = atomic_u32_eval(&n->state);
enum cache_node_state state = atomic_i32_eval(&n->state);
if (state == CACHE_NODE_STATE_LOADED) {
switch (kind) {
case CACHE_NODE_KIND_TEXTURE: { res = n->texture; } break;
@ -892,7 +896,7 @@ INTERNAL void *data_from_tag_internal(struct sprite_scope *scope, struct sprite_
default: { sys_panic(LIT("Unknown sprite cache node kind")); } break;
}
} else if (state == CACHE_NODE_STATE_NONE) {
if (atomic_u32_eval_compare_exchange(&n->state, CACHE_NODE_STATE_NONE, CACHE_NODE_STATE_QUEUED) == CACHE_NODE_STATE_NONE) {
if (atomic_i32_eval_compare_exchange(&n->state, CACHE_NODE_STATE_NONE, CACHE_NODE_STATE_QUEUEING) == CACHE_NODE_STATE_NONE) {
/* Node is new, load texture */
if (await) {
switch (kind) {
@ -907,10 +911,10 @@ INTERNAL void *data_from_tag_internal(struct sprite_scope *scope, struct sprite_
default: { sys_panic(LIT("Unknown sprite cache node kind")); } break;
}
} else {
struct sys_lock lock = sys_mutex_lock_e(&G.load_cmds_mutex);
/* Allocate cmd */
struct load_cmd *cmd = NULL;
struct sys_lock lock = sys_mutex_lock_e(&G.load_cmds_mutex);
{
if (G.first_free_load_cmd) {
cmd = G.first_free_load_cmd;
G.first_free_load_cmd = cmd->next_free;
@ -930,16 +934,30 @@ INTERNAL void *data_from_tag_internal(struct sprite_scope *scope, struct sprite_
/* Cmd holds reference to node */
node_refcount_add(n, 1);
}
sys_mutex_unlock(&lock);
/* Push work */
work_push_task(&sprite_load_task, cmd, WORK_PRIORITY_NORMAL);
sys_mutex_unlock(&lock);
n->work = work_push_task(&sprite_load_task, cmd, WORK_PRIORITY_NORMAL);
atomic_i32_eval_compare_exchange(&n->state, CACHE_NODE_STATE_QUEUEING, CACHE_NODE_STATE_QUEUED);
}
}
}
/* FIXME: Spinlock until resource loaded if async */
/* TODO: Spinlock */
if (await && state != CACHE_NODE_STATE_LOADED) {
while (true) {
state = atomic_i32_eval(&n->state);
if (state == CACHE_NODE_STATE_LOADED) {
break;
} else if (state >= CACHE_NODE_STATE_QUEUED) {
work_wait(n->work);
} else {
/* Spinlock until work is ready to be waited on or sprite finishes loading */
ix_pause();
}
}
}
return res;
}
@ -1124,10 +1142,6 @@ INTERNAL SORT_COMPARE_FUNC_DEF(evict_sort, arg_a, arg_b, udata)
i32 res = (cycle_b > cycle_a) - (cycle_a > cycle_b);
res += ((a->force_evict > b->force_evict) - (a->force_evict < b->force_evict)) * 2;
if (a->force_evict != b->force_evict) {
DEBUGBREAKABLE;
}
return res;
}

View File

@ -264,7 +264,9 @@ enum sys_watch_info_kind {
SYS_WATCH_INFO_KIND_UNKNOWN,
SYS_WATCH_INFO_KIND_ADDED,
SYS_WATCH_INFO_KIND_REMOVED,
SYS_WATCH_INFO_KIND_MODIFIED
SYS_WATCH_INFO_KIND_MODIFIED,
SYS_WATCH_INFO_KIND_RENAMED_OLD,
SYS_WATCH_INFO_KIND_RENAMED_NEW
};
@ -287,6 +289,7 @@ struct sys_watch_info_list {
struct sys_watch sys_watch_alloc(struct string path);
void sys_watch_release(struct sys_watch *dw);
struct sys_watch_info_list sys_watch_wait(struct arena *arena, struct sys_watch *dw);
void sys_watch_wake(struct sys_watch *dw);
struct sys_watch_info_list sys_watch_info_copy(struct arena *arena, struct sys_watch_info_list src);
/* ========================== *
@ -438,8 +441,8 @@ struct sys_thread sys_thread_alloc(
struct string thread_name
);
/* Halt and wait for a thread to finish */
void sys_thread_wait_release(struct sys_thread *thread);
void sys_thread_force_release(struct sys_thread *thread);
/* Gets the current executing thread's ID */
u32 sys_thread_id(void);

View File

@ -674,6 +674,7 @@ void sys_file_filter_end(struct sys_file_filter *filter)
struct win32_watch {
HANDLE dir_handle;
HANDLE wake_handle;
struct win32_watch *next_free;
u64 dir_path_len;
@ -724,10 +725,12 @@ struct sys_watch sys_watch_alloc(struct string dir_path)
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_BACKUP_SEMANTICS,
FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
NULL
);
w32_watch->wake_handle = CreateEventW(NULL, FALSE, FALSE, NULL);
scratch_end(scratch);
struct sys_watch watch = ZI;
@ -739,6 +742,7 @@ void sys_watch_release(struct sys_watch *dw)
{
struct win32_watch *w32_watch = (struct win32_watch *)dw->handle;
CloseHandle(w32_watch->dir_handle);
CloseHandle(w32_watch->wake_handle);
struct sys_lock lock = sys_mutex_lock_e(&G.watches_mutex);
{
@ -756,25 +760,33 @@ struct sys_watch_info_list sys_watch_wait(struct arena *arena, struct sys_watch
DWORD filter = FILE_NOTIFY_CHANGE_FILE_NAME |
FILE_NOTIFY_CHANGE_DIR_NAME |
FILE_NOTIFY_CHANGE_ATTRIBUTES |
FILE_NOTIFY_CHANGE_SIZE |
FILE_NOTIFY_CHANGE_LAST_WRITE |
FILE_NOTIFY_CHANGE_CREATION |
FILE_NOTIFY_CHANGE_SECURITY;
FILE_NOTIFY_CHANGE_CREATION;
b32 done = false;
while (!done) {
DWORD num_bytes_returned = 0;
OVERLAPPED ov = ZI;
ov.hEvent = CreateEventW(NULL, FALSE, FALSE, NULL);
ASSERT(ov.hEvent);
BOOL success = ReadDirectoryChangesW(w32_watch->dir_handle,
w32_watch->results_buff,
ARRAY_COUNT(w32_watch->results_buff),
true,
filter,
&num_bytes_returned,
NULL,
&ov,
NULL);
(UNUSED)success;
ASSERT(success);
if (success && num_bytes_returned > 0) {
HANDLE handles[] = {
ov.hEvent,
w32_watch->wake_handle
};
DWORD wait_res = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
if (wait_res == WAIT_OBJECT_0) {
i64 offset = 0;
while (!done) {
FILE_NOTIFY_INFORMATION *res = (FILE_NOTIFY_INFORMATION *)(w32_watch->results_buff + offset);
@ -824,12 +836,12 @@ struct sys_watch_info_list sys_watch_wait(struct arena *arena, struct sys_watch
case FILE_ACTION_RENAMED_NEW_NAME:
{
info->kind = SYS_WATCH_INFO_KIND_ADDED;
info->kind = SYS_WATCH_INFO_KIND_RENAMED_OLD;
} break;
default:
{
info->kind = SYS_WATCH_INFO_KIND_UNKNOWN;
info->kind = SYS_WATCH_INFO_KIND_RENAMED_NEW;
} break;
}
@ -839,12 +851,22 @@ struct sys_watch_info_list sys_watch_wait(struct arena *arena, struct sys_watch
offset += res->NextEntryOffset;
}
}
} else if (wait_res == WAIT_OBJECT_0 + 1) {
done = true;
} else {
ASSERT(false);
}
}
return list;
}
void sys_watch_wake(struct sys_watch *dw)
{
struct win32_watch *w32_watch = (struct win32_watch *)dw->handle;
SetEvent(w32_watch->wake_handle);
}
struct sys_watch_info_list sys_watch_info_copy(struct arena *arena, struct sys_watch_info_list src_list)
{
struct sys_watch_info_list dst_list = ZI;
@ -1970,6 +1992,26 @@ void sys_thread_wait_release(struct sys_thread *thread)
win32_thread_release(t);
}
void sys_thread_force_release(struct sys_thread *thread)
{
__prof;
struct win32_thread *t = (struct win32_thread *)thread->handle;
HANDLE handle = t->handle;
/* Wait for thread to stop */
if (handle) {
DWORD res = WaitForSingleObject(handle, INFINITE);
TerminateThread(handle, 0);
CloseHandle(handle);
ASSERT(res != WAIT_FAILED);
(UNUSED)res;
}
/* Release thread struct */
win32_thread_release(t);
}
u32 sys_thread_id(void)
{
return GetCurrentThreadId();
@ -2388,21 +2430,38 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
G.panic_event
};
DWORD res = WaitForMultipleObjects(ARRAY_COUNT(wait_handles), wait_handles, false, INFINITE);
if (res == WAIT_OBJECT_0) {
sys_thread_force_release(&app_thread);
}
ASSERT(res != WAIT_FAILED);
(UNUSED)res;
}
}
/* Find any dangling threads that haven't exited gracefully by now */
if (!atomic_i32_eval(&G.panicking)) {
struct sys_lock lock = sys_mutex_lock_s(&G.threads_mutex);
if (G.threads_first) {
struct temp_arena scratch = scratch_begin_no_conflict();
u64 num_dangling_threads = 0;
struct string threads_msg = ZI;
threads_msg.text = arena_dry_push(scratch.arena, u8);
for (struct win32_thread *t = G.threads_first; t; t = t->next) {
struct string name = string_from_cstr(t->thread_name_cstr, ARRAY_COUNT(t->thread_name_cstr));
threads_msg.len += string_format(scratch.arena, LIT(" \"%F\"\n"), FMT_STR(name)).len;
++num_dangling_threads;
}
threads_msg = string_format(scratch.arena, LIT("%F dangling thread(s):\n%F"), FMT_UINT(num_dangling_threads), FMT_STR(threads_msg));
sys_panic(threads_msg);
scratch_end(scratch);
}
sys_mutex_unlock(&lock);
}
/* Check if panicking */
if (atomic_i32_eval(&G.panicking)) {
/* Wait for panic message to be ready */
WaitForSingleObject(G.panic_event, INFINITE);
/* Force stop threads */
for (struct win32_thread *t = G.threads_last; t; t = t->prev) {
HANDLE handle = t->handle;
TerminateThread(handle, 0);
CloseHandle(handle);
}
/* Set error and abort */
error_msg = G.panic_wstr;
goto abort;