power_play/src/watch/watch_core.c
2025-08-02 09:11:26 -05:00

226 lines
7.0 KiB
C

/* Single shared state instance for the watch layer. Every function in this
 * file reaches it through the address of this global.
 * NOTE(review): ZI is presumably the project's zero-initializer macro - confirm. */
W_SharedState W_shared_state = ZI;
////////////////////////////////
//~ Startup
/* Brings up the file-watch layer.
 * Creates the watch on "./", allocates the arena that queued events are
 * pushed into, starts the monitor and dispatcher jobs (both tracked by
 * watch_jobs_counter), and registers W_Shutdown as an exit function. */
void W_Startup(void)
{
    /* Watch handle and backing storage for queued events.
     * NOTE(review): Gibi(64) is presumably an address-space reservation rather
     * than committed memory - confirm against AllocArena. */
    W_shared_state.watch = P_AllocWatch(Lit("./"));
    W_shared_state.watch_events_arena = AllocArena(Gibi(64));

    /* Both jobs report to the same counter, which W_Shutdown waits on. */
    P_Run(1, W_MonitorJob, 0, PoolKind_Floating, PriorityKind_Low, &W_shared_state.watch_jobs_counter);
    P_Run(1, W_DispatcherJob, 0, PoolKind_Background, PriorityKind_Low, &W_shared_state.watch_jobs_counter);

    P_OnExit(&W_Shutdown);
}
/* Exit function for the watch layer: raises the shutdown flag, wakes both
 * watch jobs, then blocks until the jobs counter reports them finished. */
P_ExitFuncDef(W_Shutdown)
{
    __prof;
    W_SharedState *state = &W_shared_state;

    /* Raise the flag before waking anyone so a woken job sees it on re-check. */
    Atomic32FetchSet(&state->W_Shutdown, 1);

    {
        /* The dispatcher waits on this cv; the signal is sent while holding
         * the dispatcher mutex exclusively.
         * NOTE(review): P_WakeWatch presumably unblocks the monitor job's
         * P_ReadWatchWait call - confirm. */
        P_Lock dispatcher_lock = P_LockE(&state->watch_dispatcher_mutex);
        P_SignalCv(&state->watch_dispatcher_cv, I32Max);
        P_WakeWatch(state->watch);
        P_Unlock(&dispatcher_lock);
    }

    P_WaitOnCounter(&state->watch_jobs_counter);
}
////////////////////////////////
//~ Callback
/* Appends `callback` to the fixed-size table of watch callbacks.
 * The table is modified while holding the callbacks mutex exclusively.
 * Panics when the table is already full. */
void W_RegisterCallback(W_CallbackFunc *callback)
{
    W_SharedState *state = &W_shared_state;
    P_Lock table_lock = P_LockE(&state->watch_callbacks_mutex);
    {
        if (state->num_watch_callbacks >= countof(state->watch_callbacks))
        {
            P_Panic(Lit("Max resource watch callbacks reached"));
        }
        else
        {
            /* Store into the next free slot, then publish the new count. */
            state->watch_callbacks[state->num_watch_callbacks] = callback;
            state->num_watch_callbacks += 1;
        }
    }
    P_Unlock(&table_lock);
}
/* Job body that invokes a single watch callback: job.id selects the entry in
 * the signature's callbacks array, and the signature's name is its argument. */
JobDef(W_RunCallbackJob, job)
{
    __prof;
    W_RunCallbackJobSig *params = job.sig;
    params->callbacks[job.id](params->name);
}
////////////////////////////////
//~ Monitor job
/* NOTE: We separate the responsibilities of monitoring directory changes
* & dispatching watch callbacks into two separate jobs so that we can delay
* the dispatch, allowing for deduplication of file modification notifications. */
JobDef(W_MonitorJob, _)
{
TempArena scratch = BeginScratchNoConflict();
W_SharedState *g = &W_shared_state;
String ignored[] = {
Lit(".vs"),
Lit(".git")
};
while (!Atomic32Fetch(&g->W_Shutdown))
{
TempArena temp = BeginTempArena(scratch.arena);
P_WatchInfoList info_list = P_ReadWatchWait(temp.arena, g->watch);
if (info_list.first && !Atomic32Fetch(&g->W_Shutdown))
{
P_Lock lock = P_LockE(&g->watch_dispatcher_mutex);
{
for (P_WatchInfo *info = info_list.first; info; info = info->next)
{
String name_src = info->name;
b32 ignore = 0;
for (u32 i = 0; i < countof(ignored); ++i)
{
if (StringStartsWith(name_src, ignored[i]))
{
ignore = 1;
break;
}
}
if (!ignore)
{
W_Event *e = PushStruct(g->watch_events_arena, W_Event);
e->name = PushString(g->watch_events_arena, name_src);
if (g->last_watch_event)
{
g->last_watch_event->next = e;
}
else
{
g->first_watch_event = e;
}
g->last_watch_event = e;
}
}
}
P_SignalCv(&g->watch_dispatcher_cv, I32Max);
P_Unlock(&lock);
}
EndTempArena(temp);
}
EndScratch(scratch);
}
////////////////////////////////
//~ Dispatcher job
/* Dispatcher job: drains the shared watch-event queue and runs every
 * registered callback once per distinct event name in each batch.
 *
 * Each loop iteration:
 *   1. waits W_DispatcherDelaySeconds,
 *   2. copies the queued events into scratch memory and resets the shared
 *      queue and its arena (under the dispatcher mutex, exclusive),
 *   3. snapshots the callback table (under the callbacks mutex, shared),
 *   4. runs the callbacks for each event whose name hash has not been seen
 *      yet in this batch,
 *   5. blocks on the dispatcher cv until the queue is non-empty or shutdown
 *      is flagged.
 *
 * The wait is at the bottom of the loop, so the first iteration performs the
 * delay and a drain before the job ever blocks on the cv. */
JobDef(W_DispatcherJob, _)
{
W_SharedState *g = &W_shared_state;
b32 shutdown = 0;
while (!shutdown)
{
{
TempArena scratch = BeginScratchNoConflict();
W_Event *first_watch_event = 0;
W_Event *last_watch_event = 0;
/* Delay so that duplicate events pile up */
{
__profn("Delay");
P_Wait(0, 0, 0, NsFromSeconds(W_DispatcherDelaySeconds));
}
/* Pull watch events from queue */
{
P_Lock lock = P_LockE(&g->watch_dispatcher_mutex);
/* Events and their names are copied into scratch so that the shared arena
 * can be reset and the mutex released before any callback runs. */
for (W_Event *src_event = g->first_watch_event; src_event; src_event = src_event->next)
{
W_Event *e = PushStruct(scratch.arena, W_Event);
e->name = PushString(scratch.arena, src_event->name);
if (last_watch_event)
{
last_watch_event->next = e;
}
else
{
first_watch_event = e;
}
last_watch_event = e;
}
/* The shared queue is now empty; its arena is reset in the same critical section. */
g->first_watch_event = 0;
g->last_watch_event = 0;
ResetArena(g->watch_events_arena);
P_Unlock(&lock);
}
/* Build callbacks array */
u64 num_callbacks = 0;
W_CallbackFunc **callbacks = 0;
/* Shared lock: the table is only read here. The copy lets the callbacks run
 * without holding the callbacks mutex. */
P_Lock callbacks_lock = P_LockS(&g->watch_callbacks_mutex);
{
num_callbacks = g->num_watch_callbacks;
callbacks = PushStructsNoZero(scratch.arena, W_CallbackFunc *, num_callbacks);
for (u64 i = 0; i < num_callbacks; ++i)
{
callbacks[i] = g->watch_callbacks[i];
}
}
P_Unlock(&callbacks_lock);
/* Run callbacks */
{
/* The dedup dictionary lives in scratch, so it only covers the current batch. */
Dict *dedup_dict = InitDict(scratch.arena, W_DispatcherDedupBins);
for (W_Event *e = first_watch_event; e; e = e->next)
{
__profn("Dispatch");
/* Do not run callbacks for the same file more than once */
/* Names are identified by their u64 hash only: two different names with the
 * same hash would be treated as duplicates. */
b32 skip = 0;
u64 hash = HashFnv64(Fnv64Basis, e->name);
if (DictValueFromHash(dedup_dict, hash) == 1)
{
skip = 1;
}
else
{
SetDictValue(scratch.arena, dedup_dict, hash, 1);
}
if (!skip)
{
/* One job per callback; job.id indexes sig.callbacks in W_RunCallbackJob.
 * sig and counter are locals passed by address, and the wait below completes
 * before they go out of scope. All callbacks for this event therefore finish
 * before the next event is dispatched. */
W_RunCallbackJobSig sig = ZI;
sig.name = e->name;
sig.callbacks = callbacks;
Counter counter = ZI;
P_Run(num_callbacks, W_RunCallbackJob, &sig, PoolKind_Background, PriorityKind_Low, &counter);
P_WaitOnCounter(&counter);
}
}
}
EndScratch(scratch);
}
/* Wait for event */
/* Shared lock: only reads first_watch_event. The shutdown flag is observed
 * only here, so a batch that is already in progress is dispatched in full
 * before the job exits. The predicate is re-checked after every wake-up. */
P_Lock lock = P_LockS(&g->watch_dispatcher_mutex);
{
shutdown = Atomic32Fetch(&g->W_Shutdown);
while (!shutdown && !g->first_watch_event)
{
P_WaitOnCv(&g->watch_dispatcher_cv, &lock);
shutdown = Atomic32Fetch(&g->W_Shutdown);
}
}
P_Unlock(&lock);
}
}