From adcff577ce817e7d6ce869d561bd560b5741e09c Mon Sep 17 00:00:00 2001 From: jacob Date: Wed, 30 Jul 2025 23:52:22 -0500 Subject: [PATCH] watch layer refactor --- build.c | 2 +- src/app/app_core.c | 2 +- src/gp/gp_core_dx12.c | 8 +- src/platform/platform_win32.c | 3 - src/playback/playback_win32.c | 1 - src/sprite/sprite_core.c | 7 +- src/user/user_core.c | 3 - src/watch/watch_core.c | 247 ++++++++++++++++------------------ src/watch/watch_core.h | 73 ++++++++-- 9 files changed, 191 insertions(+), 155 deletions(-) diff --git a/build.c b/build.c index 9e3e3b12..ca9a09fe 100644 --- a/build.c +++ b/build.c @@ -510,7 +510,7 @@ void OnBuild(StringList cli_args) String warnings = Lit("/WX /Wall " "/options:strict " - "/wd4820 /wd4201 /wd5220 /wd4514 /wd4244 /wd5045 /wd4242 /wd4061 /wd4189 /wd4723 /wd5246 /wd4324 /wd4464 /wd4577"); + "/wd4820 /wd4201 /wd5220 /wd4514 /wd4244 /wd5045 /wd4242 /wd4061 /wd4189 /wd4723 /wd5246 /wd4324 /wd4464 /wd4577 /wd4100"); StringListAppend(&perm, &compile_warnings, warnings); StringListAppend(&perm, &link_warnings, Lit("/WX")); diff --git a/src/app/app_core.c b/src/app/app_core.c index 2c5147f4..04db0524 100644 --- a/src/app/app_core.c +++ b/src/app/app_core.c @@ -234,7 +234,7 @@ void P_AppStartup(String args_str) /* Global systems */ R_Startup(); - watch_startup(); + W_Startup(); gp_startup(); /* Subsystems */ diff --git a/src/gp/gp_core_dx12.c b/src/gp/gp_core_dx12.c index b86fa37f..31c3992e 100644 --- a/src/gp/gp_core_dx12.c +++ b/src/gp/gp_core_dx12.c @@ -303,7 +303,7 @@ struct dx12_upload_job_sig { struct dx12_resource *resource; void *data; }; internal P_JobDef(dx12_upload_job, job); #if RESOURCE_RELOADING -internal WATCH_CALLBACK_FUNC_DEF(pipeline_watch_callback, name); +internal W_CallbackFuncDef(pipeline_watch_callback, name); #endif /* ========================== * @@ -430,7 +430,7 @@ void gp_startup(void) /* Register callbacks */ #if RESOURCE_RELOADING - watch_register_callback(pipeline_watch_callback); + W_RegisterCallback(pipeline_watch_callback); #endif P_OnExit(gp_shutdown); @@ -1312,7 +1312,7 @@ internal void pipeline_register(u64 num_pipelines, struct pipeline **pipelines) } #if RESOURCE_RELOADING -internal WATCH_CALLBACK_FUNC_DEF(pipeline_watch_callback, name) +internal W_CallbackFuncDef(pipeline_watch_callback, name) { __prof; TempArena scratch = BeginScratchNoConflict(); @@ -3495,8 +3495,6 @@ void gp_present(G_Swapchain *gp_swapchain, Vec2I32 backbuffer_resolution, G_Reso internal P_JobDef(dx12_evictor_job, _) { - (UNUSED)_; - u64 completed_targets[DX12_NUM_QUEUES] = ZI; b32 shutdown = 0; diff --git a/src/platform/platform_win32.c b/src/platform/platform_win32.c index 87b02894..fe2a3e4c 100644 --- a/src/platform/platform_win32.c +++ b/src/platform/platform_win32.c @@ -1104,7 +1104,6 @@ P_W32_ThreadDef(P_W32_JobSchedulerEntryFunc, _) { struct P_W32_SharedCtx *g = &P_W32_shared_ctx; - (UNUSED)_; { i32 priority = THREAD_PRIORITY_TIME_CRITICAL; b32 success = SetThreadPriority(GetCurrentThread(), priority); @@ -3349,7 +3348,6 @@ void P_W32_InitBtnTable(void) P_JobDef(P_W32_AppStartupJob, _) { - (UNUSED)_; P_W32_SharedCtx *g = &P_W32_shared_ctx; TempArena scratch = BeginScratchNoConflict(); { @@ -3364,7 +3362,6 @@ P_JobDef(P_W32_AppShutdownJob, _) { __prof; P_W32_SharedCtx *g = &P_W32_shared_ctx; - (UNUSED)_; i32 num_funcs = Atomic32Fetch(&g->num_exit_funcs); for (i32 i = num_funcs - 1; i >= 0; --i) { diff --git a/src/playback/playback_win32.c b/src/playback/playback_win32.c index 65a523a0..8b20f344 100644 --- a/src/playback/playback_win32.c +++ b/src/playback/playback_win32.c @@ -188,7 +188,6 @@ P_JobDef(PB_WSP_PlaybackJob, _) { __prof; PB_WSP_SharedState *g = &PB_WSP_shared_state; - (UNUSED)_; /* FIXME: If playback fails at any point and mixer stops advancing, we * need to halt mixer to prevent memory leak when sounds are played. */ diff --git a/src/sprite/sprite_core.c b/src/sprite/sprite_core.c index 9bfb2be0..3d632606 100644 --- a/src/sprite/sprite_core.c +++ b/src/sprite/sprite_core.c @@ -187,7 +187,7 @@ internal P_JobDef(sprite_load_job, arg); internal P_JobDef(sprite_evictor_job, _); #if RESOURCE_RELOADING -internal WATCH_CALLBACK_FUNC_DEF(sprite_watch_callback, info); +internal W_CallbackFuncDef(sprite_watch_callback, info); #endif S_StartupReceipt sprite_startup(void) @@ -233,7 +233,7 @@ S_StartupReceipt sprite_startup(void) P_Run(1, sprite_evictor_job, 0, P_Pool_Background, P_Priority_Low, &G.shutdown_counter); P_OnExit(&sprite_shutdown); - watch_register_callback(&sprite_watch_callback); + W_RegisterCallback(&sprite_watch_callback); return (S_StartupReceipt) { 0 }; } @@ -1157,7 +1157,7 @@ internal void reload_if_exists(S_Scope *scope, S_Tag tag, enum cache_entry_kind } } -internal WATCH_CALLBACK_FUNC_DEF(sprite_watch_callback, name) +internal W_CallbackFuncDef(sprite_watch_callback, name) { S_Scope *scope = sprite_scope_begin(); @@ -1210,7 +1210,6 @@ internal MergesortCompareFuncDef(evict_sort, arg_a, arg_b, udata) */ internal P_JobDef(sprite_evictor_job, _) { - (UNUSED)_; b32 shutdown = 0; while (!shutdown) { { diff --git a/src/user/user_core.c b/src/user/user_core.c index fddae6bb..2b44ea4b 100644 --- a/src/user/user_core.c +++ b/src/user/user_core.c @@ -2081,7 +2081,6 @@ internal void user_update(P_Window *window) internal P_JobDef(user_update_job, _) { - (UNUSED)_; i64 time_ns = P_TimeNs(); while (!Atomic32Fetch(&G.shutdown)) { P_Window *window = G.window; @@ -2179,8 +2178,6 @@ struct sim_decode_queue { internal P_JobDef(local_sim_job, _) { - (UNUSED)_; - #if 0 struct host_listen_address local_listen_addr = host_listen_address_from_local_name(Lit("LOCAL_SIM")); struct host_listen_address net_listen_addr = host_listen_address_from_net_port(12345); diff --git a/src/watch/watch_core.c b/src/watch/watch_core.c index 68faeed7..cb0d60be 100644 --- a/src/watch/watch_core.c +++ b/src/watch/watch_core.c @@ -1,114 +1,117 @@ -#if RESOURCE_RELOADING +W_SharedState W_shared_state = ZI; -struct watch_event { - String name; - struct watch_event *next; -}; +//////////////////////////////// +//~ Startup -Global struct { - P_Watch *watch; - Atomic32 watch_shutdown; - P_Counter watch_jobs_counter; - - P_Mutex watch_dispatcher_mutex; - Arena *watch_events_arena; - struct watch_event *first_watch_event; - struct watch_event *last_watch_event; - P_Cv watch_dispatcher_cv; - - P_Mutex watch_callbacks_mutex; - watch_callback *watch_callbacks[64]; - u64 num_watch_callbacks; -} G = ZI, DebugAlias(G, G_watch); - -/* ========================== * - * Startup - * ========================== */ - -internal P_JobDef(watch_monitor_job, _); -internal P_JobDef(watch_dispatcher_job, _); -internal P_ExitFuncDef(watch_shutdown); - -void watch_startup(void) +void W_Startup(void) { - G.watch = P_AllocWatch(Lit("./")); + W_SharedState *g = &W_shared_state; + g->watch = P_AllocWatch(Lit("./")); - G.watch_events_arena = AllocArena(Gibi(64)); + g->watch_events_arena = AllocArena(Gibi(64)); - P_Run(1, watch_monitor_job, 0, P_Pool_Floating, P_Priority_Low, &G.watch_jobs_counter); - P_Run(1, watch_dispatcher_job, 0, P_Pool_Background, P_Priority_Low, &G.watch_jobs_counter); - P_OnExit(&watch_shutdown); + P_Run(1, W_MonitorJob, 0, P_Pool_Floating, P_Priority_Low, &g->watch_jobs_counter); + P_Run(1, W_DispatcherJob, 0, P_Pool_Background, P_Priority_Low, &g->watch_jobs_counter); + P_OnExit(&W_Shutdown); } -/* ========================== * - * Watch - * ========================== */ - -internal P_ExitFuncDef(watch_shutdown) +P_ExitFuncDef(W_Shutdown) { __prof; - Atomic32FetchSet(&G.watch_shutdown, 1); - + W_SharedState *g = &W_shared_state; + Atomic32FetchSet(&g->W_Shutdown, 1); { - P_Lock lock = P_LockE(&G.watch_dispatcher_mutex); - P_SignalCv(&G.watch_dispatcher_cv, I32Max); - P_WakeWatch(G.watch); + P_Lock lock = P_LockE(&g->watch_dispatcher_mutex); + P_SignalCv(&g->watch_dispatcher_cv, I32Max); + P_WakeWatch(g->watch); P_Unlock(&lock); } - P_WaitOnCounter(&G.watch_jobs_counter); + P_WaitOnCounter(&g->watch_jobs_counter); } -void watch_register_callback(watch_callback *callback) +//////////////////////////////// +//~ Callback + +void W_RegisterCallback(W_CallbackFunc *callback) { - P_Lock lock = P_LockE(&G.watch_callbacks_mutex); + W_SharedState *g = &W_shared_state; + P_Lock lock = P_LockE(&g->watch_callbacks_mutex); { - if (G.num_watch_callbacks < countof(G.watch_callbacks)) { - G.watch_callbacks[G.num_watch_callbacks++] = callback; - } else { + if (g->num_watch_callbacks < countof(g->watch_callbacks)) + { + g->watch_callbacks[g->num_watch_callbacks++] = callback; + } + else + { P_Panic(Lit("Max resource watch callbacks reached")); } } P_Unlock(&lock); } -internal P_JobDef(watch_monitor_job, _) +P_JobDef(W_RunCallbackJob, job) +{ + __prof; + W_RunCallbackJobSig *sig = job.sig; + String name = sig->name; + W_CallbackFunc *callback = sig->callbacks[job.id]; + callback(name); +} + +//////////////////////////////// +//~ Monitor job + +/* NOTE: We separate the responsibilities of monitoring directory changes + * & dispatching watch callbacks into two separate jobs so that we can delay + * the dispatch, allowing for deduplication of file modification notifications. */ + +P_JobDef(W_MonitorJob, _) { - (UNUSED)_; TempArena scratch = BeginScratchNoConflict(); + W_SharedState *g = &W_shared_state; String ignored[] = { Lit(".vs"), Lit(".git") }; - while (!Atomic32Fetch(&G.watch_shutdown)) { + while (!Atomic32Fetch(&g->W_Shutdown)) + { TempArena temp = BeginTempArena(scratch.arena); - P_WatchInfoList info_list = P_ReadWatchWait(temp.arena, G.watch); - if (info_list.first && !Atomic32Fetch(&G.watch_shutdown)) { - P_Lock lock = P_LockE(&G.watch_dispatcher_mutex); + P_WatchInfoList info_list = P_ReadWatchWait(temp.arena, g->watch); + if (info_list.first && !Atomic32Fetch(&g->W_Shutdown)) + { + P_Lock lock = P_LockE(&g->watch_dispatcher_mutex); { - for (P_WatchInfo *info = info_list.first; info; info = info->next) { + for (P_WatchInfo *info = info_list.first; info; info = info->next) + { String name_src = info->name; b32 ignore = 0; - for (u32 i = 0; i < countof(ignored); ++i) { - if (StringStartsWith(name_src, ignored[i])) { + for (u32 i = 0; i < countof(ignored); ++i) + { + if (StringStartsWith(name_src, ignored[i])) + { ignore = 1; break; } } - if (!ignore) { - struct watch_event *e = PushStruct(G.watch_events_arena, struct watch_event); - e->name = CopyString(G.watch_events_arena, name_src); - if (G.last_watch_event) { - G.last_watch_event->next = e; - } else { - G.first_watch_event = e; + if (!ignore) + { + W_Event *e = PushStruct(g->watch_events_arena, W_Event); + e->name = CopyString(g->watch_events_arena, name_src); + if (g->last_watch_event) + { + g->last_watch_event->next = e; } - G.last_watch_event = e; + else + { + g->first_watch_event = e; + } + g->last_watch_event = e; } } } - P_SignalCv(&G.watch_dispatcher_cv, I32Max); + P_SignalCv(&g->watch_dispatcher_cv, I32Max); P_Unlock(&lock); } EndTempArena(temp); @@ -117,95 +120,88 @@ internal P_JobDef(watch_monitor_job, _) EndScratch(scratch); } -/* NOTE: We separate the responsibilities of monitoring directory changes - * & dispatching watch callbacks into two separate jobs so that we can delay - * the dispatch, allowing for deduplication of file modification notifications. */ +//////////////////////////////// +//~ Dispatcher job -#define WATCH_DISPATCHER_DELAY_SECONDS 0.050 -#define WATCH_DISPATCHER_DEDUP_DICT_BINS 128 - -struct watch_callback_job_sig { - String name; - watch_callback **callbacks; -}; - -internal P_JobDef(watch_callback_job, job) +P_JobDef(W_DispatcherJob, _) { - __prof; - struct watch_callback_job_sig *sig = job.sig; - String name = sig->name; - watch_callback *callback = sig->callbacks[job.id]; - callback(name); -} - -internal P_JobDef(watch_dispatcher_job, _) -{ - (UNUSED)_; + W_SharedState *g = &W_shared_state; b32 shutdown = 0; - while (!shutdown) { + while (!shutdown) + { { TempArena scratch = BeginScratchNoConflict(); - struct watch_event *first_watch_event = 0; - struct watch_event *last_watch_event = 0; + W_Event *first_watch_event = 0; + W_Event *last_watch_event = 0; /* Delay so that duplicate events pile up */ { __profn("Delay"); - P_Wait(0, 0, 0, NsFromSeconds(WATCH_DISPATCHER_DELAY_SECONDS)); + P_Wait(0, 0, 0, NsFromSeconds(W_DispatcherDelaySeconds)); } /* Pull watch events from queue */ { - P_Lock lock = P_LockE(&G.watch_dispatcher_mutex); - for (struct watch_event *src_event = G.first_watch_event; src_event; src_event = src_event->next) { - struct watch_event *e = PushStruct(scratch.arena, struct watch_event); + P_Lock lock = P_LockE(&g->watch_dispatcher_mutex); + for (W_Event *src_event = g->first_watch_event; src_event; src_event = src_event->next) + { + W_Event *e = PushStruct(scratch.arena, W_Event); e->name = CopyString(scratch.arena, src_event->name); - if (last_watch_event) { + if (last_watch_event) + { last_watch_event->next = e; - } else { + } + else + { first_watch_event = e; } last_watch_event = e; } - G.first_watch_event = 0; - G.last_watch_event = 0; - ResetArena(G.watch_events_arena); + g->first_watch_event = 0; + g->last_watch_event = 0; + ResetArena(g->watch_events_arena); P_Unlock(&lock); } /* Build callbacks array */ u64 num_callbacks = 0; - watch_callback **callbacks = 0; - P_Lock callbacks_lock = P_LockS(&G.watch_callbacks_mutex); + W_CallbackFunc **callbacks = 0; + P_Lock callbacks_lock = P_LockS(&g->watch_callbacks_mutex); { - num_callbacks = G.num_watch_callbacks; - callbacks = PushStructsNoZero(scratch.arena, watch_callback *, num_callbacks); - for (u64 i = 0; i < num_callbacks; ++i) { - callbacks[i] = G.watch_callbacks[i]; + num_callbacks = g->num_watch_callbacks; + callbacks = PushStructsNoZero(scratch.arena, W_CallbackFunc *, num_callbacks); + for (u64 i = 0; i < num_callbacks; ++i) + { + callbacks[i] = g->watch_callbacks[i]; } } P_Unlock(&callbacks_lock); /* Run callbacks */ { - Dict *dedup_dict = InitDict(scratch.arena, WATCH_DISPATCHER_DEDUP_DICT_BINS); - for (struct watch_event *e = first_watch_event; e; e = e->next) { + Dict *dedup_dict = InitDict(scratch.arena, W_DispatcherDedupBins); + for (W_Event *e = first_watch_event; e; e = e->next) + { __profn("Dispatch"); /* Do not run callbacks for the same file more than once */ b32 skip = 0; u64 hash = HashFnv64(Fnv64Basis, e->name); - if (DictValueFromHash(dedup_dict, hash) == 1) { + if (DictValueFromHash(dedup_dict, hash) == 1) + { skip = 1; - } else { + } + else + { SetDictValue(scratch.arena, dedup_dict, hash, 1); } - if (!skip) { - struct watch_callback_job_sig sig = ZI; + if (!skip) + { + W_RunCallbackJobSig sig = ZI; sig.name = e->name; sig.callbacks = callbacks; P_Counter counter = ZI; - P_Run(num_callbacks, watch_callback_job, &sig, P_Pool_Background, P_Priority_Low, &counter); + P_Run(num_callbacks, W_RunCallbackJob, &sig, P_Pool_Background, P_Priority_Low, &counter); P_WaitOnCounter(&counter); } } @@ -215,22 +211,15 @@ internal P_JobDef(watch_dispatcher_job, _) } /* Wait for event */ - P_Lock lock = P_LockS(&G.watch_dispatcher_mutex); + P_Lock lock = P_LockS(&g->watch_dispatcher_mutex); { - shutdown = Atomic32Fetch(&G.watch_shutdown); - while (!shutdown && !G.first_watch_event) { - P_WaitOnCv(&G.watch_dispatcher_cv, &lock); - shutdown = Atomic32Fetch(&G.watch_shutdown); + shutdown = Atomic32Fetch(&g->W_Shutdown); + while (!shutdown && !g->first_watch_event) + { + P_WaitOnCv(&g->watch_dispatcher_cv, &lock); + shutdown = Atomic32Fetch(&g->W_Shutdown); } } P_Unlock(&lock); } } - -#else /* RESOURCE_RELOADING */ - -void watch_startup(void) -{ -} - -#endif /* RESOURCE_RELOADING */ diff --git a/src/watch/watch_core.h b/src/watch/watch_core.h index 993f6ff8..982b3aa0 100644 --- a/src/watch/watch_core.h +++ b/src/watch/watch_core.h @@ -1,10 +1,67 @@ -#define WATCH_CALLBACK_FUNC_DEF(func_name, arg_name) void func_name(String arg_name) -typedef WATCH_CALLBACK_FUNC_DEF(watch_callback, name); +//////////////////////////////// +//~ Callback types -void watch_startup(void); +#define W_CallbackFuncDef(func_name, arg_name) void func_name(String arg_name) +typedef W_CallbackFuncDef(W_CallbackFunc, name); -#if RESOURCE_RELOADING -void watch_register_callback(watch_callback *callback); -#else -#define watch_register_callback(callback) -#endif +Struct(W_RunCallbackJobSig) +{ + String name; + W_CallbackFunc **callbacks; +}; + +//////////////////////////////// +//~ Event types + +Struct(W_Event) +{ + String name; + W_Event *next; +}; + +//////////////////////////////// +//~ Shared state + +#define W_DispatcherDelaySeconds 0.050 +#define W_DispatcherDedupBins 128 + +Struct(W_SharedState) +{ + P_Watch *watch; + Atomic32 W_Shutdown; + P_Counter watch_jobs_counter; + + P_Mutex watch_dispatcher_mutex; + Arena *watch_events_arena; + W_Event *first_watch_event; + W_Event *last_watch_event; + P_Cv watch_dispatcher_cv; + + P_Mutex watch_callbacks_mutex; + W_CallbackFunc *watch_callbacks[64]; + u64 num_watch_callbacks; +}; + +extern W_SharedState W_shared_state; + +//////////////////////////////// +//~ Startup + +void W_Startup(void); +P_ExitFuncDef(W_Shutdown); + +//////////////////////////////// +//~ Watch operations + +void W_RegisterCallback(W_CallbackFunc *callback); + +//////////////////////////////// +//~ Callback job + +P_JobDef(W_RunCallbackJob, job); + +//////////////////////////////// +//~ Long running jobs + +P_JobDef(W_MonitorJob, job); +P_JobDef(W_DispatcherJob, job);