use jobs for resource watch

This commit is contained in:
jacob 2025-07-14 17:27:46 -05:00
parent 630dbdbb58
commit 0948e357c6
4 changed files with 21 additions and 30 deletions

View File

@ -28,7 +28,7 @@ INTERNAL BOOL CALLBACK enum_func(HMODULE module, LPCWSTR type, LPCWSTR wstr_entr
struct arena_temp scratch = scratch_begin_no_conflict(); struct arena_temp scratch = scratch_begin_no_conflict();
struct rc_search_params *params = (struct rc_search_params *)udata; struct rc_search_params *params = (struct rc_search_params *)udata;
struct string entry_name_lower = string_lower(scratch.arena, string_from_wstr_no_limit(scratch.arena, (LPWSTR)wstr_entry_name)); struct string entry_name_lower = string_lower(scratch.arena, string_from_wstr_no_limit(scratch.arena, (LPWSTR)wstr_entry_name));
params->found = false; params->found = 0;
params->data = STRING(0, 0); params->data = STRING(0, 0);
if (string_eq(entry_name_lower, params->name_lower)) { if (string_eq(entry_name_lower, params->name_lower)) {
HRSRC hres = FindResourceW(module, wstr_entry_name, type); HRSRC hres = FindResourceW(module, wstr_entry_name, type);

View File

@ -23,11 +23,9 @@ GLOBAL struct {
#endif #endif
#if RESOURCE_RELOADING #if RESOURCE_RELOADING
struct sys_thread *resource_watch_monitor_thread;
struct sys_thread *resource_watch_dispatch_thread;
struct sys_watch *watch; struct sys_watch *watch;
struct atomic32 watch_shutdown; struct atomic32 watch_shutdown;
struct snc_counter watch_jobs_counter;
struct snc_mutex watch_dispatcher_mutex; struct snc_mutex watch_dispatcher_mutex;
struct arena *watch_dispatcher_info_arena; struct arena *watch_dispatcher_info_arena;
@ -45,8 +43,8 @@ GLOBAL struct {
* ========================== */ * ========================== */
#if RESOURCE_RELOADING #if RESOURCE_RELOADING
INTERNAL SYS_THREAD_DEF(resource_watch_monitor_thread_entry_point, _); INTERNAL SYS_JOB_DEF(resource_watch_monitor_job, _);
INTERNAL SYS_THREAD_DEF(resource_watch_dispatcher_thread_entry_point, _); INTERNAL SYS_JOB_DEF(resource_watch_dispatcher_job, _);
INTERNAL SYS_EXIT_FUNC(resource_shutdown); INTERNAL SYS_EXIT_FUNC(resource_shutdown);
#endif #endif
@ -73,9 +71,9 @@ struct resource_startup_receipt resource_startup(void)
G.watch_dispatcher_info_arena = arena_alloc(GIBI(64)); G.watch_dispatcher_info_arena = arena_alloc(GIBI(64));
sys_run(1, resource_watch_monitor_job, 0, SYS_POOL_FLOATING, SYS_PRIORITY_LOW, &G.watch_jobs_counter);
sys_run(1, resource_watch_dispatcher_job, 0, SYS_POOL_BACKGROUND, SYS_PRIORITY_LOW, &G.watch_jobs_counter);
sys_on_exit(&resource_shutdown); sys_on_exit(&resource_shutdown);
G.resource_watch_monitor_thread = sys_thread_alloc(resource_watch_monitor_thread_entry_point, 0, LIT("Resource watch monitor"), PROF_THREAD_GROUP_IO);
G.resource_watch_dispatch_thread = sys_thread_alloc(resource_watch_dispatcher_thread_entry_point, 0, LIT("Resource watch dispatcher"), PROF_THREAD_GROUP_IO);
#endif #endif
@ -166,9 +164,7 @@ INTERNAL SYS_EXIT_FUNC(resource_shutdown)
sys_watch_wake(G.watch); sys_watch_wake(G.watch);
snc_unlock(&lock); snc_unlock(&lock);
} }
snc_counter_wait(&G.watch_jobs_counter);
sys_thread_wait_release(G.resource_watch_dispatch_thread);
sys_thread_wait_release(G.resource_watch_monitor_thread);
} }
void resource_register_watch_callback(resource_watch_callback *callback) void resource_register_watch_callback(resource_watch_callback *callback)
@ -184,14 +180,14 @@ void resource_register_watch_callback(resource_watch_callback *callback)
snc_unlock(&lock); snc_unlock(&lock);
} }
INTERNAL SYS_THREAD_DEF(resource_watch_monitor_thread_entry_point, _) INTERNAL SYS_JOB_DEF(resource_watch_monitor_job, _)
{ {
(UNUSED)_; (UNUSED)_;
struct arena_temp scratch = scratch_begin_no_conflict(); struct arena_temp scratch = scratch_begin_no_conflict();
while (!atomic32_fetch(&G.watch_shutdown)) { while (!atomic32_fetch(&G.watch_shutdown)) {
struct arena_temp temp = arena_temp_begin(scratch.arena); struct arena_temp temp = arena_temp_begin(scratch.arena);
struct sys_watch_info_list res = sys_watch_wait(temp.arena, G.watch); struct sys_watch_info_list res = sys_watch_read_wait(temp.arena, G.watch);
if (res.first && !atomic32_fetch(&G.watch_shutdown)) { if (res.first && !atomic32_fetch(&G.watch_shutdown)) {
struct snc_lock lock = snc_lock_e(&G.watch_dispatcher_mutex); struct snc_lock lock = snc_lock_e(&G.watch_dispatcher_mutex);
{ {
@ -214,7 +210,7 @@ INTERNAL SYS_THREAD_DEF(resource_watch_monitor_thread_entry_point, _)
} }
/* NOTE: We separate the responsibilities of monitoring directory changes /* NOTE: We separate the responsibilities of monitoring directory changes
* & dispatching watch callbacks into two separate threads so that we can delay * & dispatching watch callbacks into two separate jobs so that we can delay
* the dispatch, allowing for deduplication of file modification notifications. */ * the dispatch, allowing for deduplication of file modification notifications. */
#define WATCH_DISPATCHER_DELAY_SECONDS 0.050 #define WATCH_DISPATCHER_DELAY_SECONDS 0.050
@ -234,7 +230,7 @@ INTERNAL SYS_JOB_DEF(resource_watch_callback_job, job)
callback(name); callback(name);
} }
INTERNAL SYS_THREAD_DEF(resource_watch_dispatcher_thread_entry_point, _) INTERNAL SYS_JOB_DEF(resource_watch_dispatcher_job, _)
{ {
(UNUSED)_; (UNUSED)_;
struct arena_temp scratch = scratch_begin_no_conflict(); struct arena_temp scratch = scratch_begin_no_conflict();

View File

@ -1,13 +1,7 @@
#ifndef SYS_H #ifndef SYS_H
#define SYS_H #define SYS_H
struct snc_counter;
/* ========================== * /* ========================== *
* Exit * Exit
@ -87,7 +81,6 @@ struct sys_job_data {
#define SYS_JOB_DEF(job_name, arg_name) void job_name(struct sys_job_data arg_name) #define SYS_JOB_DEF(job_name, arg_name) void job_name(struct sys_job_data arg_name)
typedef SYS_JOB_DEF(sys_job_func, job_data); typedef SYS_JOB_DEF(sys_job_func, job_data);
struct snc_counter;
void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_pool pool_kind, enum sys_priority priority, struct snc_counter *counter); void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_pool pool_kind, enum sys_priority priority, struct snc_counter *counter);
/* ========================== * /* ========================== *
@ -428,7 +421,7 @@ struct sys_watch *sys_watch_alloc(struct string path);
void sys_watch_release(struct sys_watch *dw); void sys_watch_release(struct sys_watch *dw);
struct sys_watch_info_list sys_watch_wait(struct arena *arena, struct sys_watch *dw); struct sys_watch_info_list sys_watch_read_wait(struct arena *arena, struct sys_watch *dw);
void sys_watch_wake(struct sys_watch *dw); void sys_watch_wake(struct sys_watch *dw);

View File

@ -181,9 +181,10 @@ struct alignas(64) fiber {
/* ==================================================== */ /* ==================================================== */
char *name_cstr; /* 08 bytes */ char *name_cstr; /* 08 bytes */
/* ==================================================== */ /* ==================================================== */
struct atomic32 wake_lock; /* 04 bytes (4 byte alignment) */ struct atomic16 wake_lock; /* 02 bytes (4 byte alignment) */
i16 id; /* 02 bytes */ i16 id; /* 02 bytes */
i16 parent_id; /* 02 bytes */ i16 parent_id; /* 02 bytes */
i16 unyielding; /* 02 bytes */
/* ==================================================== */ /* ==================================================== */
u64 wait_addr; /* 08 bytes */ u64 wait_addr; /* 08 bytes */
/* ==================================================== */ /* ==================================================== */
@ -469,7 +470,7 @@ void sys_wait(void *addr, void *cmp, u32 size, i64 timeout_ns)
{ {
struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
i16 parent_id = fiber->parent_id; i16 parent_id = fiber->parent_id;
if (parent_id != 0) { if (parent_id != 0 && !fiber->unyielding) {
*fiber->yield_param = (struct yield_param) { *fiber->yield_param = (struct yield_param) {
.kind = YIELD_KIND_WAIT, .kind = YIELD_KIND_WAIT,
.wait = { .wait = {
@ -603,7 +604,7 @@ INTERNAL void wake_fibers_locked(i32 num_fibers, struct fiber **fibers)
fiber->next_time_waiter = 0; fiber->next_time_waiter = 0;
} }
/* Unlock fiber */ /* Unlock fiber */
atomic32_fetch_set(&fiber->wake_lock, 0); atomic16_fetch_set(&fiber->wake_lock, 0);
} }
/* Unlock wait bins */ /* Unlock wait bins */
if (wait_time_bin != 0) tm_unlock(&wait_time_bin->lock); if (wait_time_bin != 0) tm_unlock(&wait_time_bin->lock);
@ -687,7 +688,7 @@ INTERNAL void wake_address(void *addr, i32 count)
if (wait_addr_list) { if (wait_addr_list) {
fibers = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_addr_list->num_waiters); fibers = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_addr_list->num_waiters);
for (struct fiber *fiber = fiber_from_id(wait_addr_list->first_waiter); fiber && num_fibers < count; fiber = fiber_from_id(fiber->next_addr_waiter)) { for (struct fiber *fiber = fiber_from_id(wait_addr_list->first_waiter); fiber && num_fibers < count; fiber = fiber_from_id(fiber->next_addr_waiter)) {
if (atomic32_fetch_test_set(&fiber->wake_lock, 0, 1) == 0) { if (atomic16_fetch_test_set(&fiber->wake_lock, 0, 1) == 0) {
fibers[num_fibers] = fiber; fibers[num_fibers] = fiber;
++num_fibers; ++num_fibers;
} }
@ -738,7 +739,7 @@ INTERNAL void wake_time(u64 time)
/* Set waiter wake status & build fibers list */ /* Set waiter wake status & build fibers list */
fibers = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_time_list->num_waiters); fibers = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_time_list->num_waiters);
for (struct fiber *fiber = fiber_from_id(wait_time_list->first_waiter); fiber; fiber = fiber_from_id(fiber->next_time_waiter)) { for (struct fiber *fiber = fiber_from_id(wait_time_list->first_waiter); fiber; fiber = fiber_from_id(fiber->next_time_waiter)) {
if (atomic32_fetch_test_set(&fiber->wake_lock, 0, 1) == 0) { if (atomic16_fetch_test_set(&fiber->wake_lock, 0, 1) == 0) {
fibers[num_fibers] = fiber; fibers[num_fibers] = fiber;
++num_fibers; ++num_fibers;
} }
@ -1118,6 +1119,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
job_fiber->job_priority = job_priority; job_fiber->job_priority = job_priority;
job_fiber->job_counter = job_counter; job_fiber->job_counter = job_counter;
job_fiber->yield_param = &yield; job_fiber->yield_param = &yield;
job_fiber->unyielding = 0;
b32 done = 0; b32 done = 0;
while (!done) { while (!done) {
job_fiber_resume(job_fiber); job_fiber_resume(job_fiber);
@ -2089,7 +2091,7 @@ void sys_watch_release(struct sys_watch *dw)
snc_unlock(&lock); snc_unlock(&lock);
} }
struct sys_watch_info_list sys_watch_wait(struct arena *arena, struct sys_watch *dw) struct sys_watch_info_list sys_watch_read_wait(struct arena *arena, struct sys_watch *dw)
{ {
__prof; __prof;
struct win32_watch *w32_watch = (struct win32_watch *)dw; struct win32_watch *w32_watch = (struct win32_watch *)dw;