watch layer refactor

This commit is contained in:
jacob 2025-07-30 23:52:22 -05:00
parent 3f2abf5b3e
commit adcff577ce
9 changed files with 191 additions and 155 deletions

View File

@ -510,7 +510,7 @@ void OnBuild(StringList cli_args)
String warnings = Lit("/WX /Wall " String warnings = Lit("/WX /Wall "
"/options:strict " "/options:strict "
"/wd4820 /wd4201 /wd5220 /wd4514 /wd4244 /wd5045 /wd4242 /wd4061 /wd4189 /wd4723 /wd5246 /wd4324 /wd4464 /wd4577"); "/wd4820 /wd4201 /wd5220 /wd4514 /wd4244 /wd5045 /wd4242 /wd4061 /wd4189 /wd4723 /wd5246 /wd4324 /wd4464 /wd4577 /wd4100");
StringListAppend(&perm, &compile_warnings, warnings); StringListAppend(&perm, &compile_warnings, warnings);
StringListAppend(&perm, &link_warnings, Lit("/WX")); StringListAppend(&perm, &link_warnings, Lit("/WX"));

View File

@ -234,7 +234,7 @@ void P_AppStartup(String args_str)
/* Global systems */ /* Global systems */
R_Startup(); R_Startup();
watch_startup(); W_Startup();
gp_startup(); gp_startup();
/* Subsystems */ /* Subsystems */

View File

@ -303,7 +303,7 @@ struct dx12_upload_job_sig { struct dx12_resource *resource; void *data; };
internal P_JobDef(dx12_upload_job, job); internal P_JobDef(dx12_upload_job, job);
#if RESOURCE_RELOADING #if RESOURCE_RELOADING
internal WATCH_CALLBACK_FUNC_DEF(pipeline_watch_callback, name); internal W_CallbackFuncDef(pipeline_watch_callback, name);
#endif #endif
/* ========================== * /* ========================== *
@ -430,7 +430,7 @@ void gp_startup(void)
/* Register callbacks */ /* Register callbacks */
#if RESOURCE_RELOADING #if RESOURCE_RELOADING
watch_register_callback(pipeline_watch_callback); W_RegisterCallback(pipeline_watch_callback);
#endif #endif
P_OnExit(gp_shutdown); P_OnExit(gp_shutdown);
@ -1312,7 +1312,7 @@ internal void pipeline_register(u64 num_pipelines, struct pipeline **pipelines)
} }
#if RESOURCE_RELOADING #if RESOURCE_RELOADING
internal WATCH_CALLBACK_FUNC_DEF(pipeline_watch_callback, name) internal W_CallbackFuncDef(pipeline_watch_callback, name)
{ {
__prof; __prof;
TempArena scratch = BeginScratchNoConflict(); TempArena scratch = BeginScratchNoConflict();
@ -3495,8 +3495,6 @@ void gp_present(G_Swapchain *gp_swapchain, Vec2I32 backbuffer_resolution, G_Reso
internal P_JobDef(dx12_evictor_job, _) internal P_JobDef(dx12_evictor_job, _)
{ {
(UNUSED)_;
u64 completed_targets[DX12_NUM_QUEUES] = ZI; u64 completed_targets[DX12_NUM_QUEUES] = ZI;
b32 shutdown = 0; b32 shutdown = 0;

View File

@ -1104,7 +1104,6 @@ P_W32_ThreadDef(P_W32_JobSchedulerEntryFunc, _)
{ {
struct P_W32_SharedCtx *g = &P_W32_shared_ctx; struct P_W32_SharedCtx *g = &P_W32_shared_ctx;
(UNUSED)_;
{ {
i32 priority = THREAD_PRIORITY_TIME_CRITICAL; i32 priority = THREAD_PRIORITY_TIME_CRITICAL;
b32 success = SetThreadPriority(GetCurrentThread(), priority); b32 success = SetThreadPriority(GetCurrentThread(), priority);
@ -3349,7 +3348,6 @@ void P_W32_InitBtnTable(void)
P_JobDef(P_W32_AppStartupJob, _) P_JobDef(P_W32_AppStartupJob, _)
{ {
(UNUSED)_;
P_W32_SharedCtx *g = &P_W32_shared_ctx; P_W32_SharedCtx *g = &P_W32_shared_ctx;
TempArena scratch = BeginScratchNoConflict(); TempArena scratch = BeginScratchNoConflict();
{ {
@ -3364,7 +3362,6 @@ P_JobDef(P_W32_AppShutdownJob, _)
{ {
__prof; __prof;
P_W32_SharedCtx *g = &P_W32_shared_ctx; P_W32_SharedCtx *g = &P_W32_shared_ctx;
(UNUSED)_;
i32 num_funcs = Atomic32Fetch(&g->num_exit_funcs); i32 num_funcs = Atomic32Fetch(&g->num_exit_funcs);
for (i32 i = num_funcs - 1; i >= 0; --i) for (i32 i = num_funcs - 1; i >= 0; --i)
{ {

View File

@ -188,7 +188,6 @@ P_JobDef(PB_WSP_PlaybackJob, _)
{ {
__prof; __prof;
PB_WSP_SharedState *g = &PB_WSP_shared_state; PB_WSP_SharedState *g = &PB_WSP_shared_state;
(UNUSED)_;
/* FIXME: If playback fails at any point and mixer stops advancing, we /* FIXME: If playback fails at any point and mixer stops advancing, we
* need to halt mixer to prevent memory leak when sounds are played. */ * need to halt mixer to prevent memory leak when sounds are played. */

View File

@ -187,7 +187,7 @@ internal P_JobDef(sprite_load_job, arg);
internal P_JobDef(sprite_evictor_job, _); internal P_JobDef(sprite_evictor_job, _);
#if RESOURCE_RELOADING #if RESOURCE_RELOADING
internal WATCH_CALLBACK_FUNC_DEF(sprite_watch_callback, info); internal W_CallbackFuncDef(sprite_watch_callback, info);
#endif #endif
S_StartupReceipt sprite_startup(void) S_StartupReceipt sprite_startup(void)
@ -233,7 +233,7 @@ S_StartupReceipt sprite_startup(void)
P_Run(1, sprite_evictor_job, 0, P_Pool_Background, P_Priority_Low, &G.shutdown_counter); P_Run(1, sprite_evictor_job, 0, P_Pool_Background, P_Priority_Low, &G.shutdown_counter);
P_OnExit(&sprite_shutdown); P_OnExit(&sprite_shutdown);
watch_register_callback(&sprite_watch_callback); W_RegisterCallback(&sprite_watch_callback);
return (S_StartupReceipt) { 0 }; return (S_StartupReceipt) { 0 };
} }
@ -1157,7 +1157,7 @@ internal void reload_if_exists(S_Scope *scope, S_Tag tag, enum cache_entry_kind
} }
} }
internal WATCH_CALLBACK_FUNC_DEF(sprite_watch_callback, name) internal W_CallbackFuncDef(sprite_watch_callback, name)
{ {
S_Scope *scope = sprite_scope_begin(); S_Scope *scope = sprite_scope_begin();
@ -1210,7 +1210,6 @@ internal MergesortCompareFuncDef(evict_sort, arg_a, arg_b, udata)
*/ */
internal P_JobDef(sprite_evictor_job, _) internal P_JobDef(sprite_evictor_job, _)
{ {
(UNUSED)_;
b32 shutdown = 0; b32 shutdown = 0;
while (!shutdown) { while (!shutdown) {
{ {

View File

@ -2081,7 +2081,6 @@ internal void user_update(P_Window *window)
internal P_JobDef(user_update_job, _) internal P_JobDef(user_update_job, _)
{ {
(UNUSED)_;
i64 time_ns = P_TimeNs(); i64 time_ns = P_TimeNs();
while (!Atomic32Fetch(&G.shutdown)) { while (!Atomic32Fetch(&G.shutdown)) {
P_Window *window = G.window; P_Window *window = G.window;
@ -2179,8 +2178,6 @@ struct sim_decode_queue {
internal P_JobDef(local_sim_job, _) internal P_JobDef(local_sim_job, _)
{ {
(UNUSED)_;
#if 0 #if 0
struct host_listen_address local_listen_addr = host_listen_address_from_local_name(Lit("LOCAL_SIM")); struct host_listen_address local_listen_addr = host_listen_address_from_local_name(Lit("LOCAL_SIM"));
struct host_listen_address net_listen_addr = host_listen_address_from_net_port(12345); struct host_listen_address net_listen_addr = host_listen_address_from_net_port(12345);

View File

@ -1,114 +1,117 @@
#if RESOURCE_RELOADING W_SharedState W_shared_state = ZI;
struct watch_event { ////////////////////////////////
String name; //~ Startup
struct watch_event *next;
};
Global struct { void W_Startup(void)
P_Watch *watch;
Atomic32 watch_shutdown;
P_Counter watch_jobs_counter;
P_Mutex watch_dispatcher_mutex;
Arena *watch_events_arena;
struct watch_event *first_watch_event;
struct watch_event *last_watch_event;
P_Cv watch_dispatcher_cv;
P_Mutex watch_callbacks_mutex;
watch_callback *watch_callbacks[64];
u64 num_watch_callbacks;
} G = ZI, DebugAlias(G, G_watch);
/* ========================== *
* Startup
* ========================== */
internal P_JobDef(watch_monitor_job, _);
internal P_JobDef(watch_dispatcher_job, _);
internal P_ExitFuncDef(watch_shutdown);
void watch_startup(void)
{ {
G.watch = P_AllocWatch(Lit("./")); W_SharedState *g = &W_shared_state;
g->watch = P_AllocWatch(Lit("./"));
G.watch_events_arena = AllocArena(Gibi(64)); g->watch_events_arena = AllocArena(Gibi(64));
P_Run(1, watch_monitor_job, 0, P_Pool_Floating, P_Priority_Low, &G.watch_jobs_counter); P_Run(1, W_MonitorJob, 0, P_Pool_Floating, P_Priority_Low, &g->watch_jobs_counter);
P_Run(1, watch_dispatcher_job, 0, P_Pool_Background, P_Priority_Low, &G.watch_jobs_counter); P_Run(1, W_DispatcherJob, 0, P_Pool_Background, P_Priority_Low, &g->watch_jobs_counter);
P_OnExit(&watch_shutdown); P_OnExit(&W_Shutdown);
} }
/* ========================== * P_ExitFuncDef(W_Shutdown)
* Watch
* ========================== */
internal P_ExitFuncDef(watch_shutdown)
{ {
__prof; __prof;
Atomic32FetchSet(&G.watch_shutdown, 1); W_SharedState *g = &W_shared_state;
Atomic32FetchSet(&g->W_Shutdown, 1);
{ {
P_Lock lock = P_LockE(&G.watch_dispatcher_mutex); P_Lock lock = P_LockE(&g->watch_dispatcher_mutex);
P_SignalCv(&G.watch_dispatcher_cv, I32Max); P_SignalCv(&g->watch_dispatcher_cv, I32Max);
P_WakeWatch(G.watch); P_WakeWatch(g->watch);
P_Unlock(&lock); P_Unlock(&lock);
} }
P_WaitOnCounter(&G.watch_jobs_counter); P_WaitOnCounter(&g->watch_jobs_counter);
} }
void watch_register_callback(watch_callback *callback) ////////////////////////////////
//~ Callback
void W_RegisterCallback(W_CallbackFunc *callback)
{ {
P_Lock lock = P_LockE(&G.watch_callbacks_mutex); W_SharedState *g = &W_shared_state;
P_Lock lock = P_LockE(&g->watch_callbacks_mutex);
{ {
if (G.num_watch_callbacks < countof(G.watch_callbacks)) { if (g->num_watch_callbacks < countof(g->watch_callbacks))
G.watch_callbacks[G.num_watch_callbacks++] = callback; {
} else { g->watch_callbacks[g->num_watch_callbacks++] = callback;
}
else
{
P_Panic(Lit("Max resource watch callbacks reached")); P_Panic(Lit("Max resource watch callbacks reached"));
} }
} }
P_Unlock(&lock); P_Unlock(&lock);
} }
internal P_JobDef(watch_monitor_job, _) P_JobDef(W_RunCallbackJob, job)
{
__prof;
W_RunCallbackJobSig *sig = job.sig;
String name = sig->name;
W_CallbackFunc *callback = sig->callbacks[job.id];
callback(name);
}
////////////////////////////////
//~ Monitor job
/* NOTE: We separate the responsibilities of monitoring directory changes
* & dispatching watch callbacks into two separate jobs so that we can delay
* the dispatch, allowing for deduplication of file modification notifications. */
P_JobDef(W_MonitorJob, _)
{ {
(UNUSED)_;
TempArena scratch = BeginScratchNoConflict(); TempArena scratch = BeginScratchNoConflict();
W_SharedState *g = &W_shared_state;
String ignored[] = { String ignored[] = {
Lit(".vs"), Lit(".vs"),
Lit(".git") Lit(".git")
}; };
while (!Atomic32Fetch(&G.watch_shutdown)) { while (!Atomic32Fetch(&g->W_Shutdown))
{
TempArena temp = BeginTempArena(scratch.arena); TempArena temp = BeginTempArena(scratch.arena);
P_WatchInfoList info_list = P_ReadWatchWait(temp.arena, G.watch); P_WatchInfoList info_list = P_ReadWatchWait(temp.arena, g->watch);
if (info_list.first && !Atomic32Fetch(&G.watch_shutdown)) { if (info_list.first && !Atomic32Fetch(&g->W_Shutdown))
P_Lock lock = P_LockE(&G.watch_dispatcher_mutex); {
P_Lock lock = P_LockE(&g->watch_dispatcher_mutex);
{ {
for (P_WatchInfo *info = info_list.first; info; info = info->next) { for (P_WatchInfo *info = info_list.first; info; info = info->next)
{
String name_src = info->name; String name_src = info->name;
b32 ignore = 0; b32 ignore = 0;
for (u32 i = 0; i < countof(ignored); ++i) { for (u32 i = 0; i < countof(ignored); ++i)
if (StringStartsWith(name_src, ignored[i])) { {
if (StringStartsWith(name_src, ignored[i]))
{
ignore = 1; ignore = 1;
break; break;
} }
} }
if (!ignore) { if (!ignore)
struct watch_event *e = PushStruct(G.watch_events_arena, struct watch_event); {
e->name = CopyString(G.watch_events_arena, name_src); W_Event *e = PushStruct(g->watch_events_arena, W_Event);
if (G.last_watch_event) { e->name = CopyString(g->watch_events_arena, name_src);
G.last_watch_event->next = e; if (g->last_watch_event)
} else { {
G.first_watch_event = e; g->last_watch_event->next = e;
} }
G.last_watch_event = e; else
{
g->first_watch_event = e;
}
g->last_watch_event = e;
} }
} }
} }
P_SignalCv(&G.watch_dispatcher_cv, I32Max); P_SignalCv(&g->watch_dispatcher_cv, I32Max);
P_Unlock(&lock); P_Unlock(&lock);
} }
EndTempArena(temp); EndTempArena(temp);
@ -117,95 +120,88 @@ internal P_JobDef(watch_monitor_job, _)
EndScratch(scratch); EndScratch(scratch);
} }
/* NOTE: We separate the responsibilities of monitoring directory changes ////////////////////////////////
* & dispatching watch callbacks into two separate jobs so that we can delay //~ Dispatcher job
* the dispatch, allowing for deduplication of file modification notifications. */
#define WATCH_DISPATCHER_DELAY_SECONDS 0.050 P_JobDef(W_DispatcherJob, _)
#define WATCH_DISPATCHER_DEDUP_DICT_BINS 128
struct watch_callback_job_sig {
String name;
watch_callback **callbacks;
};
internal P_JobDef(watch_callback_job, job)
{ {
__prof; W_SharedState *g = &W_shared_state;
struct watch_callback_job_sig *sig = job.sig;
String name = sig->name;
watch_callback *callback = sig->callbacks[job.id];
callback(name);
}
internal P_JobDef(watch_dispatcher_job, _)
{
(UNUSED)_;
b32 shutdown = 0; b32 shutdown = 0;
while (!shutdown) { while (!shutdown)
{
{ {
TempArena scratch = BeginScratchNoConflict(); TempArena scratch = BeginScratchNoConflict();
struct watch_event *first_watch_event = 0; W_Event *first_watch_event = 0;
struct watch_event *last_watch_event = 0; W_Event *last_watch_event = 0;
/* Delay so that duplicate events pile up */ /* Delay so that duplicate events pile up */
{ {
__profn("Delay"); __profn("Delay");
P_Wait(0, 0, 0, NsFromSeconds(WATCH_DISPATCHER_DELAY_SECONDS)); P_Wait(0, 0, 0, NsFromSeconds(W_DispatcherDelaySeconds));
} }
/* Pull watch events from queue */ /* Pull watch events from queue */
{ {
P_Lock lock = P_LockE(&G.watch_dispatcher_mutex); P_Lock lock = P_LockE(&g->watch_dispatcher_mutex);
for (struct watch_event *src_event = G.first_watch_event; src_event; src_event = src_event->next) { for (W_Event *src_event = g->first_watch_event; src_event; src_event = src_event->next)
struct watch_event *e = PushStruct(scratch.arena, struct watch_event); {
W_Event *e = PushStruct(scratch.arena, W_Event);
e->name = CopyString(scratch.arena, src_event->name); e->name = CopyString(scratch.arena, src_event->name);
if (last_watch_event) { if (last_watch_event)
{
last_watch_event->next = e; last_watch_event->next = e;
} else { }
else
{
first_watch_event = e; first_watch_event = e;
} }
last_watch_event = e; last_watch_event = e;
} }
G.first_watch_event = 0; g->first_watch_event = 0;
G.last_watch_event = 0; g->last_watch_event = 0;
ResetArena(G.watch_events_arena); ResetArena(g->watch_events_arena);
P_Unlock(&lock); P_Unlock(&lock);
} }
/* Build callbacks array */ /* Build callbacks array */
u64 num_callbacks = 0; u64 num_callbacks = 0;
watch_callback **callbacks = 0; W_CallbackFunc **callbacks = 0;
P_Lock callbacks_lock = P_LockS(&G.watch_callbacks_mutex); P_Lock callbacks_lock = P_LockS(&g->watch_callbacks_mutex);
{ {
num_callbacks = G.num_watch_callbacks; num_callbacks = g->num_watch_callbacks;
callbacks = PushStructsNoZero(scratch.arena, watch_callback *, num_callbacks); callbacks = PushStructsNoZero(scratch.arena, W_CallbackFunc *, num_callbacks);
for (u64 i = 0; i < num_callbacks; ++i) { for (u64 i = 0; i < num_callbacks; ++i)
callbacks[i] = G.watch_callbacks[i]; {
callbacks[i] = g->watch_callbacks[i];
} }
} }
P_Unlock(&callbacks_lock); P_Unlock(&callbacks_lock);
/* Run callbacks */ /* Run callbacks */
{ {
Dict *dedup_dict = InitDict(scratch.arena, WATCH_DISPATCHER_DEDUP_DICT_BINS); Dict *dedup_dict = InitDict(scratch.arena, W_DispatcherDedupBins);
for (struct watch_event *e = first_watch_event; e; e = e->next) { for (W_Event *e = first_watch_event; e; e = e->next)
{
__profn("Dispatch"); __profn("Dispatch");
/* Do not run callbacks for the same file more than once */ /* Do not run callbacks for the same file more than once */
b32 skip = 0; b32 skip = 0;
u64 hash = HashFnv64(Fnv64Basis, e->name); u64 hash = HashFnv64(Fnv64Basis, e->name);
if (DictValueFromHash(dedup_dict, hash) == 1) { if (DictValueFromHash(dedup_dict, hash) == 1)
{
skip = 1; skip = 1;
} else { }
else
{
SetDictValue(scratch.arena, dedup_dict, hash, 1); SetDictValue(scratch.arena, dedup_dict, hash, 1);
} }
if (!skip) { if (!skip)
struct watch_callback_job_sig sig = ZI; {
W_RunCallbackJobSig sig = ZI;
sig.name = e->name; sig.name = e->name;
sig.callbacks = callbacks; sig.callbacks = callbacks;
P_Counter counter = ZI; P_Counter counter = ZI;
P_Run(num_callbacks, watch_callback_job, &sig, P_Pool_Background, P_Priority_Low, &counter); P_Run(num_callbacks, W_RunCallbackJob, &sig, P_Pool_Background, P_Priority_Low, &counter);
P_WaitOnCounter(&counter); P_WaitOnCounter(&counter);
} }
} }
@ -215,22 +211,15 @@ internal P_JobDef(watch_dispatcher_job, _)
} }
/* Wait for event */ /* Wait for event */
P_Lock lock = P_LockS(&G.watch_dispatcher_mutex); P_Lock lock = P_LockS(&g->watch_dispatcher_mutex);
{ {
shutdown = Atomic32Fetch(&G.watch_shutdown); shutdown = Atomic32Fetch(&g->W_Shutdown);
while (!shutdown && !G.first_watch_event) { while (!shutdown && !g->first_watch_event)
P_WaitOnCv(&G.watch_dispatcher_cv, &lock); {
shutdown = Atomic32Fetch(&G.watch_shutdown); P_WaitOnCv(&g->watch_dispatcher_cv, &lock);
shutdown = Atomic32Fetch(&g->W_Shutdown);
} }
} }
P_Unlock(&lock); P_Unlock(&lock);
} }
} }
#else /* RESOURCE_RELOADING */
void watch_startup(void)
{
}
#endif /* RESOURCE_RELOADING */

View File

@ -1,10 +1,67 @@
#define WATCH_CALLBACK_FUNC_DEF(func_name, arg_name) void func_name(String arg_name) ////////////////////////////////
typedef WATCH_CALLBACK_FUNC_DEF(watch_callback, name); //~ Callback types
void watch_startup(void); #define W_CallbackFuncDef(func_name, arg_name) void func_name(String arg_name)
typedef W_CallbackFuncDef(W_CallbackFunc, name);
#if RESOURCE_RELOADING Struct(W_RunCallbackJobSig)
void watch_register_callback(watch_callback *callback); {
#else String name;
#define watch_register_callback(callback) W_CallbackFunc **callbacks;
#endif };
////////////////////////////////
//~ Event types
Struct(W_Event)
{
String name;
W_Event *next;
};
////////////////////////////////
//~ Shared state
#define W_DispatcherDelaySeconds 0.050
#define W_DispatcherDedupBins 128
Struct(W_SharedState)
{
P_Watch *watch;
Atomic32 W_Shutdown;
P_Counter watch_jobs_counter;
P_Mutex watch_dispatcher_mutex;
Arena *watch_events_arena;
W_Event *first_watch_event;
W_Event *last_watch_event;
P_Cv watch_dispatcher_cv;
P_Mutex watch_callbacks_mutex;
W_CallbackFunc *watch_callbacks[64];
u64 num_watch_callbacks;
};
extern W_SharedState W_shared_state;
////////////////////////////////
//~ Startup
void W_Startup(void);
P_ExitFuncDef(W_Shutdown);
////////////////////////////////
//~ Watch operations
void W_RegisterCallback(W_CallbackFunc *callback);
////////////////////////////////
//~ Callback job
P_JobDef(W_RunCallbackJob, job);
////////////////////////////////
//~ Long running jobs
P_JobDef(W_MonitorJob, job);
P_JobDef(W_DispatcherJob, job);