power_play/src/watch/watch_core.c
2025-07-29 18:12:41 -05:00

237 lines
7.5 KiB
C

#if RESOURCE_RELOADING
struct watch_event {
String name;
struct watch_event *next;
};
GLOBAL struct {
P_Watch *watch;
Atomic32 watch_shutdown;
P_Counter watch_jobs_counter;
P_Mutex watch_dispatcher_mutex;
Arena *watch_events_arena;
struct watch_event *first_watch_event;
struct watch_event *last_watch_event;
P_Cv watch_dispatcher_cv;
P_Mutex watch_callbacks_mutex;
watch_callback *watch_callbacks[64];
u64 num_watch_callbacks;
} G = ZI, DEBUG_ALIAS(G, G_watch);
/* ========================== *
* Startup
* ========================== */
INTERNAL P_JobDef(watch_monitor_job, _);
INTERNAL P_JobDef(watch_dispatcher_job, _);
INTERNAL P_ExitFuncDef(watch_shutdown);
void watch_startup(void)
{
G.watch = P_AllocWatch(LIT("./"));
G.watch_events_arena = arena_alloc(GIBI(64));
P_Run(1, watch_monitor_job, 0, P_Pool_Floating, P_Priority_Low, &G.watch_jobs_counter);
P_Run(1, watch_dispatcher_job, 0, P_Pool_Background, P_Priority_Low, &G.watch_jobs_counter);
P_OnExit(&watch_shutdown);
}
/* ========================== *
* Watch
* ========================== */
INTERNAL P_ExitFuncDef(watch_shutdown)
{
__prof;
atomic32_fetch_set(&G.watch_shutdown, 1);
{
P_Lock lock = P_LockE(&G.watch_dispatcher_mutex);
P_SignalCv(&G.watch_dispatcher_cv, I32_MAX);
P_WakeWatch(G.watch);
P_Unlock(&lock);
}
P_WaitOnCounter(&G.watch_jobs_counter);
}
void watch_register_callback(watch_callback *callback)
{
P_Lock lock = P_LockE(&G.watch_callbacks_mutex);
{
if (G.num_watch_callbacks < countof(G.watch_callbacks)) {
G.watch_callbacks[G.num_watch_callbacks++] = callback;
} else {
P_Panic(LIT("Max resource watch callbacks reached"));
}
}
P_Unlock(&lock);
}
INTERNAL P_JobDef(watch_monitor_job, _)
{
(UNUSED)_;
TempArena scratch = scratch_begin_no_conflict();
String ignored[] = {
LIT(".vs"),
LIT(".git")
};
while (!atomic32_fetch(&G.watch_shutdown)) {
TempArena temp = arena_temp_begin(scratch.arena);
P_WatchInfoList info_list = P_ReadWatchWait(temp.arena, G.watch);
if (info_list.first && !atomic32_fetch(&G.watch_shutdown)) {
P_Lock lock = P_LockE(&G.watch_dispatcher_mutex);
{
for (P_WatchInfo *info = info_list.first; info; info = info->next) {
String name_src = info->name;
b32 ignore = 0;
for (u32 i = 0; i < countof(ignored); ++i) {
if (string_starts_with(name_src, ignored[i])) {
ignore = 1;
break;
}
}
if (!ignore) {
struct watch_event *e = arena_push(G.watch_events_arena, struct watch_event);
e->name = string_copy(G.watch_events_arena, name_src);
if (G.last_watch_event) {
G.last_watch_event->next = e;
} else {
G.first_watch_event = e;
}
G.last_watch_event = e;
}
}
}
P_SignalCv(&G.watch_dispatcher_cv, I32_MAX);
P_Unlock(&lock);
}
arena_temp_end(temp);
}
scratch_end(scratch);
}
/* NOTE: We separate the responsibilities of monitoring directory changes
* & dispatching watch callbacks into two separate jobs so that we can delay
* the dispatch, allowing for deduplication of file modification notifications. */
#define WATCH_DISPATCHER_DELAY_SECONDS 0.050
#define WATCH_DISPATCHER_DEDUP_DICT_BINS 128
struct watch_callback_job_sig {
String name;
watch_callback **callbacks;
};
INTERNAL P_JobDef(watch_callback_job, job)
{
__prof;
struct watch_callback_job_sig *sig = job.sig;
String name = sig->name;
watch_callback *callback = sig->callbacks[job.id];
callback(name);
}
INTERNAL P_JobDef(watch_dispatcher_job, _)
{
(UNUSED)_;
b32 shutdown = 0;
while (!shutdown) {
{
TempArena scratch = scratch_begin_no_conflict();
struct watch_event *first_watch_event = 0;
struct watch_event *last_watch_event = 0;
/* Delay so that duplicate events pile up */
{
__profn("Delay");
P_Wait(0, 0, 0, NS_FROM_SECONDS(WATCH_DISPATCHER_DELAY_SECONDS));
}
/* Pull watch events from queue */
{
P_Lock lock = P_LockE(&G.watch_dispatcher_mutex);
for (struct watch_event *src_event = G.first_watch_event; src_event; src_event = src_event->next) {
struct watch_event *e = arena_push(scratch.arena, struct watch_event);
e->name = string_copy(scratch.arena, src_event->name);
if (last_watch_event) {
last_watch_event->next = e;
} else {
first_watch_event = e;
}
last_watch_event = e;
}
G.first_watch_event = 0;
G.last_watch_event = 0;
arena_reset(G.watch_events_arena);
P_Unlock(&lock);
}
/* Build callbacks array */
u64 num_callbacks = 0;
watch_callback **callbacks = 0;
P_Lock callbacks_lock = P_LockS(&G.watch_callbacks_mutex);
{
num_callbacks = G.num_watch_callbacks;
callbacks = arena_push_array_no_zero(scratch.arena, watch_callback *, num_callbacks);
for (u64 i = 0; i < num_callbacks; ++i) {
callbacks[i] = G.watch_callbacks[i];
}
}
P_Unlock(&callbacks_lock);
/* Run callbacks */
{
Dict *dedup_dict = dict_init(scratch.arena, WATCH_DISPATCHER_DEDUP_DICT_BINS);
for (struct watch_event *e = first_watch_event; e; e = e->next) {
__profn("Dispatch");
/* Do not run callbacks for the same file more than once */
b32 skip = 0;
u64 hash = hash_fnv64(HASH_FNV64_BASIS, e->name);
if (dict_get(dedup_dict, hash) == 1) {
skip = 1;
} else {
dict_set(scratch.arena, dedup_dict, hash, 1);
}
if (!skip) {
struct watch_callback_job_sig sig = ZI;
sig.name = e->name;
sig.callbacks = callbacks;
P_Counter counter = ZI;
P_Run(num_callbacks, watch_callback_job, &sig, P_Pool_Background, P_Priority_Low, &counter);
P_WaitOnCounter(&counter);
}
}
}
scratch_end(scratch);
}
/* Wait for event */
P_Lock lock = P_LockS(&G.watch_dispatcher_mutex);
{
shutdown = atomic32_fetch(&G.watch_shutdown);
while (!shutdown && !G.first_watch_event) {
P_WaitOnCv(&G.watch_dispatcher_cv, &lock);
shutdown = atomic32_fetch(&G.watch_shutdown);
}
}
P_Unlock(&lock);
}
}
#else /* RESOURCE_RELOADING */
void watch_startup(void)
{
}
#endif /* RESOURCE_RELOADING */