From 1abf58d45b785cc02742e457d1a266801d7e3bd6 Mon Sep 17 00:00:00 2001 From: jacob Date: Thu, 11 Sep 2025 08:49:06 -0500 Subject: [PATCH] job refactor to move futexes out of platform layer --- build.bat | 6 +- src/app/app.h | 6 +- src/asset_cache/asset_cache.h | 6 +- src/base/base.h | 42 +- src/base/base_arena.h | 12 +- src/base/base_futex.c | 185 +++ src/base/base_futex.h | 67 + src/base/base_inc.h | 2 + src/base/base_job.h | 149 +- src/base/base_resource.c | 2 +- src/base/base_resource.h | 8 +- src/base/base_snc.c | 56 +- src/base/base_snc.h | 40 +- src/base/base_win32/base_win32.c | 58 +- src/base/base_win32/base_win32.h | 9 +- src/base/base_win32/base_win32_job.c | 1440 ++++------------- src/base/base_win32/base_win32_job.h | 266 +-- src/draw/draw.h | 6 +- src/font/font.c | 13 +- src/gpu/gpu_dx12/gpu_dx12.h | 4 +- src/meta/meta.c | 46 +- src/meta/meta_os/meta_os.h | 1 - .../meta_os/meta_os_win32/meta_os_win32.c | 9 - src/mixer/mixer.h | 6 +- src/platform/platform.h | 22 +- src/platform/platform_log.c | 7 +- src/platform/platform_log.h | 9 +- src/platform/platform_win32/platform_win32.c | 112 +- src/platform/platform_win32/platform_win32.h | 15 +- .../playback_wasapi/playback_wasapi.c | 4 +- .../playback_wasapi/playback_wasapi.h | 8 +- src/pp/pp.c | 10 +- src/pp/pp.h | 8 +- src/pp/pp_draw.gpu | 2 +- src/pp/pp_draw.h | 4 +- src/pp/pp_sim.h | 2 +- src/sound/sound.c | 13 +- src/sprite/sprite.c | 4 +- src/sprite/sprite.h | 6 +- src/tar/tar.h | 4 +- src/ttf/ttf_dwrite/ttf_dwrite.c | 3 +- src/ttf/ttf_dwrite/ttf_dwrite.h | 6 +- 42 files changed, 1076 insertions(+), 1602 deletions(-) create mode 100644 src/base/base_futex.c create mode 100644 src/base/base_futex.h diff --git a/build.bat b/build.bat index 6f535612..146a5982 100644 --- a/build.bat +++ b/build.bat @@ -9,6 +9,10 @@ set program_build_cmd=meta.exe %* set meta_build_cmd=cl.exe ../src/meta/meta.c -Od -Z7 -nologo -diagnostics:column -WX -link -DEBUG:FULL -INCREMENTAL:NO set meta_rebuild_code=1317212284 +if 
"%force_meta_build%"=="1" ( + if exist meta.exe del meta.exe +) + ::- Meta build :meta_build if not exist meta.exe ( @@ -22,7 +26,7 @@ if not exist meta.exe ( ) ::- Program build -if not "%nobuild%"=="1" ( +if not "%no_program_build%"=="1" ( echo ======== Build ======== %program_build_cmd% set "rc=!errorlevel!" diff --git a/src/app/app.h b/src/app/app.h index 55527662..3ca1fc54 100644 --- a/src/app/app.h +++ b/src/app/app.h @@ -16,15 +16,13 @@ Struct(AppArgList) }; //////////////////////////////// -//~ State +//~ State types Struct(SharedAppState) { Arena *arena; String write_path; -}; - -extern SharedAppState shared_app_state; +} extern shared_app_state; //////////////////////////////// //~ App functions diff --git a/src/asset_cache/asset_cache.h b/src/asset_cache/asset_cache.h index 0101d22d..721381e9 100644 --- a/src/asset_cache/asset_cache.h +++ b/src/asset_cache/asset_cache.h @@ -38,7 +38,7 @@ Struct(AC_Store) }; //////////////////////////////// -//~ State +//~ State types #define AC_MaxAssets 1024 #define AC_AssetLookupTableCapacity (AC_MaxAssets * 4) @@ -58,9 +58,7 @@ Struct(AC_SharedState) u64 dbg_table_count; Mutex dbg_table_mutex; #endif -}; - -extern AC_SharedState AC_shared_state; +} extern AC_shared_state; //////////////////////////////// //~ Startup diff --git a/src/base/base.h b/src/base/base.h index fb063009..4d4ff62c 100644 --- a/src/base/base.h +++ b/src/base/base.h @@ -95,6 +95,10 @@ # error Unknown architecture # endif +//- Cache line size +/* TODO: Just hard-code to something like 128 or 256 if Apple silicon is ever supported */ +#define CachelineSize 64 + //- Windows NTDDI version /* TODO: Remove this */ #if 0 @@ -139,7 +143,7 @@ #if RtcIsEnabled # if CompilerIsMsvc # define Assert(cond) ((cond) ? 1 : (IsRunningInDebugger() ? (*(volatile i32 *)0 = 0) : Panic(Lit(__FILE__ ":" Stringize(__LINE__) ":0: assertion failed: "#cond"")))) -# define DEBUGBREAK __debugbreak +# define DEBUGBREAK __debugbreak() # else # define Assert(cond) ((cond) ? 
1 : (__builtin_trap(), 0)) # define DEBUGBREAK __builtin_debugtrap() @@ -405,6 +409,7 @@ void __asan_unpoison_memory_region(void const volatile *add, size_t); //- Struct #define Struct(name) typedef struct name name; struct name #define AlignedStruct(name, n) typedef struct name name; struct alignas(n) name +#define AlignedBlock(n) struct alignas(n) //- Enum #define Enum(name) typedef enum name name; enum name @@ -530,14 +535,14 @@ Struct(Atomic32) { volatile i32 _v; }; Struct(Atomic64) { volatile i64 _v; }; //- Cache-line isolated aligned atomic types -AlignedStruct(Atomic8Padded, 64) { Atomic8 v; u8 _pad[63]; }; -AlignedStruct(Atomic16Padded, 64) { Atomic16 v; u8 _pad[62]; }; -AlignedStruct(Atomic32Padded, 64) { Atomic32 v; u8 _pad[60]; }; -AlignedStruct(Atomic64Padded, 64) { Atomic64 v; u8 _pad[56]; }; -StaticAssert(sizeof(Atomic8Padded) == 64 && alignof(Atomic8Padded) == 64); -StaticAssert(sizeof(Atomic16Padded) == 64 && alignof(Atomic16Padded) == 64); -StaticAssert(sizeof(Atomic32Padded) == 64 && alignof(Atomic32Padded) == 64); -StaticAssert(sizeof(Atomic64Padded) == 64 && alignof(Atomic64Padded) == 64); +AlignedStruct(Atomic8Padded, CachelineSize) { Atomic8 v; }; +AlignedStruct(Atomic16Padded, CachelineSize) { Atomic16 v; }; +AlignedStruct(Atomic32Padded, CachelineSize) { Atomic32 v; }; +AlignedStruct(Atomic64Padded, CachelineSize) { Atomic64 v; }; +StaticAssert(alignof(Atomic8Padded) == CachelineSize && sizeof(Atomic8Padded) % CachelineSize == 0); +StaticAssert(alignof(Atomic16Padded) == CachelineSize && sizeof(Atomic16Padded) % CachelineSize == 0); +StaticAssert(alignof(Atomic32Padded) == CachelineSize && sizeof(Atomic32Padded) % CachelineSize == 0); +StaticAssert(alignof(Atomic64Padded) == CachelineSize && sizeof(Atomic64Padded) % CachelineSize == 0); #if PlatformIsWindows && ArchIsX64 //- 8 bit atomic operations @@ -550,8 +555,8 @@ StaticAssert(sizeof(Atomic64Padded) == 64 && alignof(Atomic64Padded) == 64); #define Atomic16Fetch(x) (x)->_v #define 
Atomic16FetchSet(x, e) (i16)_InterlockedExchange16(&(x)->_v, (e)) #define Atomic16FetchTestSet(x, c, e) (i16)_InterlockedCompareExchange16(&(x)->_v, (e), (c)) -#define Atomic16FetchTestXor(x, c) (i16)_InterlockedXor16(&(x)->_v, (c)) -#define Atomic16FetchTestAdd(x, a) (i16)_InterlockedExchangeAdd16(&(x)->_v, (a)) +#define Atomic16FetchXor(x, c) (i16)_InterlockedXor16(&(x)->_v, (c)) +#define Atomic16FetchAdd(x, a) (i16)_InterlockedExchangeAdd16(&(x)->_v, (a)) //- 32 bit atomic operations #define Atomic32Fetch(x) (x)->_v #define Atomic32FetchSet(x, e) (i32)_InterlockedExchange((volatile long *)&(x)->_v, (e)) @@ -576,14 +581,14 @@ StaticAssert(sizeof(Atomic64Padded) == 64 && alignof(Atomic64Padded) == 64); #if LanguageIsC Struct(TicketMutex) { - Atomic64Padded ticket; - Atomic64Padded serving; + Atomic16Padded ticket; + Atomic16Padded serving; }; ForceInline void LockTicketMutex(TicketMutex *tm) { - i64 ticket = Atomic64FetchAdd(&tm->ticket.v, 1); - while (Atomic64Fetch(&tm->serving.v) != ticket) + u16 ticket = Atomic16FetchAdd(&tm->ticket.v, 1); + while (Atomic16Fetch(&tm->serving.v) != ticket) { _mm_pause(); } @@ -591,7 +596,7 @@ ForceInline void LockTicketMutex(TicketMutex *tm) ForceInline void UnlockTicketMutex(TicketMutex *tm) { - Atomic64FetchAdd(&tm->serving.v, 1); + Atomic16FetchAdd(&tm->serving.v, 1); } #endif @@ -703,7 +708,7 @@ Struct(ComputeShader) { Resource resource; }; #if LanguageIsC # if PlatformIsWindows -# define FiberId() (*(volatile i16 *)__readgsqword(32)) +# define FiberId() (*(volatile i16 *)__readgsqword(0x20)) # else # error FiberId not implemented # endif @@ -727,8 +732,11 @@ typedef ExitFuncDef(ExitFunc); //- Core hooks StringList GetCommandLineArgs(void); +void Echo(String msg); b32 Panic(String msg); b32 IsRunningInDebugger(void); +i64 TimeNs(void); +u32 GetNumHardwareThreads(void); void TrueRand(String buffer); void OnExit(ExitFunc *func); void SignalExit(i32 code); diff --git a/src/base/base_arena.h b/src/base/base_arena.h index 
bfbfd9ad..e4cd1c20 100644 --- a/src/base/base_arena.h +++ b/src/base/base_arena.h @@ -1,8 +1,8 @@ //////////////////////////////// //~ Arena types -#define ArenaHeaderSize 64 -#define ArenaBlockSize 16384 +#define ArenaHeaderSize CachelineSize +#define ArenaBlockSize 16384 Struct(Arena) { @@ -25,7 +25,7 @@ Struct(TempArena) }; //////////////////////////////// -//~ State +//~ State types #define ScratchArenasPerCtx 2 @@ -38,9 +38,7 @@ Struct(FiberArenaCtx) Struct(SharedArenaCtx) { FiberArenaCtx arena_contexts[MaxFibers]; -}; - -extern SharedArenaCtx shared_arena_ctx; +} extern shared_arena_ctx; //////////////////////////////// //~ Arena push/pop @@ -125,7 +123,7 @@ void ShrinkArena(Arena *arena); void SetArenaReadonly(Arena *arena); void SetArenaReadWrite(Arena *arena); -Inline void *AlignArena(Arena *arena, u64 align) +Inline void *PushAlign(Arena *arena, u64 align) { Assert(!arena->readonly); if (align > 0) diff --git a/src/base/base_futex.c b/src/base/base_futex.c new file mode 100644 index 00000000..967fba16 --- /dev/null +++ b/src/base/base_futex.c @@ -0,0 +1,185 @@ +SharedFutexState shared_futex_state = ZI; + +//////////////////////////////// +//~ Startup + +void InitFutexSystem(void) +{ + SharedFutexState *g = &shared_futex_state; +} + +//////////////////////////////// +//~ State helpers + +FiberNeqFutexState *FiberNeqFutexStateFromId(i16 fiber_id) +{ + return &shared_futex_state.fiber_neq_states[fiber_id]; +} + +//////////////////////////////// +//~ Not-equal futex operations + +void FutexYieldNeq(volatile void *addr, void *cmp, u8 cmp_size) +{ + SharedFutexState *g = &shared_futex_state; + FutexNeqListBin *bin = &g->neq_bins[RandU64FromSeed((u64)addr) % countof(g->neq_bins)]; + + b32 cancel = 0; + + LockTicketMutex(&bin->tm); + { + /* Now that bin is locked, check if we should cancel insertion based on current value at address */ + { + union + { + volatile void *v; + Atomic8 *v8; + Atomic16 *v16; + Atomic32 *v32; + Atomic64 *v64; + } addr_atomic; + union + 
{ + void *v; + i8 *v8; + i16 *v16; + i32 *v32; + i64 *v64; + } cmp_int; + addr_atomic.v = addr; + cmp_int.v = cmp; + switch(cmp_size) + { + default: Assert(0); cancel = 1; break; /* Invalid futex size */ + case 1: cancel = Atomic8Fetch(addr_atomic.v8) != *cmp_int.v8; break; + case 2: cancel = Atomic16Fetch(addr_atomic.v16) != *cmp_int.v16; break; + case 4: cancel = Atomic32Fetch(addr_atomic.v32) != *cmp_int.v32; break; + case 8: cancel = Atomic64Fetch(addr_atomic.v64) != *cmp_int.v64; break; + } + } + if (!cancel) + { + /* Grab futex list from address */ + FutexNeqList *list = 0; + { + list = bin->first; + for (; list; list = list->next) + { + if (list->addr == addr) break; + } + if (!list) + { + if (bin->first_free) + { + list = bin->first_free; + bin->first_free = list->next; + ZeroStruct(list); + } + else + { + Arena *perm = PermArena(); + PushAlign(perm, CachelineSize); + list = PushStruct(perm, FutexNeqList); + PushAlign(perm, CachelineSize); + list->addr = addr; + } + list->next = bin->first; + bin->first = list; + } + } + + /* Insert */ + { + i16 fiber_id = FiberId(); + FiberNeqFutexState *f = FiberNeqFutexStateFromId(fiber_id); + f->next = list->first; + list->first = fiber_id; + } + } + } + UnlockTicketMutex(&bin->tm); + + /* Suspend */ + if (!cancel) + { + SuspendFiber(); + } +} + +void FutexWakeNeq(void *addr) +{ + SharedFutexState *g = &shared_futex_state; + FutexNeqListBin *bin = &g->neq_bins[RandU64FromSeed((u64)addr) % countof(g->neq_bins)]; + + /* Pull waiting ids */ + i16 first_id = 0; + { + LockTicketMutex(&bin->tm); + { + FutexNeqList *list = bin->first; + for (; list; list = list->next) + { + if (list->addr == addr) break; + } + if (list) + { + first_id = list->first; + /* Free futex list */ + { + FutexNeqList *prev = list->prev; + FutexNeqList *next = list->next; + if (prev) + { + prev->next = next; + } + else + { + bin->first = next; + } + if (next) + { + next->prev = prev; + } + list->next = bin->first_free; + bin->first_free = list; + } + } 
+ } + UnlockTicketMutex(&bin->tm); + } + + /* Resume fibers */ + if (first_id != 0) + { + TempArena scratch = BeginScratchNoConflict(); + i16 *ids = PushDry(scratch.arena, i16); + i16 ids_count = 0; + { + i16 id = first_id; + while (id != 0) + { + FiberNeqFutexState *f = FiberNeqFutexStateFromId(id); + *PushStructNoZero(scratch.arena, i16) = id; + ++ids_count; + id = FiberNeqFutexStateFromId(id)->next; + } + } + ResumeFibers(ids_count, ids); + EndScratch(scratch); + } +} + +//////////////////////////////// +//~ Greater-than-or-equal futex operations + +void FutexYieldGte(volatile void *addr, void *cmp, u8 cmp_size) +{ + /* TODO: Actually implement this. Just emulating via neq for now. */ + FutexYieldNeq(addr, cmp, cmp_size); +} + +void FutexWakeGte(void *addr) +{ + /* TODO: Actually implement this. Just emulating via neq for now. */ + FutexWakeNeq(addr); +} diff --git a/src/base/base_futex.h b/src/base/base_futex.h new file mode 100644 index 00000000..8aa0729c --- /dev/null +++ b/src/base/base_futex.h @@ -0,0 +1,67 @@ +//////////////////////////////// +//~ Neq futex types + +Struct(FutexNeqList) +{ + FutexNeqList *prev; + FutexNeqList *next; + volatile void *addr; + i16 first; +}; + +Struct(FutexNeqListBin) +{ + TicketMutex tm; + FutexNeqList *first; + FutexNeqList *first_free; +}; + +//////////////////////////////// +//~ State types + +#define FutexNeqBinsCount 16384 +#define FutexGteBinsCount 16384 + +AlignedStruct(FiberNeqFutexState, CachelineSize) +{ + i16 next; +}; + +Struct(SharedFutexState) +{ + FiberNeqFutexState fiber_neq_states[MaxFibers]; + FutexNeqListBin neq_bins[FutexNeqBinsCount]; +} extern shared_futex_state; + +//////////////////////////////// +//~ Startup + +void InitFutexSystem(void); + +//////////////////////////////// +//~ State helpers + +FiberNeqFutexState *FiberNeqFutexStateFromId(i16 fiber_id); + +//////////////////////////////// +//~ Not-equal futex operations + +/* Similar to Win32 WaitOnAddress & WakeByAddressAll + * i.e. 
- Spurious wait until value at address != cmp */ 
+void FutexYieldNeq(volatile void *addr, void *cmp, u8 cmp_size); +void FutexWakeNeq(void *addr); + +//////////////////////////////// +//~ Greater-than-or-equal futex operations + +/* Similar to Win32 WaitOnAddress & WakeByAddressAll + * i.e. - Spurious wait until monotonically increasing value at address >= cmp (used for fences) + * + * NOTE: This API is offered for fence-like semantics, where waiters only want to + * wake when the futex progresses past the specified target value, rather than + * wake every time the futex is modified. + **/ + +void FutexYieldGte(volatile void *addr, void *cmp, u8 cmp_size); +void FutexWakeGte(void *addr); diff --git a/src/base/base_inc.h b/src/base/base_inc.h index ad78719d..97e76fbd 100644 --- a/src/base/base_inc.h +++ b/src/base/base_inc.h @@ -8,6 +8,7 @@ # include "base_intrinsics.h" # include "base_memory.h" # include "base_arena.h" +# include "base_futex.h" # include "base_snc.h" # include "base_job.h" # include "base_uid.h" @@ -29,6 +30,7 @@ #if LanguageIsC # include "base_memory.c" # include "base_arena.c" +# include "base_futex.c" # include "base_snc.c" # include "base_uid.c" # include "base_string.c" diff --git a/src/base/base_job.h b/src/base/base_job.h index ca267b3a..463c5961 100644 --- a/src/base/base_job.h +++ b/src/base/base_job.h @@ -1,99 +1,91 @@ //////////////////////////////// -//~ Job queue types +//~ Opaque types -/* Work pools contain their own worker threads with their own thread priorities +Struct(Job); + +//////////////////////////////// +//~ Job pool types + +/* Job pools contain worker threads with their own thread priorities * and affinities based on the intended context of the pool. */ Enum(JobPool) { JobPool_Inherit = -1, - /* Contains critical-priority worker threads affinitized over the entire CPU. + /* Contains worker threads affinitized over the entire CPU. * Meant to take on high-bandwidth temporary work (e.g. loading a level). 
 */ - JobPool_Hyper = 0, + JobPool_Hyper = 0, /* High-priority */ - /* Contains low-priority worker threads affinitized over the entire CPU. - * Meant to take on blocking work that higher-priority workers can continue doing actual work. */ - JobPool_Blocking = 1, + /* Contains worker threads affinitized over the entire CPU. + * Meant to take on blocking work so that affinitized workers can continue doing actual work. */ + JobPool_Blocking = 1, /* Normal priority */ - /* Contains low-priority worker threads affinitized to cores that don't interfere with workers in specialized pools. - * Meant to take on asynchronous work from higher priority pools. */ - JobPool_Background = 2, + /* Contains worker threads affinitized to cores that don't interfere with workers in specialized pools. + * Meant to consume asynchronous work from higher priority pools. */ + JobPool_Background = 2, /* Normal priority */ - /* Contains high-priority worker threads affinitized to cores that don't interfere with workers in other specialized pools. - * These pools are meant to only have work pushed onto them from jobs within the same pool (e.g. pushing 100 rendering - * jobs will not interfere with cores running workers on the Sim pool as long as the jobs are pushed onto the User pool). */ - JobPool_Audio = 3, - JobPool_User = 4, - JobPool_Sim = 5, + /* Contains worker threads affinitized to cores that don't interfere with workers in other specialized pools. + * These pools are meant to only have work pushed onto them from jobs within the same pool (e.g. 100 jobs pushed onto the + * User pool will not interfere with cores running workers on the Sim pool). */ + JobPool_Audio = 3, /* Critical priority */ + JobPool_User = 4, /* Above-normal priority */ + JobPool_Sim = 5, /* Above-normal priority */ JobPool_Count }; -/* Job execution order within a pool is based on priority. 
*/ -Enum(JobPriority) -{ - JobPriority_Inherit = -1, - JobPriority_High = 0, - JobPriority_Normal = 1, - JobPriority_Low = 2, +//////////////////////////////// +//~ Job types - JobPriority_Count +Struct(JobCounter) +{ + Fence jobs_completed_fence; + Atomic64Padded num_jobs_dispatched; +}; + +typedef void JobFunc(void *, i32); + +Struct(Job) +{ + Job *next; + + Arena *arena; + JobFunc *func; + JobPool pool; + i32 count; + JobCounter *counter; + void *sig; + + Atomic64Padded num_tasks_completed; }; //////////////////////////////// //~ @hookdecl Init -void InitJobWorkers(void); +void InitJobSystem(void); //////////////////////////////// -//~ @hookdecl Futex +//~ @hookdecl Fiber suspend/resume operations -/* Futex-like wait & wake */ -void FutexYield(volatile void *addr, void *cmp, u32 size, i64 timeout_ns); -void FutexWake(void *addr, i32 count); +void SuspendFiber(void); +void ResumeFibers(i16 fiber_ids_count, i16 *fiber_ids); //////////////////////////////// -//~ @hookdecl Job operations - -typedef void GenericJobFunc(void *, i32); - -Struct(GenericJobDesc) -{ - Arena *arena; - void *sig; - GenericJobFunc *func; - i32 count; - JobPool pool; - JobPriority priority; - Counter *counter; - - /* Internal */ - Atomic32 num_completed; - GenericJobDesc *next_free; -}; - -Struct(JobDescParams) -{ - i32 count; - JobPool pool; - JobPriority priority; - Counter *counter; -}; +//~ @hookdecl Job declaration operations #define EmptySig { i32 _; } -#define PushJobDesc(job, ...) 
(job##_Desc *)PushJobDesc_(sizeof(job##_Sig), alignof(job##_Sig), job##_Generic, (JobDescParams) { .count = 1, .pool = JobPool_Inherit, .priority = JobPriority_Inherit, .counter = 0, __VA_ARGS__ }) -GenericJobDesc *PushJobDesc_(u64 sig_size, u64 sig_align, GenericJobFunc *func, JobDescParams params); - -#define JobDecl(job, sigdef) \ - typedef struct job##_Sig sigdef job##_Sig; \ - Struct(job##_Desc) { Arena *arena; job##_Sig *sig; GenericJobFunc *func; i32 count; JobPool pool; JobPriority priority; Counter *counter; }; \ - void job(job##_Sig *, i32); \ - inline void job##_Generic(void *sig, i32 id) { job((job##_Sig *)sig, id); } \ +#define JobDecl(job, sigdef) \ + typedef struct job##_Sig sigdef job##_Sig; \ + Struct(job##_Desc) { JobFunc *func; JobPool pool; u32 count; JobCounter *counter; job##_Sig sig; }; \ + void job(job##_Sig *, i32); \ StaticAssert(1) #define JobDef(job, sig_arg, id_arg) void job(job##_Sig *sig_arg, i32 id_arg) +//////////////////////////////// +//~ @hookdecl Job dispatch operations + /* RunJob example usage: * * This example pushes a single 'LoadTextureJob' onto the background job @@ -101,24 +93,23 @@ GenericJobDesc *PushJobDesc_(u64 sig_size, u64 sig_align, GenericJobFunc *func, * and then immediately yielded on in this example, effectively making * the operation synchronous as the caller will block until the job completes: * { - * Counter counter = {0}; - * RunJob(1, LoadTextureJob, JobPool_Background, JobPriority_Low, &counter, .resource = sprite); - * YieldOnCounter(&counter); + * JobCounter counter = {0}; + * RunJob(LoadTextureJob, .pool = JobPool_Background, .counter = &counter, .sig = { .resource = sprite }); + * YieldOnJobs(&counter); * } * */ -#define RunJob(_count, job, _pool, _priority, _counter, ...) 
\ -do { \ - job##_Desc *__job_desc = (job##_Desc *)PushJobDesc_(sizeof(job##_Sig), alignof(job##_Sig), job##_Generic, (JobDescParams) { .count = _count, .pool = _pool, .priority = _priority, .counter = _counter,}); \ - *__job_desc->sig = (job##_Sig) { __VA_ARGS__ }; \ - RunJobEx((GenericJobDesc *)__job_desc); \ +#define RunJob(name, ...) \ +do { \ + name##_Desc __desc = { .count = 1, .pool = JobPool_Inherit, .func = name, __VA_ARGS__ }; \ + Job *__job = OpenJob(__desc.func, __desc.pool); \ + __job->count = __desc.count; \ + __job->counter = __desc.counter; \ + __job->sig = PushStructNoZero(__job->arena, name##_Sig); \ + CopyBytes(__job->sig, &__desc.sig, sizeof(__desc.sig)); \ + CloseJob(__job); \ } while (0) -void RunJobEx(GenericJobDesc *desc); - -//////////////////////////////// -//~ @hookdecl Helpers - -i64 TimeNs(void); -u32 GetLogicalProcessorCount(void); -i64 GetCurrentSchedulerPeriodNs(void); +Job *OpenJob(JobFunc *func, JobPool pool_kind); +void CloseJob(Job *job); +#define YieldOnJobs(counter) (YieldOnFence(&(counter)->jobs_completed_fence, Atomic64Fetch(&(counter)->num_jobs_dispatched.v))) diff --git a/src/base/base_resource.c b/src/base/base_resource.c index 2cf79ef6..d22e6a60 100644 --- a/src/base/base_resource.c +++ b/src/base/base_resource.c @@ -3,7 +3,7 @@ SharedResourceState shared_resource_state = ZI; //////////////////////////////// //~ Startup -void InitBaseResources(u64 archive_strings_count, String *archive_strings) +void InitResourceSystem(u64 archive_strings_count, String *archive_strings) { SharedResourceState *g = &shared_resource_state; Arena *perm = PermArena(); diff --git a/src/base/base_resource.h b/src/base/base_resource.h index 694f7bc0..71ae39db 100644 --- a/src/base/base_resource.h +++ b/src/base/base_resource.h @@ -18,7 +18,7 @@ Struct(ResourceEntryBin) }; //////////////////////////////// -//~ State +//~ State types #define NumResourceEntryBins 4096 @@ -28,14 +28,12 @@ Struct(SharedResourceState) ResourceEntry *last_entry; u64 
entries_count; ResourceEntryBin bins[NumResourceEntryBins]; -}; - -extern SharedResourceState shared_resource_state; +} extern shared_resource_state; //////////////////////////////// //~ Startup -void InitBaseResources(u64 archive_strings_count, String *archive_strings); +void InitResourceSystem(u64 archive_strings_count, String *archive_strings); //////////////////////////////// //~ Resource operations diff --git a/src/base/base_snc.c b/src/base/base_snc.c index f50d898a..228d9ec1 100644 --- a/src/base/base_snc.c +++ b/src/base/base_snc.c @@ -49,7 +49,7 @@ Lock LockSpinE(Mutex *m, i32 spin) } else { - FutexYield(&m->v, &v, 4, I64Max); + FutexYieldNeq(&m->v, &v, 4); spin_cnt = 0; } } @@ -96,7 +96,7 @@ Lock LockSpinS(Mutex *m, i32 spin) } else { - FutexYield(&m->v, &v, 4, I64Max); + FutexYieldNeq(&m->v, &v, 4); spin_cnt = 0; } } @@ -131,7 +131,7 @@ void Unlock(Lock *l) { Atomic32FetchAdd(&m->v, -1); } - FutexWake(&m->v, I32Max); + FutexWakeNeq(&m->v); ZeroStruct(l); } @@ -139,11 +139,6 @@ void Unlock(Lock *l) //~ Condition variable void YieldOnCv(Cv *cv, Lock *l) -{ - YieldOnCvTime(cv, l, I64Max); -} - -void YieldOnCvTime(Cv *cv, Lock *l, i64 timeout_ns) { u64 old_wake_gen = Atomic64Fetch(&cv->wake_gen); Mutex *mutex = l->mutex; @@ -151,7 +146,7 @@ void YieldOnCvTime(Cv *cv, Lock *l, i64 timeout_ns) { Unlock(l); { - FutexYield(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen), timeout_ns); + FutexYieldNeq(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen)); } if (exclusive) { @@ -164,10 +159,10 @@ void YieldOnCvTime(Cv *cv, Lock *l, i64 timeout_ns) } } -void SignalCv(Cv *cv, i32 count) +void SignalCv(Cv *cv) { Atomic64FetchAdd(&cv->wake_gen, 1); - FutexWake(&cv->wake_gen, count); + FutexWakeNeq(&cv->wake_gen); } //////////////////////////////// @@ -184,7 +179,7 @@ void AddCounter(Counter *counter, i64 x) i64 new_v = old_v + x; if (old_v > 0 && new_v <= 0) { - FutexWake(&counter->v, I32Max); + FutexWakeNeq(&counter->v); } } @@ -193,7 +188,42 @@ void 
YieldOnCounter(Counter *counter) i64 v = Atomic64Fetch(&counter->v); while (v > 0) { - FutexYield(&counter->v, &v, sizeof(v), I64Max); + FutexYieldNeq(&counter->v, &v, sizeof(v)); v = Atomic64Fetch(&counter->v); } } + +//////////////////////////////// +//~ Fence + +u64 FetchFence(Fence *fence) +{ + return Atomic64Fetch(&fence->v.v); +} + +u64 FetchSetFence(Fence *fence, u64 x) +{ + u64 fetch = Atomic64FetchSet(&fence->v.v, x); + if (x > fetch) + { + FutexWakeGte(&fence->v.v); + } + return fetch; +} + +u64 FetchAddFence(Fence *fence, u64 x) +{ + u64 fetch = Atomic64FetchAdd(&fence->v.v, x); + FutexWakeGte(&fence->v.v); + return fetch; +} + +void YieldOnFence(Fence *fence, u64 target) +{ + u64 v = Atomic64Fetch(&fence->v.v); + while (v < target) + { + FutexYieldGte(&fence->v.v, &v, sizeof(v)); + v = Atomic64Fetch(&fence->v.v); + } +} diff --git a/src/base/base_snc.h b/src/base/base_snc.h index e4a04c42..240b2e18 100644 --- a/src/base/base_snc.h +++ b/src/base/base_snc.h @@ -3,7 +3,7 @@ #define DefaultMutexSpin 4000 -AlignedStruct(Mutex, 64) +AlignedStruct(Mutex, CachelineSize) { /* Bit 31 = Exclusive lock is held * Bit 30 = Exclusive lock is pending @@ -13,13 +13,9 @@ AlignedStruct(Mutex, 64) #if RtcIsEnabled Atomic32 exclusive_fiber_id; - u8 _pad[56]; -#else - u8 _pad[60]; #endif }; -StaticAssert(sizeof(Mutex) == 64); /* Padding validation */ -StaticAssert(alignof(Mutex) == 64); /* Prevent false sharing */ +StaticAssert(alignof(Mutex) == CachelineSize && sizeof(Mutex) % CachelineSize == 0); Struct(Lock) { @@ -30,24 +26,28 @@ Struct(Lock) //////////////////////////////// //~ Condition variable types -AlignedStruct(Cv, 64) +AlignedStruct(Cv, CachelineSize) { Atomic64 wake_gen; - u8 _pad[56]; }; -StaticAssert(sizeof(Cv) == 64); /* Padding validation */ -StaticAssert(alignof(Cv) == 64); /* Prevent false sharing */ +StaticAssert(alignof(Cv) == CachelineSize && sizeof(Cv) % CachelineSize == 0); //////////////////////////////// //~ Counter types -AlignedStruct(Counter, 64) 
+AlignedStruct(Counter, CachelineSize) { Atomic64 v; - u8 _pad[56]; }; -StaticAssert(sizeof(Counter) == 64); /* Padding validation */ -StaticAssert(alignof(Counter) == 64); /* Prevent false sharing */ +StaticAssert(alignof(Counter) == CachelineSize && sizeof(Counter) % CachelineSize == 0); + +//////////////////////////////// +//~ Fence types + +Struct(Fence) +{ + Atomic64Padded v; +}; //////////////////////////////// //~ Mutex operations @@ -73,8 +73,7 @@ void Unlock(Lock *lock); //~ Condition variable operations void YieldOnCv(Cv *cv, Lock *lock); -void YieldOnCvTime(Cv *cv, Lock *l, i64 timeout_ns); -void SignalCv(Cv *cv, i32 count); +void SignalCv(Cv *cv); //////////////////////////////// //~ Counter operations @@ -82,3 +81,12 @@ void SignalCv(Cv *cv, i32 count); i64 ValueFromCounter(Counter *counter); void AddCounter(Counter *counter, i64 x); void YieldOnCounter(Counter *counter); + +//////////////////////////////// +//~ Fence operations + +u64 FetchFence(Fence *fence); +u64 FetchSetFence(Fence *fence, u64 x); +u64 FetchAddFence(Fence *fence, u64 x); + +void YieldOnFence(Fence *fence, u64 target); diff --git a/src/base/base_win32/base_win32.c b/src/base/base_win32/base_win32.c index 8d3421d0..a5d572e7 100644 --- a/src/base/base_win32/base_win32.c +++ b/src/base/base_win32/base_win32.c @@ -44,6 +44,15 @@ StringList GetCommandLineArgs(void) return result; } +void Echo(String msg) +{ + HANDLE console_handle = GetStdHandle(STD_OUTPUT_HANDLE); + if (console_handle != INVALID_HANDLE_VALUE) + { + WriteFile(console_handle, msg.text, msg.len, 0, 0); + } +} + b32 Panic(String msg) { char msg_cstr[4096]; @@ -73,6 +82,20 @@ b32 IsRunningInDebugger(void) return IsDebuggerPresent(); } +i64 TimeNs(void) +{ + struct W32_SharedState *g = &W32_shared_state; + LARGE_INTEGER qpc; + QueryPerformanceCounter(&qpc); + i64 result = (qpc.QuadPart - g->timer_start_qpc) * g->ns_per_qpc; + return result; +} + +u32 GetNumHardwareThreads(void) +{ + return 
GetActiveProcessorCount(ALL_PROCESSOR_GROUPS); +} + void TrueRand(String buffer) { BCryptGenRandom(BCRYPT_RNG_ALG_HANDLE, (u8 *)buffer.text, buffer.len, 0); @@ -216,20 +239,35 @@ i32 W32_Main(void) g->exit_begin_event = CreateEventW(0, 1, 0, 0); g->exit_end_event = CreateEventW(0, 1, 0, 0); + /* Query performance frequency */ + { + LARGE_INTEGER qpf; + QueryPerformanceFrequency(&qpf); + g->ns_per_qpc = 1000000000 / qpf.QuadPart; + } + { + LARGE_INTEGER qpc; + QueryPerformanceCounter(&qpc); + g->timer_start_qpc = qpc.QuadPart; + } + g->main_thread_id = GetCurrentThreadId(); SetThreadDescription(GetCurrentThread(), L"Main thread"); /* Query system info */ GetSystemInfo(&g->info); - /* Startup workers */ - InitJobWorkers(); + /* Init job system */ + InitJobSystem(); + + /* Init futex system */ + InitFutexSystem(); /* Init resources */ { W32_FindEmbeddedDataCtx ctx = ZI; EnumResourceNamesW(0, RT_RCDATA, &W32_FindEmbeddedRcData, (LONG_PTR)&ctx); - InitBaseResources(ctx.embedded_strings_count, ctx.embedded_strings); + InitResourceSystem(ctx.embedded_strings_count, ctx.embedded_strings); } //- App startup @@ -237,7 +275,7 @@ i32 W32_Main(void) /* Startup layers */ if (!Atomic32Fetch(&g->panicking)) { - RunJob(1, W32_StartupLayersJob, JobPool_Hyper, JobPriority_High, 0, 0); + RunJob(W32_StartupLayersJob); } /* Wait for startup end or panic */ @@ -265,7 +303,7 @@ i32 W32_Main(void) /* Run exit callbacks job */ if (!Atomic32Fetch(&g->panicking)) { - RunJob(1, W32_ShutdownLayersJob, JobPool_Hyper, JobPriority_High, 0, 0); + RunJob(W32_ShutdownLayersJob); } /* Wait for exit end or panic */ @@ -293,19 +331,13 @@ i32 W32_Main(void) #if CrtlibIsEnabled # if IsConsoleApp -int main(char **argc, int argv) +int main(UNUSED char **argc, UNUSED int argv) { - LAX argc; - LAX argv; return W32_Main(); } # else -int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance, _In_ LPWSTR cmdline_wstr, _In_ int show_code) +int CALLBACK wWinMain(UNUSED _In_ HINSTANCE 
instance, UNUSED _In_opt_ HINSTANCE prev_instance, UNUSED _In_ LPWSTR cmdline_wstr, UNUSED _In_ int show_code) { - LAX instance; - LAX prev_instance; - LAX cmdline_wstr; - LAX show_code; return W32_Main(); } # endif /* IsConsoleApp */ diff --git a/src/base/base_win32/base_win32.h b/src/base/base_win32/base_win32.h index 49c06752..b55255f6 100644 --- a/src/base/base_win32/base_win32.h +++ b/src/base/base_win32/base_win32.h @@ -20,7 +20,7 @@ Struct(W32_FindEmbeddedDataCtx) }; //////////////////////////////// -//~ State +//~ State types #define W32_MaxOnExitFuncs 4096 @@ -31,6 +31,9 @@ Struct(W32_SharedState) Atomic32 shutdown; Atomic32 exit_code; + i64 timer_start_qpc; + i64 ns_per_qpc; + //- Application control flow Atomic32 panicking; wchar_t panic_wstr[4096]; @@ -42,9 +45,7 @@ Struct(W32_SharedState) //- Exit funcs Atomic32 num_exit_funcs; ExitFunc *exit_funcs[W32_MaxOnExitFuncs]; -}; - -extern W32_SharedState W32_shared_state; +} extern W32_shared_state; //////////////////////////////// //~ Embedded data initialization diff --git a/src/base/base_win32/base_win32_job.c b/src/base/base_win32/base_win32_job.c index b8363567..777ecff6 100644 --- a/src/base/base_win32/base_win32_job.c +++ b/src/base/base_win32/base_win32_job.c @@ -1,58 +1,23 @@ -W32_SharedJobCtx W32_shared_job_ctx = ZI; +W32_SharedJobState W32_shared_job_state = ZI; //////////////////////////////// //~ @hookdef Startup -void InitJobWorkers(void) +void InitJobSystem(void) { - W32_SharedJobCtx *g = &W32_shared_job_ctx; - - /* Init timer */ - { - LARGE_INTEGER qpf; - QueryPerformanceFrequency(&qpf); - g->ns_per_qpc = 1000000000 / qpf.QuadPart; - } - { - LARGE_INTEGER qpc; - QueryPerformanceCounter(&qpc); - g->timer_start_qpc = qpc.QuadPart; - } - /* Init fibers */ + W32_SharedJobState *g = &W32_shared_job_state; g->num_fibers = 1; /* Fiber at index 0 always nil */ - g->fiber_names_arena = AcquireArena(Gibi(64)); - - /* Init job descs */ - g->job_descs_arena = AcquireArena(Gibi(64)); - - /* Convert main 
thread to fiber */ - W32_AcquireFiber(0); - - /* Init wait lists */ - g->wait_lists_arena = AcquireArena(Gibi(64)); + W32_AcquireFiber(0); /* Convert main thread to fiber */ + Arena *perm = PermArena(); /* Init job pools */ for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind) { W32_JobPool *pool = &g->job_pools[pool_kind]; - - /* Init queues */ - for (JobPriority priority = 0; priority < (i32)countof(pool->job_queues); ++priority) - { - W32_JobQueue *queue = &pool->job_queues[priority]; - queue->arena = AcquireArena(Gibi(64)); - } + /* FIXME */ } - /* Init threads pool */ - g->threads_arena = AcquireArena(Gibi(64)); - - /* Start job scheduler */ - Atomic64FetchSet(&g->current_scheduler_cycle_period_ns.v, W32_DefaultSchedulerPeriodNs); - W32_Thread *scheduler_thread = W32_AcquireThread(W32_JobSchedulerEntryFunc, 0, Lit("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER); - LAX scheduler_thread; - //- Start job workers /* TODO: Heuristic worker counts & affinities */ { @@ -64,8 +29,6 @@ void InitJobWorkers(void) i32 prof_group = PROF_THREAD_GROUP_FIBERS - Mebi(pool_kind); switch (pool_kind) { - default: Assert(0); break; - case JobPool_Sim: { name_fmt = Lit("Sim worker #%F"); @@ -113,16 +76,15 @@ void InitJobWorkers(void) pool->thread_priority = THREAD_PRIORITY_HIGHEST; } break; } - pool->worker_threads_arena = AcquireArena(Gibi(64)); - pool->worker_threads = PushStructs(pool->worker_threads_arena, W32_Thread *, pool->num_worker_threads); - pool->worker_contexts = PushStructs(pool->worker_threads_arena, W32_WorkerCtx, pool->num_worker_threads); + pool->worker_threads = PushStructs(perm, W32_Thread *, pool->num_worker_threads); + pool->worker_contexts = PushStructs(perm, W32_WorkerCtx, pool->num_worker_threads); for (i32 i = 0; i < pool->num_worker_threads; ++i) { W32_WorkerCtx *ctx = &pool->worker_contexts[i]; ctx->pool_kind = pool_kind; ctx->id = i; - String name = FormatString(pool->worker_threads_arena, name_fmt, FmtSint(i)); - 
pool->worker_threads[i] = W32_AcquireThread(W32_JobWorkerEntryFunc, ctx, name, prof_group + i); + String name = FormatString(perm, name_fmt, FmtSint(i)); + pool->worker_threads[i] = W32_StartThread(W32_JobWorkerEntryFunc, ctx, name, prof_group + i); } } } @@ -130,97 +92,31 @@ void InitJobWorkers(void) //OnExit(ShutdownJobs); } -//////////////////////////////// -//~ Shutdown - -#if 0 -void ShutdownJobs(void) -{ - /* Signal shutdown */ - if (!Atomic32Fetch(&g->panicking)) - { - Atomic32FetchSet(&g->shutdown, 1); - for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind) - { - W32_JobPool *pool = &g->job_pools[pool_kind]; - LockTicketMutex(&pool->workers_wake_tm); - { - Atomic32FetchSet(&pool->workers_shutdown.v, 1); - Atomic64FetchSet(&pool->num_jobs_in_queue.v, -100000); - WakeByAddressAll(&pool->num_jobs_in_queue); - } - UnlockTicketMutex(&pool->workers_wake_tm); - } - } - - /* Wait on worker threads */ - if (!Atomic32Fetch(&g->panicking)) - { - for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind) - { - W32_JobPool *pool = &g->job_pools[pool_kind]; - for (i32 i = 0; i < pool->num_worker_threads; ++i) - { - W32_Thread *worker_thread = pool->worker_threads[i]; - W32_WaitReleaseThread(worker_thread); - } - } - } - - /* Wait on scheduler thread */ - if (!Atomic32Fetch(&g->panicking)) - { - W32_WaitReleaseThread(scheduler_thread); - } - - /* Find any dangling threads that haven't exited gracefully by now */ - if (!Atomic32Fetch(&g->panicking)) - { - LockTicketMutex(&g->threads_tm); - if (g->first_thread) - { - TempArena scratch = BeginScratchNoConflict(); - u64 num_dangling_threads = 0; - String threads_msg = ZI; - threads_msg.text = PushDry(scratch.arena, u8); - for (W32_Thread *t = g->first_thread; t; t = t->next) - { - String name = StringFromCstr(t->thread_name_cstr, countof(t->thread_name_cstr)); - threads_msg.len += StringF(scratch.arena, " \"%F\"\n", FmtString(name)).len; - ++num_dangling_threads; - } - 
threads_msg = FormatString(scratch.arena, Lit("%F dangling thread(s):\n%F"), FmtUint(num_dangling_threads), FmtString(threads_msg)); - //Panic(threads_msg); - EndScratch(scratch); - } - UnlockTicketMutex(&g->threads_tm); - } -} -#endif - //////////////////////////////// //~ Win32 thread DWORD WINAPI W32_Win32ThreadProc(LPVOID vt) { + /* Convert thread to fiber */ W32_AcquireFiber(0); + Arena *perm = PermArena(); W32_Thread *t = (W32_Thread *)vt; - __profthread(t->thread_name_cstr, t->profiler_group); + char *thread_name_cstr = CstrFromString(perm, t->thread_name); + wchar_t *thread_name_wstr = WstrFromString(perm, t->thread_name); + + __profthread(thread_name_cstr, t->profiler_group); /* Initialize COM */ CoInitializeEx(0, COINIT_MULTITHREADED); /* Set thread name */ - if (t->thread_name_wstr[0] != 0) - { - SetThreadDescription(GetCurrentThread(), t->thread_name_wstr); - } + SetThreadDescription(GetCurrentThread(), thread_name_wstr); //P_LogInfoF("New thread \"%F\" created with ID %F", FmtString(StringFromCstrNoLimit(t->thread_name_cstr)), FmtUint(ThreadId())); /* Enter thread entry point */ - t->entry_point(t->thread_data); + t->entry_point(t->thread_udata); /* Uninitialize COM */ CoUninitialize(); @@ -228,58 +124,21 @@ DWORD WINAPI W32_Win32ThreadProc(LPVOID vt) return 0; } -W32_Thread *W32_AcquireThread(W32_ThreadFunc *entry_point, void *thread_data, String thread_name, i32 profiler_group) +W32_Thread *W32_StartThread(W32_ThreadFunc *entry_point, void *thread_udata, String thread_name, i32 profiler_group) { __prof; + W32_SharedJobState *g = &W32_shared_job_state; TempArena scratch = BeginScratchNoConflict(); - W32_SharedJobCtx *g = &W32_shared_job_ctx; + Arena *perm = PermArena(); Assert(entry_point != 0); //P_LogInfoF("Creating thread \"%F\"", FmtString(thread_name)); - /* Acquire thread object */ - W32_Thread *t = 0; - { - LockTicketMutex(&g->threads_tm); - if (g->first_free_thread) - { - t = g->first_free_thread; - g->first_free_thread = t->next; - } - else - 
{ - t = PushStructNoZero(g->threads_arena, W32_Thread); - } - ZeroStruct(t); - if (g->last_thread) - { - g->last_thread->next = t; - t->prev = g->last_thread; - } - else - { - g->first_thread = t; - } - g->last_thread = t; - UnlockTicketMutex(&g->threads_tm); - } - + W32_Thread *t = PushStruct(perm, W32_Thread); + t->thread_name = PushString(perm, thread_name); t->entry_point = entry_point; - t->thread_data = thread_data; + t->thread_udata = thread_udata; t->profiler_group = profiler_group; - /* Copy thread name to params */ - { - u64 CstrLen = MinU64((countof(t->thread_name_cstr) - 1), thread_name.len); - CopyBytes(t->thread_name_cstr, thread_name.text, CstrLen * sizeof(*t->thread_name_cstr)); - t->thread_name_cstr[CstrLen] = 0; - } - { - String16 thread_name16 = String16FromString(scratch.arena, thread_name); - u64 WstrLen = MinU64((countof(t->thread_name_wstr) - 1), thread_name16.len); - CopyBytes(t->thread_name_wstr, thread_name16.text, WstrLen * sizeof(*t->thread_name_wstr)); - t->thread_name_wstr[WstrLen] = 0; - } - t->handle = CreateThread( 0, W32_ThreadStackSize, @@ -291,7 +150,7 @@ W32_Thread *W32_AcquireThread(W32_ThreadFunc *entry_point, void *thread_data, St if (!t->handle) { - //Panic(Lit("Failed to create thread")); + Panic(Lit("Failed to create thread")); } EndScratch(scratch); @@ -299,10 +158,10 @@ W32_Thread *W32_AcquireThread(W32_ThreadFunc *entry_point, void *thread_data, St } /* Returns 0 if the thread could not release in specified timeout (e.g. 
because it is still running) */ -b32 W32_TryReleaseThread(W32_Thread *thread, f32 timeout_seconds) +b32 W32_TryEndThread(W32_Thread *thread, f32 timeout_seconds) { __prof; - W32_SharedJobCtx *g = &W32_shared_job_ctx; + W32_SharedJobState *g = &W32_shared_job_state; b32 success = 0; W32_Thread *t = (W32_Thread *)thread; HANDLE handle = t->handle; @@ -316,390 +175,29 @@ b32 W32_TryReleaseThread(W32_Thread *thread, f32 timeout_seconds) /* Release thread */ success = 1; CloseHandle(handle); - { - LockTicketMutex(&g->threads_tm); - { - W32_Thread *prev = t->prev; - W32_Thread *next = t->next; - if (prev) - { - prev->next = next; - } - else - { - g->first_thread = next; - } - if (next) - { - next->prev = prev; - } - else - { - g->last_thread = prev; - } - t->next = g->first_free_thread; - g->first_free_thread = t; - } - UnlockTicketMutex(&g->threads_tm); - } } } return success; } -void W32_WaitReleaseThread(W32_Thread *thread) +void W32_WaitEndThread(W32_Thread *thread) { __prof; - b32 success = W32_TryReleaseThread(thread, F32Infinity); + b32 success = W32_TryEndThread(thread, F32Infinity); Assert(success); - LAX success; } //////////////////////////////// -//~ Win32 wait list +//~ Pool operations -W32_WaitBin *W32_WaitBinFromAddr(u64 addr) +W32_JobPool *W32_JobPoolFromKind(JobPool pool_kind) { - W32_SharedJobCtx *g = &W32_shared_job_ctx; - u64 mixed = RandU64FromSeed(addr); - return &g->wait_addr_bins[mixed % W32_NumWaitAddrBins]; -} - -W32_WaitBin *W32_WaitBinFromTime(u64 time) -{ - W32_SharedJobCtx *g = &W32_shared_job_ctx; - u64 mixed = RandU64FromSeed(time); - return &g->wait_addr_bins[mixed % W32_NumWaitTimeBins]; -} - -/* REQUIRED: Caller must have acquired `wake_lock` for each fiber in array */ -void W32_WakeLockedFibers(i32 num_fibers, W32_Fiber **fibers) -{ - W32_SharedJobCtx *g = &W32_shared_job_ctx; - - /* Update wait lists */ - for (i32 i = 0; i < num_fibers; ++i) + if (pool_kind == JobPool_Inherit) { - W32_Fiber *fiber = fibers[i]; - u64 wait_addr = 
fiber->wait_addr; - u64 wait_time = fiber->wait_time; - - /* Lock & search wait bins */ - /* TODO: Cache these in parameters since caller has one of them already calculated */ - W32_WaitBin *wait_addr_bin = 0; - W32_WaitBin *wait_time_bin = 0; - W32_WaitList *wait_addr_list = 0; - W32_WaitList *wait_time_list = 0; - if (wait_addr != 0) - { - wait_addr_bin = W32_WaitBinFromAddr(wait_addr); - LockTicketMutex(&wait_addr_bin->lock); - for (W32_WaitList *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) - { - if (tmp->value == (u64)wait_addr) - { - wait_addr_list = tmp; - } - } - } - if (wait_time != 0) - { - wait_time_bin = W32_WaitBinFromTime(wait_time); - LockTicketMutex(&wait_time_bin->lock); - for (W32_WaitList *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) - { - if (tmp->value == (u64)wait_time) - { - wait_time_list = tmp; - } - } - } - { - /* Remove from addr list */ - if (wait_addr_list) - { - if (--wait_addr_list->num_waiters == 0) - { - /* Free addr list */ - W32_WaitList *prev = wait_addr_list->prev_in_bin; - W32_WaitList *next = wait_addr_list->next_in_bin; - if (prev) - { - prev->next_in_bin = next; - } - else - { - wait_addr_bin->first_wait_list = next; - } - if (next) - { - next->prev_in_bin = prev; - } - else - { - wait_addr_bin->last_wait_list = prev; - } - wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list; - wait_addr_bin->first_free_wait_list = wait_addr_list; - } - else - { - i16 prev_id = fiber->prev_addr_waiter; - i16 next_id = fiber->next_addr_waiter; - if (prev_id) - { - W32_FiberFromId(prev_id)->next_addr_waiter = next_id; - } - else - { - wait_addr_list->first_waiter = next_id; - } - if (next_id) - { - W32_FiberFromId(next_id)->prev_addr_waiter = prev_id; - } - else - { - wait_addr_list->last_waiter = prev_id; - } - } - fiber->wait_addr = 0; - fiber->prev_addr_waiter = 0; - fiber->next_addr_waiter = 0; - } - /* Remove from time list */ - if 
(wait_time_list) - { - if (--wait_time_list->num_waiters == 0) - { - /* Free time list */ - W32_WaitList *prev = wait_time_list->prev_in_bin; - W32_WaitList *next = wait_time_list->next_in_bin; - if (prev) - { - prev->next_in_bin = next; - } - else - { - wait_time_bin->first_wait_list = next; - } - if (next) - { - next->prev_in_bin = prev; - } - else - { - wait_time_bin->last_wait_list = prev; - } - wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list; - wait_time_bin->first_free_wait_list = wait_time_list; - } - else - { - i16 prev_id = fiber->prev_time_waiter; - i16 next_id = fiber->next_time_waiter; - if (prev_id) - { - W32_FiberFromId(prev_id)->next_time_waiter = next_id; - } - else - { - wait_time_list->first_waiter = next_id; - } - if (next_id) - { - W32_FiberFromId(next_id)->prev_time_waiter = prev_id; - } - else - { - wait_time_list->last_waiter = prev_id; - } - } - fiber->wait_time = 0; - fiber->prev_time_waiter = 0; - fiber->next_time_waiter = 0; - } - /* Unlock fiber */ - Atomic32FetchSet(&fiber->wake_lock, 0); - } - /* Unlock wait bins */ - if (wait_time_bin != 0) UnlockTicketMutex(&wait_time_bin->lock); - if (wait_addr_bin != 0) UnlockTicketMutex(&wait_addr_bin->lock); + W32_Fiber *fiber = W32_FiberFromId(FiberId()); + pool_kind = fiber->pool; } - - /* Resume jobs */ - /* TODO: Batch submit waiters based on queue kind rather than one at a time */ - i32 job_counts_per_pool[JobPool_Count] = ZI; - for (i32 i = 0; i < num_fibers; ++i) - { - W32_Fiber *fiber = fibers[i]; - JobPool pool_kind = fiber->job_pool; - ++job_counts_per_pool[pool_kind]; - W32_JobPool *pool = &g->job_pools[pool_kind]; - W32_JobQueue *queue = &pool->job_queues[fiber->job_priority]; - LockTicketMutex(&queue->lock); - { - W32_JobInfo *info = 0; - if (queue->first_free) - { - info = queue->first_free; - queue->first_free = info->next; - } - else - { - info = PushStructNoZero(queue->arena, W32_JobInfo); - } - ZeroStruct(info); - info->count = 1; - info->num_dispatched = 
fiber->job_id; - info->fiber_id = fiber->id; - info->desc = fiber->job_desc; - if (queue->first) - { - info->next = queue->first; - } - else - { - queue->last = info; - } - queue->first = info; - } - UnlockTicketMutex(&queue->lock); - } - - /* Wake workers */ - if (num_fibers > 0) - { - for (JobPool pool_kind = 0; pool_kind < (i32)countof(job_counts_per_pool); ++pool_kind) - { - i32 job_count = job_counts_per_pool[pool_kind]; - if (job_count > 0) - { - W32_JobPool *pool = &g->job_pools[pool_kind]; - LockTicketMutex(&pool->workers_wake_tm); - { - Atomic64FetchAdd(&pool->num_jobs_in_queue.v, job_count); - if (job_count >= W32_WakeAllThreshold) - { - WakeByAddressAll(&pool->num_jobs_in_queue); - } - else - { - for (i32 i = 0; i < job_count; ++i) - { - WakeByAddressSingle(&pool->num_jobs_in_queue); - } - } - } - UnlockTicketMutex(&pool->workers_wake_tm); - } - } - } -} - -void W32_WakeByAddress(void *addr, i32 count) -{ - TempArena scratch = BeginScratchNoConflict(); - - W32_WaitBin *wait_addr_bin = W32_WaitBinFromAddr((u64)addr); - W32_WaitList *wait_addr_list = 0; - - /* Get list of waiting fibers */ - i32 num_fibers = 0; - W32_Fiber **fibers = 0; - { - LockTicketMutex(&wait_addr_bin->lock); - { - /* Search for wait addr list */ - for (W32_WaitList *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) - { - if (tmp->value == (u64)addr) - { - wait_addr_list = tmp; - } - } - - /* Lock fibers & build array */ - if (wait_addr_list) - { - fibers = PushStructsNoZero(scratch.arena, W32_Fiber *, wait_addr_list->num_waiters); - for (W32_Fiber *fiber = W32_FiberFromId(wait_addr_list->first_waiter); fiber && num_fibers < count; fiber = W32_FiberFromId(fiber->next_addr_waiter)) - { - if (Atomic32FetchTestSet(&fiber->wake_lock, 0, 1) == 0) - { - fibers[num_fibers] = fiber; - ++num_fibers; - } - } - } - } - UnlockTicketMutex(&wait_addr_bin->lock); - } - - if (num_fibers > 0) - { - W32_WakeLockedFibers(num_fibers, fibers); - } - - /* Wake win32 
blocking thread waiters */ - if (count >= W32_WakeAllThreshold) - { - WakeByAddressAll(addr); - } - else - { - for (i32 i = 0; i < count; ++i) - { - WakeByAddressSingle(addr); - } - } - - EndScratch(scratch); -} - -void W32_WakeByTime(u64 time) -{ - TempArena scratch = BeginScratchNoConflict(); - - W32_WaitBin *wait_time_bin = W32_WaitBinFromTime(time); - W32_WaitList *wait_time_list = 0; - - /* Build list of waiters to resume */ - i32 num_fibers = 0; - W32_Fiber **fibers = 0; - { - LockTicketMutex(&wait_time_bin->lock); - { - /* Search for wait time list */ - for (W32_WaitList *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) - { - if (tmp->value == (u64)time) - { - wait_time_list = tmp; - } - } - - if (wait_time_list) - { - /* Set waiter wake status & build fibers list */ - fibers = PushStructsNoZero(scratch.arena, W32_Fiber *, wait_time_list->num_waiters); - for (W32_Fiber *fiber = W32_FiberFromId(wait_time_list->first_waiter); fiber; fiber = W32_FiberFromId(fiber->next_time_waiter)) - { - if (Atomic32FetchTestSet(&fiber->wake_lock, 0, 1) == 0) - { - fibers[num_fibers] = fiber; - ++num_fibers; - } - - } - } - } - UnlockTicketMutex(&wait_time_bin->lock); - } - - W32_WakeLockedFibers(num_fibers, fibers); - - EndScratch(scratch); + return &W32_shared_job_state.job_pools[pool_kind]; } //////////////////////////////// @@ -709,7 +207,7 @@ void W32_WakeByTime(u64 time) /* If `pool` is 0, then the currently running thread will be converted into a fiber */ W32_Fiber *W32_AcquireFiber(W32_JobPool *pool) { - W32_SharedJobCtx *g = &W32_shared_job_ctx; + W32_SharedJobState *g = &W32_shared_job_state; i16 fiber_id = 0; W32_Fiber *fiber = 0; char *new_name_cstr = 0; @@ -733,9 +231,13 @@ W32_Fiber *W32_AcquireFiber(W32_JobPool *pool) fiber_id = g->num_fibers++; if (fiber_id >= MaxFibers) { - //Panic(Lit("Max fibers reached")); + Panic(Lit("Max fibers reached")); } fiber = &g->fibers[fiber_id]; + if (!g->fiber_names_arena) + { + 
g->fiber_names_arena = AcquireArena(Gibi(64)); + } new_name_cstr = PushStructs(g->fiber_names_arena, char, W32_FiberNameMaxSize); } } @@ -787,6 +289,7 @@ W32_Fiber *W32_AcquireFiber(W32_JobPool *pool) { __profn("CreateFiber"); fiber->addr = CreateFiber(W32_FiberStackSize, W32_FiberEntryPoint, (void *)(i64)fiber_id); + fiber->pool = pool->kind; } else { @@ -795,16 +298,7 @@ W32_Fiber *W32_AcquireFiber(W32_JobPool *pool) fiber->addr = ConvertThreadToFiber((void *)(i64)fiber_id); } } - fiber->wait_addr = 0; - fiber->wait_time = 0; - fiber->prev_addr_waiter = 0; - fiber->next_addr_waiter = 0; - fiber->prev_time_waiter = 0; - fiber->next_time_waiter = 0; - fiber->job_id = 0; - fiber->job_pool = 0; - fiber->job_priority = 0; - fiber->yield_param = 0; + fiber->task = 0; fiber->parent_id = 0; return fiber; } @@ -825,84 +319,63 @@ void W32_ReleaseFiber(W32_JobPool *pool, W32_Fiber *fiber) //- Fiber id ForceInline W32_Fiber *W32_FiberFromId(i16 id) { - W32_SharedJobCtx *g = &W32_shared_job_ctx; - if (id <= 0) - { - return 0; - } - else - { - return &g->fibers[id]; - } + return id > 0 ? 
&W32_shared_job_state.fibers[id] : 0; } -//- Fiber control flow - -void W32_FiberResume(W32_Fiber *fiber) +void W32_SwitchToFiber(W32_Fiber *fiber) { SwitchToFiber(fiber->addr); } -void W32_YieldFiber(W32_Fiber *fiber, W32_Fiber *parent_fiber) -{ - LAX fiber; - Assert(fiber->id == FiberId()); - Assert(parent_fiber->id == fiber->parent_id); - Assert(parent_fiber->id > 0); - { - __prof_fiber_leave(); - W32_FiberResume(parent_fiber); - __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(fiber->job_pool) + Kibi(1) + fiber->id); - } -} - //////////////////////////////// //~ Win32 fiber entry -void W32_FiberEntryPoint(void *id_ptr) +void W32_FiberEntryPoint(void *win32_fiber_data) { - W32_SharedJobCtx *g = &W32_shared_job_ctx; - i16 id = (i32)(i64)id_ptr; - volatile W32_Fiber *fiber = W32_FiberFromId(id); - __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(fiber->job_pool) + Kibi(1) + fiber->id); + i16 fiber_id = (i16)(i64)win32_fiber_data; + volatile W32_Fiber *fiber = W32_FiberFromId(fiber_id); + W32_JobPool *pool = W32_JobPoolFromKind(fiber->pool); + JobPool pool_kind = fiber->pool; + char *fiber_name_cstr = fiber->name_cstr; for (;;) { - //- Run job + __prof_fiber_enter(fiber_name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(pool_kind) + Kibi(1) + fiber->id); + W32_Task *task = fiber->task; + Job *job = task->job; + + /* Run task */ + job->func(job->sig, task->task_id); + + /* Check if we've completed the last task in the job */ + if (Atomic32FetchAdd(&job->num_tasks_completed.v, 1) + 1 >= job->count) { - W32_YieldParam *yield_param = fiber->yield_param; - yield_param->kind = W32_YieldKind_None; + /* Increment fence */ + if (job->counter) { - MemoryBarrier(); - GenericJobDesc *job_desc = fiber->job_desc; - GenericJobFunc *job_func = job_desc->func; - job_func(job_desc->sig, fiber->job_id); - MemoryBarrier(); + FetchAddFence(&job->counter->jobs_completed_fence, 1); } + /* Free job */ + LockTicketMutex(&pool->free_jobs_tm); + { + 
StackPush(pool->first_free_job, job); + } + UnlockTicketMutex(&pool->free_jobs_tm); } - //- Job completed + + /* Free task */ { - GenericJobDesc *job_desc = fiber->job_desc; - /* Check if we're the last completed dispatch in the job */ - if ((Atomic32FetchAdd(&job_desc->num_completed, 1) + 1) >= job_desc->count) + LockTicketMutex(&pool->free_tasks_tm); { - /* Decrement counter */ - if (job_desc->counter) - { - AddCounter(job_desc->counter, -1); - } - /* Release job desc */ - ResetArena(job_desc->arena); - LockTicketMutex(&g->job_descs_tm); - { - job_desc->next_free = g->first_free_job_desc; - g->first_free_job_desc = job_desc; - } - UnlockTicketMutex(&g->job_descs_tm); + StackPush(pool->first_free_task, task); } - /* Yield to worker */ - fiber->yield_param->kind = W32_YieldKind_Done; + UnlockTicketMutex(&pool->free_tasks_tm); + } + + /* Yield to worker */ + { + __prof_fiber_leave(); W32_Fiber *parent_fiber = W32_FiberFromId(fiber->parent_id); - W32_YieldFiber((W32_Fiber *)fiber, parent_fiber); + W32_SwitchToFiber(parent_fiber); } } } @@ -912,11 +385,11 @@ void W32_FiberEntryPoint(void *id_ptr) W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg) { - W32_SharedJobCtx *g = &W32_shared_job_ctx; W32_WorkerCtx *ctx = worker_ctx_arg; JobPool pool_kind = ctx->pool_kind; - W32_JobPool *pool = &g->job_pools[pool_kind]; - LAX ctx; + W32_JobPool *pool = W32_JobPoolFromKind(pool_kind); + i16 worker_fiber_id = FiberId(); + { /* TODO: Heuristic pinning */ @@ -928,7 +401,6 @@ W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg) __profn("Set priority"); b32 success = SetThreadPriority(thread_handle, pool->thread_priority) != 0; Assert(success); - LAX success; } #if 0 @@ -950,7 +422,6 @@ W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg) } #endif Assert(success); - LAX success; } #endif @@ -961,571 +432,286 @@ W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg) DWORD task = 0; HANDLE mmc_handle = AvSetMmThreadCharacteristics(L"Pro Audio", &task); Assert(mmc_handle); - LAX 
mmc_handle; } } - i32 worker_fiber_id = FiberId(); - - W32_Fiber *job_fiber = 0; - b32 shutdown = 0; - while (!shutdown) + W32_Fiber *task_fiber = 0; + i64 tasks_count = 0; + while (tasks_count >= 0) { - //- Pull job from queue - GenericJobDesc *job_desc = 0; - JobPriority job_priority = 0; - i16 job_fiber_id = 0; - i32 job_id = 0; + W32_Task *task = 0; { - //__profnc("Pull job", Rgb32F(0.75, 0.75, 0)); - for (JobPriority priority = 0; priority < (i32)countof(pool->job_queues) && !job_desc; ++priority) + LockTicketMutex(&pool->tasks_tm); { - W32_JobQueue *queue = &pool->job_queues[priority]; - if (queue) + task = pool->first_task; + if (task) { - LockTicketMutex(&queue->lock); - { - W32_JobInfo *info = queue->first; - while (info && !job_desc) - { - W32_JobInfo *next = info->next; - b32 dequeue = 0; - if (info->fiber_id <= 0) - { - job_id = info->num_dispatched++; - if (job_id < info->count) - { - /* Pick job */ - Atomic64FetchAdd(&pool->num_jobs_in_queue.v, -1); - job_priority = priority; - job_desc = info->desc; - if (job_id == (info->count - 1)) - { - /* We're picking up the last dispatch, so dequeue the job */ - dequeue = 1; - } - } - } - else - { - /* This job is to be resumed from a yield */ - Atomic64FetchAdd(&pool->num_jobs_in_queue.v, -1); - job_fiber_id = info->fiber_id; - job_priority = priority; - job_id = info->num_dispatched; - job_desc = info->desc; - dequeue = 1; - } - if (dequeue) - { - if (!next) - { - queue->last = 0; - } - queue->first = next; - info->next = queue->first_free; - queue->first_free = info; - } - info = next; - } - } - UnlockTicketMutex(&queue->lock); + QueuePop(pool->first_task, pool->last_task); + Atomic64FetchAdd(&pool->tasks_count.v, -1); } } + UnlockTicketMutex(&pool->tasks_tm); + } + + if (task) + { + if (task->fiber_id != 0) + { + /* Task is resuming with an existing fiber */ + if (task_fiber) + { + W32_ReleaseFiber(pool, task_fiber); + } + task_fiber = W32_FiberFromId(task->fiber_id); + } + else + { + /* Task is unstarted, 
wrap it in a fiber */ + if (!task_fiber) + { + task_fiber = W32_AcquireFiber(pool); + } + task_fiber->task = task; + task->fiber_id = task_fiber->id; + } + + /* Run task fiber */ + task_fiber->parent_id = worker_fiber_id; + W32_SwitchToFiber(task_fiber); + + if (Atomic8FetchTestSet(&task_fiber->is_suspending, 1, 0) == 1) + { + /* Fiber suspended during execution */ + task_fiber = 0; + } } - //- Release old fiber if resuming a yielded fiber - if (job_fiber_id > 0) + /* Park worker */ + tasks_count = Atomic64Fetch(&pool->tasks_count.v); + while (tasks_count == 0) { - if (job_fiber) - { - W32_ReleaseFiber(pool, job_fiber); - } - job_fiber = W32_FiberFromId(job_fiber_id); - } - - //- Run fiber - if (job_desc) - { - if (!job_fiber) - { - job_fiber = W32_AcquireFiber(pool); - } - job_fiber_id = job_fiber->id; - { - __profnc("Run fiber", Rgb32F(1, 1, 1)); - __profvalue(job_fiber->id); - W32_YieldParam yield = ZI; - job_fiber->parent_id = worker_fiber_id; - job_fiber->job_desc = job_desc; - job_fiber->job_id = job_id; - job_fiber->job_pool = pool_kind; - job_fiber->job_priority = job_priority; - job_fiber->yield_param = &yield; - b32 done = 0; - while (!done) - { - W32_FiberResume(job_fiber); - switch (yield.kind) - { - default: - { - /* Invalid yield kind */ - TempArena scratch = BeginScratchNoConflict(); - //Panic(FormatString(scratch.arena, Lit("Invalid fiber yield kind \"%F\""), FmtSint(yield.kind))); - EndScratch(scratch); - } break; - - //- Fiber is waiting - case W32_YieldKind_Wait: - { - __profn("Process fiber wait"); - volatile void *wait_addr = yield.wait.addr; - void *wait_cmp = yield.wait.cmp; - u32 wait_size = yield.wait.size; - i64 wait_timeout_ns = yield.wait.timeout_ns; - i64 wait_time = 0; - if (wait_timeout_ns > 0 && wait_timeout_ns < I64Max) - { - u64 current_scheduler_cycle = Atomic64Fetch(&g->current_scheduler_cycle.v); - i64 current_scheduler_cycle_period_ns = Atomic64Fetch(&g->current_scheduler_cycle_period_ns.v); - wait_time = 
current_scheduler_cycle + MaxI64((i64)((f64)wait_timeout_ns / (f64)current_scheduler_cycle_period_ns), 1); - } - - W32_WaitBin *wait_addr_bin = W32_WaitBinFromAddr((u64)wait_addr); - W32_WaitBin *wait_time_bin = W32_WaitBinFromTime((u64)wait_time); - - if (wait_addr != 0) LockTicketMutex(&wait_addr_bin->lock); - { - if (wait_time != 0) LockTicketMutex(&wait_time_bin->lock); - { - //- Load and compare value at address now that wait bins are locked - b32 cancel_wait = wait_addr == 0 && wait_time == 0; - if (wait_addr != 0) - { - switch (wait_size) - { - default: cancel_wait = 1; Assert(0); break; /* Invalid wait size */ - case 1: cancel_wait = (u8)_InterlockedCompareExchange8(wait_addr, 0, 0) != *(u8 *)wait_cmp; break; - case 2: cancel_wait = (u16)_InterlockedCompareExchange16(wait_addr, 0, 0) != *(u16 *)wait_cmp; break; - case 4: cancel_wait = (u32)_InterlockedCompareExchange(wait_addr, 0, 0) != *(u32 *)wait_cmp; break; - case 8: cancel_wait = (u64)_InterlockedCompareExchange64(wait_addr, 0, 0) != *(u64 *)wait_cmp; break; - } - } - if (wait_time != 0 && !cancel_wait) - { - cancel_wait = wait_time <= Atomic64Fetch(&g->current_scheduler_cycle.v); - } - if (!cancel_wait) - { - if (wait_addr != 0) - { - //- Search for wait addr list in bin - W32_WaitList *wait_addr_list = 0; - for (W32_WaitList *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) - { - if (tmp->value == (u64)wait_addr) - { - wait_addr_list = tmp; - } - } - //- Acquire new wait addr list - if (!wait_addr_list) - { - if (wait_addr_bin->first_free_wait_list) - { - wait_addr_list = wait_addr_bin->first_free_wait_list; - wait_addr_bin->first_free_wait_list = wait_addr_list->next_in_bin; - } - else - { - LockTicketMutex(&g->wait_lists_arena_tm); - { - wait_addr_list = PushStructNoZero(g->wait_lists_arena, W32_WaitList); - } - UnlockTicketMutex(&g->wait_lists_arena_tm); - } - ZeroStruct(wait_addr_list); - wait_addr_list->value = (u64)wait_addr; - if 
(wait_addr_bin->last_wait_list) - { - wait_addr_bin->last_wait_list->next_in_bin = wait_addr_list; - wait_addr_list->prev_in_bin = wait_addr_bin->last_wait_list; - } - else - { - wait_addr_bin->first_wait_list = wait_addr_list; - } - wait_addr_bin->last_wait_list = wait_addr_list; - } - //- Insert fiber into wait addr list - job_fiber->wait_addr = (u64)wait_addr; - if (wait_addr_list->last_waiter) - { - W32_FiberFromId(wait_addr_list->last_waiter)->next_addr_waiter = job_fiber_id; - job_fiber->prev_addr_waiter = wait_addr_list->last_waiter; - } - else - { - wait_addr_list->first_waiter = job_fiber_id; - } - wait_addr_list->last_waiter = job_fiber_id; - ++wait_addr_list->num_waiters; - } - if (wait_time != 0) - { - //- Search for wait time list in bin - W32_WaitList *wait_time_list = 0; - for (W32_WaitList *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) - { - if (tmp->value == (u64)wait_time) - { - wait_time_list = tmp; - } - } - //- Acquire new wait time list - if (!wait_time_list) - { - if (wait_time_bin->first_free_wait_list) - { - wait_time_list = wait_time_bin->first_free_wait_list; - wait_time_bin->first_free_wait_list = wait_time_list->next_in_bin; - } - else - { - LockTicketMutex(&g->wait_lists_arena_tm); - { - wait_time_list = PushStructNoZero(g->wait_lists_arena, W32_WaitList); - } - UnlockTicketMutex(&g->wait_lists_arena_tm); - } - ZeroStruct(wait_time_list); - wait_time_list->value = wait_time; - if (wait_time_bin->last_wait_list) - { - wait_time_bin->last_wait_list->next_in_bin = wait_time_list; - wait_time_list->prev_in_bin = wait_time_bin->last_wait_list; - } - else - { - wait_time_bin->first_wait_list = wait_time_list; - } - wait_time_bin->last_wait_list = wait_time_list; - } - //- Insert fiber into wait time list - job_fiber->wait_time = wait_time; - if (wait_time_list->last_waiter) - { - W32_FiberFromId(wait_time_list->last_waiter)->next_time_waiter = job_fiber_id; - job_fiber->prev_time_waiter = 
wait_time_list->last_waiter; - } - else - { - wait_time_list->first_waiter = job_fiber_id; - } - wait_time_list->last_waiter = job_fiber_id; - ++wait_time_list->num_waiters; - } - - //- PopStruct worker's job fiber - job_fiber = 0; - done = 1; - } - } - if (wait_time != 0) UnlockTicketMutex(&wait_time_bin->lock); - } - if (wait_addr != 0) UnlockTicketMutex(&wait_addr_bin->lock); - } break; - - //- Fiber is finished - case W32_YieldKind_Done: - { - done = 1; - } break; - } - } - } - } - - //- Wait for job - i64 num_jobs_in_queue = Atomic64Fetch(&pool->num_jobs_in_queue.v); - shutdown = Atomic32Fetch(&pool->workers_shutdown.v); - if (num_jobs_in_queue <= 0 && !shutdown) - { - //__profnc("Wait for job", Rgb32F(0.75, 0.75, 0)); - LockTicketMutex(&pool->workers_wake_tm); - { - num_jobs_in_queue = Atomic64Fetch(&pool->num_jobs_in_queue.v); - shutdown = Atomic32Fetch(&pool->workers_shutdown.v); - while (num_jobs_in_queue <= 0 && !shutdown) - { - { - UnlockTicketMutex(&pool->workers_wake_tm); - WaitOnAddress(&pool->num_jobs_in_queue, &num_jobs_in_queue, sizeof(num_jobs_in_queue), INFINITE); - LockTicketMutex(&pool->workers_wake_tm); - } - shutdown = Atomic32Fetch(&pool->workers_shutdown.v); - num_jobs_in_queue = Atomic64Fetch(&pool->num_jobs_in_queue.v); - } - } - UnlockTicketMutex(&pool->workers_wake_tm); + /* TODO: Add toggleable flag that allows for workers to skip parking + * for high-throughput workloads, and instead just spin until work is available */ + WaitOnAddress(&pool->tasks_count, &tasks_count, sizeof(tasks_count), INFINITE); + tasks_count = Atomic64Fetch(&pool->tasks_count.v); } } - //- Worker shutdown - if (job_fiber) + /* Worker shutdown */ + if (task_fiber) { - W32_ReleaseFiber(pool, job_fiber); + W32_ReleaseFiber(pool, task_fiber); } } //////////////////////////////// -//~ Win32 job scheduler entry +//~ @hookdef Fiber suspend/resume operations -W32_ThreadDef(W32_JobSchedulerEntryFunc, UNUSED arg) -{ - struct W32_SharedJobCtx *g = &W32_shared_job_ctx; - - 
{ - i32 priority = THREAD_PRIORITY_TIME_CRITICAL; - b32 success = SetThreadPriority(GetCurrentThread(), priority); - LAX success; - Assert(success); - } - - /* Create high resolution timer */ - HANDLE timer = CreateWaitableTimerExW(0, 0, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS); - if (!timer) - { - //Panic(Lit("Failed to create high resolution timer")); - } - - /* Create rolling buffer of scheduler cycles initialized to default value */ - i32 periods_index = 0; - i64 periods[W32_NumRollingSchedulerPeriods] = ZI; - for (i32 i = 0; i < (i32)countof(periods); ++i) - { - periods[i] = W32_DefaultSchedulerPeriodNs; - } - - i64 last_cycle_ns = 0; - while (!Atomic32Fetch(&g->shutdown)) - { - __profn("Job scheduler cycle"); - { - __profn("Job scheduler wait"); - LARGE_INTEGER due = ZI; - due.QuadPart = -1; - //due.QuadPart = -10000; - //due.QuadPart = -32000; - //due.QuadPart = -12000; - //due.QuadPart = -8000; - SetWaitableTimerEx(timer, &due, 0, 0, 0, 0, 0); - WaitForSingleObject(timer, INFINITE); - } - - /* Calculate mean period */ - i64 now_ns = TimeNs(); - i64 period_ns = last_cycle_ns == 0 ? 
W32_DefaultSchedulerPeriodNs : now_ns - last_cycle_ns; - last_cycle_ns = now_ns; - - /* Calculate mean period */ - { - periods[periods_index++] = period_ns; - if (periods_index == countof(periods)) - { - periods_index = 0; - } - f64 periods_sum_ns = 0; - for (i32 i = 0; i < (i32)countof(periods); ++i) - { - periods_sum_ns += (f64)periods[i]; - } - f64 mean_ns = periods_sum_ns / (f64)countof(periods); - Atomic64FetchSet(&g->current_scheduler_cycle_period_ns.v, RoundF64ToI64(mean_ns)); - } - - { - __profn("Job scheduler run"); - i64 current_cycle = Atomic64FetchAdd(&g->current_scheduler_cycle.v, 1) + 1; - W32_WakeByTime((u64)current_cycle); - } - } -} - -//////////////////////////////// -//~ @hookdef Futex - -void FutexYield(volatile void *addr, void *cmp, u32 size, i64 timeout_ns) +void SuspendFiber(void) { W32_Fiber *fiber = W32_FiberFromId(FiberId()); - i16 parent_id = fiber->parent_id; - if (parent_id != 0) + W32_Fiber *parent_fiber = W32_FiberFromId(fiber->parent_id); + Assert(parent_fiber->id > 0); { - *fiber->yield_param = (W32_YieldParam) { - .kind = W32_YieldKind_Wait, - .wait = { - .addr = addr, - .cmp = cmp, - .size = size, - .timeout_ns = timeout_ns - } - }; - W32_YieldFiber(fiber, W32_FiberFromId(parent_id)); - } - else - { - i32 timeout_ms = 0; - if (timeout_ns > 10000000000000000ll) - { - timeout_ms = INFINITE; - } - else if (timeout_ns != 0) - { - timeout_ms = timeout_ns / 1000000; - timeout_ms += (timeout_ms == 0) * SignF32(timeout_ns); - } - if (addr == 0) - { - Sleep(timeout_ms); - } - else - { - WaitOnAddress(addr, cmp, size, timeout_ms); - } + __prof_fiber_leave(); + Atomic8FetchSet(&fiber->is_suspending, 1); + W32_SwitchToFiber(parent_fiber); + __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(fiber->pool) + Kibi(1) + fiber->id); } } -void FutexWake(void *addr, i32 count) +void ResumeFibers(i16 fiber_ids_count, i16 *fiber_ids) { - W32_WakeByAddress(addr, count); + /* Group tasks by pool */ + W32_TaskList 
tasks_by_pool[JobPool_Count] = ZI; + for (i16 id_index = 0; id_index < fiber_ids_count; ++id_index) + { + i16 fiber_id = fiber_ids[id_index]; + W32_Fiber *fiber = W32_FiberFromId(fiber_id); + W32_Task *task = fiber->task; + JobPool pool_kind = fiber->pool; + W32_TaskList *pool_tasks = &tasks_by_pool[pool_kind]; + QueuePush(pool_tasks->first, pool_tasks->last, task); + ++pool_tasks->count; + + /* Wait for fiber to finish suspending */ + while (Atomic8Fetch(&fiber->is_suspending)) + { + _mm_pause(); + } + } + + /* Submit tasks */ + for (JobPool pool_kind = 0; pool_kind < JobPool_Count; ++pool_kind) + { + W32_TaskList *tasks = &tasks_by_pool[pool_kind]; + i16 count = tasks->count; + if (count > 0) + { + W32_JobPool *pool = &W32_shared_job_state.job_pools[pool_kind]; + + /* Push tasks to front of queue */ + LockTicketMutex(&pool->tasks_tm); + { + tasks->last->next = pool->first_task; + pool->first_task = tasks->first; + if (!pool->last_task) + { + pool->last_task = tasks->last; + } + Atomic64FetchAdd(&pool->tasks_count.v, count); + } + UnlockTicketMutex(&pool->tasks_tm); + + /* Wake workers */ + if (count >= W32_WakeAllWorkersThreshold) + { + WakeByAddressAll(&pool->tasks_count); + } + else + { + for (i16 i = 0; i < count; ++i) + { + WakeByAddressSingle(&pool->tasks_count); + } + } + } + } } //////////////////////////////// //~ @hoodef Job operations -GenericJobDesc *PushJobDesc_(u64 sig_size, u64 sig_align, GenericJobFunc *func, JobDescParams params) +Job *OpenJob(JobFunc *func, JobPool pool_kind) { - struct W32_SharedJobCtx *g = &W32_shared_job_ctx; - GenericJobDesc *result = 0; - Arena *arena = 0; - LockTicketMutex(&g->job_descs_tm); + W32_JobPool *pool = W32_JobPoolFromKind(pool_kind); + Job *job = 0; { - if (g->first_free_job_desc) + Arena *job_arena = 0; { - result = g->first_free_job_desc; - g->first_free_job_desc = result->next_free; - arena = result->arena; - } - else - { - result = PushStructNoZero(g->job_descs_arena, GenericJobDesc); - arena = 
AcquireArena(Mebi(4)); + { + LockTicketMutex(&pool->free_jobs_tm); + Job *free_job = pool->first_free_job; + if (free_job) + { + pool->first_free_job = free_job->next; + job_arena = free_job->arena; + } + UnlockTicketMutex(&pool->free_jobs_tm); + } + if (job_arena) + { + ResetArena(job_arena); + } + else + { + job_arena = AcquireArena(Mebi(4)); + } } + job = PushStruct(job_arena, Job); + job->arena = job_arena; } - UnlockTicketMutex(&g->job_descs_tm); - ZeroStruct(result); - result = PushStruct(arena, GenericJobDesc); - result->arena = arena; - result->sig = PushBytes(arena, sig_size, sig_align); - result->func = func; - result->count = params.count; - result->pool = params.pool; - result->priority = params.priority; - result->counter = params.counter; - return result; + job->pool = pool_kind; + job->func = func; + job->count = 1; + return job; } -void RunJobEx(GenericJobDesc *desc) +void CloseJob(Job *job) { - __prof; - struct W32_SharedJobCtx *g = &W32_shared_job_ctx; - i32 count = desc->count; - Counter *counter = desc->counter; - JobPool pool_kind = desc->pool; - JobPriority priority = desc->priority; - if (count > 0) - { - if (counter) - { - AddCounter(counter, 1); - } - W32_Fiber *fiber = W32_FiberFromId(FiberId()); - priority = ClampI32(priority, fiber->job_priority, JobPriority_Count - 1); /* A job cannot create a job with a higher priority than itself */ - if (pool_kind == JobPool_Inherit) - { - pool_kind = fiber->job_pool; - } - W32_JobPool *pool = &g->job_pools[pool_kind]; - W32_JobQueue *queue = &pool->job_queues[priority]; - LockTicketMutex(&queue->lock); - { - W32_JobInfo *info = 0; - if (queue->first_free) - { - info = queue->first_free; - queue->first_free = info->next; - } - else - { - info = PushStructNoZero(queue->arena, W32_JobInfo); - } - ZeroStruct(info); - info->desc = desc; - info->count = count; - if (queue->last) - { - queue->last->next = info; - } - else - { - queue->first = info; - } - queue->last = info; - } - 
UnlockTicketMutex(&queue->lock); + TempArena scratch = BeginScratchNoConflict(); - /* Wake workers */ + W32_JobPool *pool = W32_JobPoolFromKind(job->pool); + u32 num_tasks = job->count; + + if (num_tasks > 0) + { + if (job->counter) { - LockTicketMutex(&pool->workers_wake_tm); + Atomic64FetchAdd(&job->counter->num_jobs_dispatched.v, 1); + } + + /* Allocate tasks from free list */ + u32 num_tasks_allocated = 0; + W32_Task **tasks_array = PushStructsNoZero(scratch.arena, W32_Task *, num_tasks); + { + LockTicketMutex(&pool->free_tasks_tm); { - Atomic64FetchAdd(&pool->num_jobs_in_queue.v, count); - if (count >= W32_WakeAllThreshold) + while (num_tasks_allocated < num_tasks) { - WakeByAddressAll(&pool->num_jobs_in_queue); - } - else - { - for (i32 i = 0; i < count; ++i) + W32_Task *task = pool->first_free_task; + if (task) { - WakeByAddressSingle(&pool->num_jobs_in_queue); + tasks_array[num_tasks_allocated++] = task; + StackPop(pool->first_free_task); + } + else + { + break; } } } - UnlockTicketMutex(&pool->workers_wake_tm); + UnlockTicketMutex(&pool->free_tasks_tm); + } + + /* Allocate new tasks from memory */ + u32 remaining = num_tasks - num_tasks_allocated; + if (remaining > 0) + { + Arena *perm = PermArena(); + PushAlign(perm, CachelineSize); + W32_Task *pushed_tasks = PushStructsNoZero(perm, W32_Task, remaining); + for (u32 i = 0; i < remaining; ++i) + { + tasks_array[num_tasks_allocated + i] = &pushed_tasks[i]; + } + num_tasks_allocated += remaining; + PushAlign(perm, CachelineSize); + } + + /* Generate task list */ + W32_TaskList tasks = ZI; + for (u32 i = 0; i < num_tasks; ++i) + { + W32_Task *task = tasks_array[i]; + ZeroStruct(task); + task->job = job; + task->task_id = tasks.count++; + QueuePush(tasks.first, tasks.last, task); + } + + /* Push tasks to back of pool */ + { + LockTicketMutex(&pool->tasks_tm); + { + // QueuePush(pool->first_task, pool->last_task, tasks.first); + if (pool->last_task) + { + pool->last_task->next = tasks.first; + } + else + { + 
pool->first_task = tasks.first; + } + pool->last_task = tasks.last; + Atomic64FetchAdd(&pool->tasks_count.v, num_tasks); + } + UnlockTicketMutex(&pool->tasks_tm); + } + + /* Wake workers */ + if (num_tasks >= W32_WakeAllWorkersThreshold) + { + WakeByAddressAll(&pool->tasks_count); + } + else + { + for (u32 i = 0; i < num_tasks; ++i) + { + WakeByAddressSingle(&pool->tasks_count); + } } } -} -//////////////////////////////// -//~ @hookdef Helpers - -i64 TimeNs(void) -{ - struct W32_SharedJobCtx *g = &W32_shared_job_ctx; - LARGE_INTEGER qpc; - QueryPerformanceCounter(&qpc); - i64 result = (qpc.QuadPart - g->timer_start_qpc) * g->ns_per_qpc; - return result; -} - -u32 GetLogicalProcessorCount(void) -{ - return GetActiveProcessorCount(ALL_PROCESSOR_GROUPS); -} - -i64 GetCurrentSchedulerPeriodNs(void) -{ - W32_SharedJobCtx *g = &W32_shared_job_ctx; - return Atomic64Fetch(&g->current_scheduler_cycle_period_ns.v); + EndScratch(scratch); } diff --git a/src/base/base_win32/base_win32_job.h b/src/base/base_win32/base_win32_job.h index 8e521615..c03b39fa 100644 --- a/src/base/base_win32/base_win32_job.h +++ b/src/base/base_win32/base_win32_job.h @@ -32,42 +32,14 @@ typedef W32_ThreadDef(W32_ThreadFunc, data); Struct(W32_Thread) { + String thread_name; W32_ThreadFunc *entry_point; - void *thread_data; - char thread_name_cstr[256]; - wchar_t thread_name_wstr[256]; + void *thread_udata; i32 profiler_group; - W32_Thread *next; - W32_Thread *prev; - HANDLE handle; }; -//////////////////////////////// -//~ Wait list types - -AlignedStruct(W32_WaitList, 64) -{ - u64 value; - i16 first_waiter; - i16 last_waiter; - i32 num_waiters; - W32_WaitList *next_in_bin; - W32_WaitList *prev_in_bin; - u8 pad[32]; -}; -StaticAssert(alignof(W32_WaitList) == 64); /* Avoid false sharing */ - -AlignedStruct(W32_WaitBin, 64) -{ - W32_WaitList *first_wait_list; - W32_WaitList *last_wait_list; - W32_WaitList *first_free_wait_list; - TicketMutex lock; -}; -StaticAssert(alignof(W32_WaitBin) == 64); /* 
Avoid false sharing */ - //////////////////////////////// //~ Fiber types @@ -76,211 +48,140 @@ StaticAssert(alignof(W32_WaitBin) == 64); /* Avoid false sharing */ #define W32_FiberNameSuffixCstr "]" #define W32_FiberNameMaxSize 64 -//- Yield param -Enum(W32_YieldKind) -{ - W32_YieldKind_None, - W32_YieldKind_Done, - W32_YieldKind_Wait, - - W32_YieldKind_Count -}; - -Struct(W32_YieldParam) -{ - W32_YieldKind kind; - union - { - struct - { - volatile void *addr; - void *cmp; - u32 size; - i64 timeout_ns; - } wait; - }; -}; - //- Fiber -AlignedStruct(W32_Fiber, 64) +AlignedStruct(W32_Fiber, CachelineSize) { /* ---------------------------------------------------- */ void *addr; /* 08 bytes */ /* ---------------------------------------------------- */ char *name_cstr; /* 08 bytes */ /* ---------------------------------------------------- */ - Atomic32 wake_lock; /* 04 bytes (4 byte alignment) */ + Atomic8 is_suspending; /* 01 bytes */ + u8 _pad0[1]; /* 01 bytes (padding) */ i16 id; /* 02 bytes */ + i16 pool; /* 02 bytes */ i16 parent_id; /* 02 bytes */ /* ---------------------------------------------------- */ - u64 wait_addr; /* 08 bytes */ + struct W32_Task *task; /* 08 bytes */ /* ---------------------------------------------------- */ - u64 wait_time; /* 08 bytes */ - /* ---------------------------------------------------- */ - i16 next_addr_waiter; /* 02 bytes */ - i16 prev_addr_waiter; /* 02 bytes */ - i16 next_time_waiter; /* 02 bytes */ - i16 prev_time_waiter; /* 02 bytes */ - /* ---------------------------------------------------- */ - GenericJobDesc *job_desc; /* 08 bytes */ - /* ---------------------------------------------------- */ - u8 _pad0[8]; /* 08 bytes (padding) */ - /* ---------------------------------------------------- */ - /* -------------------- Cache line -------------------- */ - /* ---------------------------------------------------- */ - i32 job_id; /* 04 bytes */ - i16 job_pool; /* 02 bytes */ - i16 job_priority; /* 02 bytes */ - /* 
---------------------------------------------------- */ - W32_YieldParam *yield_param; /* 08 bytes */ - /* ---------------------------------------------------- */ - u8 _pad1[48]; /* 48 bytes (padding) */ - +#if 0 + u8 _pad1[32]; /* 32 bytes (padding) */ +#endif }; -StaticAssert(sizeof(W32_Fiber) == 128); /* Padding validation (increase if necessary) */ -StaticAssert(alignof(W32_Fiber) == 64); /* Verify alignment to avoid false sharing */ -StaticAssert(offsetof(W32_Fiber, wake_lock) % 4 == 0); /* Atomic must be aligned */ +StaticAssert(alignof(W32_Fiber) == CachelineSize && sizeof(W32_Fiber) % CachelineSize == 0); /* False sharing validation */ +StaticAssert(offsetof(W32_Fiber, is_suspending) % 4 == 0); /* Atomic alignment validation */ //////////////////////////////// -//~ Job queue types +//~ Job pool types //- Worker ctx -AlignedStruct(W32_WorkerCtx, 64) +AlignedStruct(W32_WorkerCtx, CachelineSize) { JobPool pool_kind; i32 id; }; -//- Job info -Struct(W32_JobInfo) +//- Task +Struct(W32_Task) { - GenericJobDesc *desc; - - i32 count; - i32 num_dispatched; - i16 fiber_id; /* If the job is being resumed from a yield */ - - W32_JobInfo *next; + W32_Task *next; + Job *job; + u32 task_id; + i16 fiber_id; }; -//- Job queue -AlignedStruct(W32_JobQueue, 64) +Struct(W32_TaskList) { - TicketMutex lock; - Arena *arena; - - W32_JobInfo *first; - W32_JobInfo *last; - - W32_JobInfo *first_free; + W32_Task *first; + W32_Task *last; + u64 count; }; //- Job pool -AlignedStruct(W32_JobPool, 64) +AlignedStruct(W32_JobPool, CachelineSize) { - /* Jobs */ - W32_JobQueue job_queues[JobPriority_Count]; - - TicketMutex free_fibers_tm; - i16 first_free_fiber_id; - - /* Workers */ - Atomic32Padded workers_shutdown; - Atomic64Padded num_jobs_in_queue; - TicketMutex workers_wake_tm; - + JobPool kind; i32 num_worker_threads; i32 thread_priority; u64 thread_affinity_mask; b32 thread_is_audio; - Arena *worker_threads_arena; W32_Thread **worker_threads; W32_WorkerCtx *worker_contexts; + + /* Tasks 
*/ + AlignedBlock(CachelineSize) + { + TicketMutex tasks_tm; + Atomic64Padded tasks_count; + W32_Task *first_task; + W32_Task *last_task; + }; + + /* TODO: Make lists wait-free */ + + /* Free fibers */ + AlignedBlock(CachelineSize) + { + TicketMutex free_fibers_tm; + i16 first_free_fiber_id; + }; + + /* Free jobs */ + AlignedBlock(CachelineSize) + { + TicketMutex free_jobs_tm; + Job *first_free_job; + }; + + /* Free tasks */ + AlignedBlock(CachelineSize) + { + TicketMutex free_tasks_tm; + W32_Task *first_free_task; + }; }; //////////////////////////////// -//~ State +//~ State types -/* Assume scheduler cycle is 20hz at start to be conservative */ -#define W32_DefaultSchedulerPeriodNs 50000000 -#define W32_NumRollingSchedulerPeriods 1000 +/* Arbitrary threshold for determining when to use a looped WakeByAddressSingle vs a WakeByAddressAll to wake parked workers */ +#define W32_WakeAllWorkersThreshold 8 -#define W32_NumWaitAddrBins 16384 -#define W32_NumWaitTimeBins 1024 - -/* Arbitrary threshold for determining when to fall back from a looped WakeByAddressSingle to WakeByAddressAll */ -#define W32_WakeAllThreshold 16 - -Struct(W32_SharedJobCtx) +Struct(W32_SharedJobState) { - i64 timer_start_qpc; - i64 ns_per_qpc; - Atomic32 shutdown; - - //- Worker thread pool - TicketMutex threads_tm; - Arena *threads_arena; - W32_Thread *first_thread; - W32_Thread *last_thread; - W32_Thread *first_free_thread; - - //- Scheduler - Atomic64Padded current_scheduler_cycle; - Atomic64Padded current_scheduler_cycle_period_ns; - //- Fibers - TicketMutex fibers_tm; - i16 num_fibers; - Arena *fiber_names_arena; - W32_Fiber fibers[MaxFibers]; - - //- Job descs - Arena *job_descs_arena; - TicketMutex job_descs_tm; - GenericJobDesc *first_free_job_desc; - - //- Wait lists - Atomic64Padded waiter_wake_gen; - TicketMutex wait_lists_arena_tm; - Arena *wait_lists_arena; - - //- Wait tables - W32_WaitBin wait_addr_bins[W32_NumWaitAddrBins]; - W32_WaitBin wait_time_bins[W32_NumWaitTimeBins]; + 
AlignedBlock(CachelineSize) + { + TicketMutex fibers_tm; + i16 num_fibers; + W32_Fiber fibers[MaxFibers]; + Arena *fiber_names_arena; + }; //- Job pools - W32_JobPool job_pools[JobPool_Count]; -}; - -extern W32_SharedJobCtx W32_shared_job_ctx; + AlignedBlock(CachelineSize) + { + W32_JobPool job_pools[JobPool_Count]; + }; +} extern W32_shared_job_state; //////////////////////////////// //~ Startup void StartupJobs(void); -//////////////////////////////// -//~ Shutdown - -void ShutdownJobs(void); - //////////////////////////////// //~ Thread operations DWORD WINAPI W32_Win32ThreadProc(LPVOID vt); -W32_Thread *W32_AcquireThread(W32_ThreadFunc *entry_point, void *thread_data, String thread_name, i32 profiler_group); -b32 W32_TryReleaseThread(W32_Thread *thread, f32 timeout_seconds); -void W32_WaitReleaseThread(W32_Thread *thread); +W32_Thread *W32_StartThread(W32_ThreadFunc *entry_point, void *thread_udata, String thread_name, i32 profiler_group); +b32 W32_TryEndThread(W32_Thread *thread, f32 timeout_seconds); +void W32_WaitEndThread(W32_Thread *thread); //////////////////////////////// -//~ Wait list operations +//~ Pool operations -W32_WaitBin *W32_WaitBinFromAddr(u64 addr); -W32_WaitBin *W32_WaitBinFromTime(u64 time); - -void W32_WakeLockedFibers(i32 num_fibers, W32_Fiber **fibers); -void W32_WakeByAddress(void *addr, i32 count); -void W32_WakeByTime(u64 time); +W32_JobPool *W32_JobPoolFromKind(JobPool pool_kind); //////////////////////////////// //~ Fiber operations @@ -288,20 +189,15 @@ void W32_WakeByTime(u64 time); W32_Fiber *W32_AcquireFiber(W32_JobPool *pool); void W32_ReleaseFiber(W32_JobPool *pool, W32_Fiber *fiber); ForceInline W32_Fiber *W32_FiberFromId(i16 id); -void W32_FiberResume(W32_Fiber *fiber); +void W32_SwitchToFiber(W32_Fiber *fiber); void W32_YieldFiber(W32_Fiber *fiber, W32_Fiber *parent_fiber); //////////////////////////////// //~ Fiber entry -void W32_FiberEntryPoint(void *id_ptr); +void W32_FiberEntryPoint(void *wi32_fiber_data); 
//////////////////////////////// //~ Job worker entry W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg); - -//////////////////////////////// -//~ Job scheduler entry - -W32_ThreadDef(W32_JobSchedulerEntryFunc, _); diff --git a/src/draw/draw.h b/src/draw/draw.h index 3e5cd540..34573680 100644 --- a/src/draw/draw.h +++ b/src/draw/draw.h @@ -99,14 +99,12 @@ Struct(D_TextParams) }) //////////////////////////////// -//~ State +//~ State types Struct(D_SharedState) { GPU_Resource *solid_white_texture; -}; - -extern D_SharedState D_shared_state; +} extern D_shared_state; //////////////////////////////// //~ Startup diff --git a/src/font/font.c b/src/font/font.c index c2e88277..a9c55e77 100644 --- a/src/font/font.c +++ b/src/font/font.c @@ -122,11 +122,14 @@ AC_Asset *F_LoadAsset(Resource resource, f32 point_size, b32 wait) if (is_first_touch) { AC_MarkLoading(asset); - F_LoadJob_Desc *desc = PushJobDesc(F_LoadJob, .pool = JobPool_Background, .priority = JobPriority_Low); - desc->sig->asset = asset; - desc->sig->resource = resource; - desc->sig->point_size = point_size; - RunJobEx((GenericJobDesc *)desc); + { + Job *job = OpenJob(F_LoadJob, JobPool_Background); + F_LoadJob_Sig *sig = PushStruct(job->arena, F_LoadJob_Sig); + sig->asset = asset; + sig->resource = resource; + sig->point_size = point_size; + CloseJob(job); + } if (wait) { AC_YieldOnAssetReady(asset); diff --git a/src/gpu/gpu_dx12/gpu_dx12.h b/src/gpu/gpu_dx12/gpu_dx12.h index d12f4ff7..020924ea 100644 --- a/src/gpu/gpu_dx12/gpu_dx12.h +++ b/src/gpu/gpu_dx12/gpu_dx12.h @@ -145,7 +145,7 @@ Struct(GPU_D12_Swapchain) }; //////////////////////////////// -//~ State +//~ State types Struct(GPU_D12_FiberState) { @@ -156,7 +156,7 @@ Struct(GPU_D12_FiberState) Struct(GPU_D12_SharedState) { i32 _; -}; +} extern GPU_D12_shared_state; //////////////////////////////// //~ State operations diff --git a/src/meta/meta.c b/src/meta/meta.c index 09399f19..672a4263 100644 --- a/src/meta/meta.c +++ b/src/meta/meta.c @@ -62,16 
+62,10 @@ //////////////////////////////// //~ Util -/* TODO: Move this to OS layer */ -void Echo(String msg) -{ - OS_Echo(msg); -} - void EchoLine(String msg) { - OS_Echo(msg); - OS_Echo(Lit("\n")); + Echo(msg); + Echo(Lit("\n")); } Struct(LineCol) @@ -535,9 +529,12 @@ JobDef(StepJob, sig, id) ++i; } } - Counter counter = ZI; - RunJob(shader_entries_count, RunCommandJob, JobPool_Hyper, JobPriority_Normal, &counter, .cmds = compile_cmds, .results = compile_results); - YieldOnCounter(&counter); + JobCounter counter = ZI; + RunJob(RunCommandJob, + .count = shader_entries_count, + .counter = &counter, + .sig = { .cmds = compile_cmds, .results = compile_results }); + YieldOnJobs(&counter); //- Process shader compilation results { @@ -569,9 +566,9 @@ JobDef(StepJob, sig, id) arc_params.arc_store = shader_store_name; arc_params.arc_dir = shader_store_name; - Counter counter = ZI; - RunJob(1, StepJob, JobPool_Hyper, JobPriority_Normal, &counter, .params = &arc_params, .results = &arc_results); - YieldOnCounter(&counter); + JobCounter counter = ZI; + RunJob(StepJob, .counter = &counter, .sig.params = &arc_params, .sig.results = &arc_results); + YieldOnJobs(&counter); InheritStepResults(arena, result, 1, &arc_results); } } @@ -645,9 +642,13 @@ JobDef(StepJob, sig, id) ++i; } } - Counter counter = ZI; - RunJob(dir_embeds_count, StepJob, JobPool_Hyper, JobPriority_Normal, &counter, .params = arc_params_array, .results = arc_results_array); - YieldOnCounter(&counter); + JobCounter counter = ZI; + RunJob(StepJob, + .counter = &counter, + .count = dir_embeds_count, + .sig.params = arc_params_array, + .sig.results = arc_results_array); + YieldOnJobs(&counter); InheritStepResults(arena, result, dir_embeds_count, arc_results_array); } } @@ -804,7 +805,8 @@ JobDef(StepJob, sig, id) //////////////////////////////// //~ Startup -void StartupMeta(void) +JobDecl(BuildJob, EmptySig); +JobDef(BuildJob, _, __) { Arena *arena = PermArena(); M_ErrorList errors = ZI; @@ -991,9 +993,9 @@ void 
StartupMeta(void) resource_params->compiler_params = cp; resource_params->flattened = flattened; - Counter step_counter = ZI; - RunJob(countof(params_array), StepJob, JobPool_Hyper, JobPriority_Normal, &step_counter, .params = params_array, .results = results_array); - YieldOnCounter(&step_counter); + JobCounter step_counter = ZI; + RunJob(StepJob, .count = countof(params_array), .counter = &step_counter, .sig.params = params_array, .sig.results = results_array); + YieldOnJobs(&step_counter); //////////////////////////////// //~ Process compile step results @@ -1106,5 +1108,5 @@ void StartupMeta(void) void StartupLayers(void) { OS_Startup(); - StartupMeta(); + RunJob(BuildJob, .pool = JobPool_Hyper); } diff --git a/src/meta/meta_os/meta_os.h b/src/meta/meta_os/meta_os.h index a9a17a46..0f83c332 100644 --- a/src/meta/meta_os/meta_os.h +++ b/src/meta/meta_os/meta_os.h @@ -53,5 +53,4 @@ void OS_Rm(String path); //////////////////////////////// //~ @hookdecl Shell operations -void OS_Echo(String msg); OS_CommandResult OS_RunCommand(Arena *arena, String cmd); diff --git a/src/meta/meta_os/meta_os_win32/meta_os_win32.c b/src/meta/meta_os/meta_os_win32/meta_os_win32.c index 987d8f9f..3de6875d 100644 --- a/src/meta/meta_os/meta_os_win32/meta_os_win32.c +++ b/src/meta/meta_os/meta_os_win32/meta_os_win32.c @@ -208,15 +208,6 @@ void OS_Rm(String path) //////////////////////////////// //~ @hookdef Shell hooks -void OS_Echo(String msg) -{ - HANDLE console_handle = GetStdHandle(STD_OUTPUT_HANDLE); - if (console_handle != INVALID_HANDLE_VALUE) - { - WriteFile(console_handle, msg.text, msg.len, 0, 0); - } -} - OS_CommandResult OS_RunCommand(Arena *arena, String cmd) { TempArena scratch = BeginScratch(arena); diff --git a/src/mixer/mixer.h b/src/mixer/mixer.h index 3fa77f2e..b899a99f 100644 --- a/src/mixer/mixer.h +++ b/src/mixer/mixer.h @@ -75,7 +75,7 @@ Struct(MIX_Track){ }; //////////////////////////////// -//~ State +//~ State types Struct(MIX_SharedState) { @@ -91,9 +91,7 @@ 
Struct(MIX_SharedState) MIX_Track *track_last_playing; u64 track_playing_count; MIX_Track *track_first_free; -}; - -extern MIX_SharedState M_shared_state; +} extern M_shared_state; //////////////////////////////// //~ Startup diff --git a/src/platform/platform.h b/src/platform/platform.h index 36cca614..140c8501 100644 --- a/src/platform/platform.h +++ b/src/platform/platform.h @@ -279,12 +279,12 @@ Enum(P_MessageBoxKind) void P_Startup(void); //////////////////////////////// -//~ @hookdecl Time helper operations +//~ @hookdecl Time helper hooks P_DateTime P_LocalTime(void); //////////////////////////////// -//~ @hookdecl File system operations +//~ @hookdecl File system hooks /* NOTE: File paths use forward slash '/' as delimiter */ @@ -310,14 +310,14 @@ u64 P_GetFileSize(P_File file); P_FileTime P_GetFileTime(P_File file); //////////////////////////////// -//~ @hookdecl File map operations +//~ @hookdecl File map hooks P_FileMap P_OpenFileMap(P_File file); void P_CloseFileMap(P_FileMap map); String P_GetFileMapData(P_FileMap map); //////////////////////////////// -//~ @hookdecl Window operations +//~ @hookdecl Window hooks P_Window *P_AcquireWindow(void); void P_ReleaseWindow(P_Window *window); @@ -342,7 +342,7 @@ Vec2I32 P_GetWindowMonitorSize(P_Window *window); u64 P_GetInternalWindowHandle(P_Window *window); //////////////////////////////// -//~ @hookdecl Address helpers +//~ @hookdecl Address helper hooks P_Address P_AddressFromString(String str); P_Address P_AddressFromIpPortCstr(char *ip_cstr, char *port_cstr); @@ -351,7 +351,7 @@ String P_StringFromAddress(Arena *arena, P_Address address); b32 P_AddressIsEqual(P_Address a, P_Address b); //////////////////////////////// -//~ @hookdecl Sock operations +//~ @hookdecl Sock hooks P_Sock *P_AcquireSock(u16 listen_port, u64 sndbuf_size, u64 rcvbuf_size); void P_ReleaseSock(P_Sock *sock); @@ -359,14 +359,20 @@ P_SockReadResult P_ReadSock(Arena *arena, P_Sock *sock); void P_WriteSock(P_Sock *sock, P_Address 
address, String data); //////////////////////////////// -//~ @hookdecl Utils +//~ @hookdecl Util hooks void P_MessageBox(P_MessageBoxKind kind, String message); void P_SetClipboardText(String str); String P_GetClipboardText(Arena *arena); //////////////////////////////// -//~ @hookdecl Sleep +//~ @hookdecl Timer hooks + +Fence *P_GetTimerFence(); +i64 P_GetCurrentTimerPeriodNs(); + +//////////////////////////////// +//~ @hookdecl Sleep hooks void P_SleepPrecise(i64 sleep_time_ns); void P_SleepFrame(i64 last_frame_time_ns, i64 target_dt_ns); diff --git a/src/platform/platform_log.c b/src/platform/platform_log.c index f46efaf0..b8d0767b 100644 --- a/src/platform/platform_log.c +++ b/src/platform/platform_log.c @@ -1,6 +1,3 @@ -//////////////////////////////// -//~ State - P_SharedLogState P_shared_log_state = ZI; //////////////////////////////// @@ -184,9 +181,9 @@ void P_Log_(i32 level, String msg) FmtString(msg) ); #else - String msg_formatted = FormatString( + String msg_formatted = StringF( scratch.arena, - Lit("[%F:%F:%F.%F] |%F| [%F] %F"), + "[%F:%F:%F.%F] |%F| [%F] %F", /* Time */ FmtUintZ(datetime.hour, 2), diff --git a/src/platform/platform_log.h b/src/platform/platform_log.h index 119030c5..afc8b972 100644 --- a/src/platform/platform_log.h +++ b/src/platform/platform_log.h @@ -56,9 +56,8 @@ Struct(LogEventCallback) #define P_LogLevel_Count 6 //////////////////////////////// -//~ State +//~ State types -//- Shared context Struct(P_SharedLogState) { Atomic32 initialized; @@ -70,11 +69,11 @@ Struct(P_SharedLogState) P_File file; b32 file_valid; -}; +} P_shared_log_state; -extern P_SharedLogState P_shared_log_state; +//////////////////////////////// +//~ Log level types -//-- Log level settings Struct(P_LogLevelSettings) { String shorthand; diff --git a/src/platform/platform_win32/platform_win32.c b/src/platform/platform_win32/platform_win32.c index 27b6325a..92709f26 100644 --- a/src/platform/platform_win32/platform_win32.c +++ 
b/src/platform/platform_win32/platform_win32.c @@ -99,6 +99,9 @@ void P_Startup(void) //- Init winsock WSAStartup(MAKEWORD(2, 2), &g->wsa_data); g->socks_arena = AcquireArena(Gibi(64)); + + //- Init timer + RunJob(P_W32_TimerJob); } //////////////////////////////// @@ -181,7 +184,7 @@ P_W32_Window *P_W32_AcquireWindow(void) * created and receive a HWND, because on Windows a the event proc must run on * the same thread that created the window. */ AddCounter(&window->ready_fence, 1); - window->window_thread = W32_AcquireThread(&P_W32_WindowThreadEntryFunc, window, Lit("Window thread"), PROF_THREAD_GROUP_WINDOW); + window->window_thread = W32_StartThread(&P_W32_WindowThreadEntryFunc, window, Lit("Window thread"), PROF_THREAD_GROUP_WINDOW); YieldOnCounter(&window->ready_fence); return window; @@ -193,7 +196,7 @@ void P_W32_ReleaseWindow(P_W32_Window *window) Atomic32FetchSet(&window->shutdown, 1); P_W32_SharedState *g = &P_W32_shared_state; P_W32_WakeWindow(window); - W32_WaitReleaseThread(window->window_thread); + W32_WaitEndThread(window->window_thread); Lock lock = LockE(&g->windows_mutex); { @@ -862,6 +865,69 @@ P_Address P_W32_PlatformAddressFromWin32Address(P_W32_Address ws_addr) return result; } +//////////////////////////////// +//~ Timer job + +JobDef(P_W32_TimerJob, _, __) +{ + P_W32_SharedState *g = &P_W32_shared_state; + SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL); + + /* Create high resolution timer */ + HANDLE timer = CreateWaitableTimerExW(0, 0, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS); + if (!timer) + { + Panic(Lit("Failed to create high resolution timer")); + } + + i32 periods_index = 0; + i64 periods[P_W32_NumRollingTimerPeriods] = ZI; + for (i32 i = 0; i < (i32)countof(periods); ++i) + { + periods[i] = P_W32_DefaultTimerPeriodNs; + } + + i64 last_cycle_ns = 0; + /* FIXME: shutdown */ + for (;;) + { + __profn("Job scheduler cycle"); + { + __profn("Job scheduler wait"); + LARGE_INTEGER due = ZI; + due.QuadPart = 
-1; + //due.QuadPart = -10000; + //due.QuadPart = -32000; + //due.QuadPart = -12000; + //due.QuadPart = -8000; + SetWaitableTimerEx(timer, &due, 0, 0, 0, 0, 0); + WaitForSingleObject(timer, INFINITE); + } + i64 now_ns = TimeNs(); + i64 period_ns = last_cycle_ns == 0 ? P_W32_DefaultTimerPeriodNs : now_ns - last_cycle_ns; + last_cycle_ns = now_ns; + + /* Calculate mean period */ + { + periods[periods_index++] = period_ns; + if (periods_index == countof(periods)) + { + periods_index = 0; + } + f64 periods_sum_ns = 0; + for (i32 i = 0; i < (i32)countof(periods); ++i) + { + periods_sum_ns += (f64)periods[i]; + } + f64 mean_ns = periods_sum_ns / (f64)countof(periods); + Atomic64FetchSet(&g->average_timer_period_ns.v, RoundF64ToI64(mean_ns)); + } + + /* Update fence */ + FetchSetFence(&g->timer_fence, now_ns); + } +} + //////////////////////////////// //~ @hookdef Time hooks @@ -1083,7 +1149,7 @@ String P_ReadFile(Arena *arena, P_File file) { /* ReadFile returns non-zero on success */ /* TODO: error checking */ - AlignArena(arena, 16); + PushAlign(arena, CachelineSize); s.text = PushStructsNoZero(arena, u8, size); LAX ReadFile( (HANDLE)file.handle, @@ -1774,6 +1840,21 @@ String P_GetClipboardText(Arena *arena) return result; } +//////////////////////////////// +//~ @hookdecl Timer hooks + +Fence *P_GetTimerFence() +{ + P_W32_SharedState *g = &P_W32_shared_state; + return &g->timer_fence; +} + +i64 P_GetCurrentTimerPeriodNs() +{ + P_W32_SharedState *g = &P_W32_shared_state; + return Atomic64Fetch(&g->average_timer_period_ns.v); +} + //////////////////////////////// //~ @hookdef Sleep hooks @@ -1781,29 +1862,26 @@ void P_SleepPrecise(i64 sleep_time_ns) { __prof; - i64 big_sleep = GetCurrentSchedulerPeriodNs(); - i64 tolerance = (f64)big_sleep * 0.5; - //i64 tolerance = 1000000000; - i64 now_ns = TimeNs(); i64 target_ns = now_ns + sleep_time_ns; - /* Sleep */ - while (now_ns < target_ns - big_sleep - tolerance) + /* Sleep on timer to conserve power */ { - __profn("Sleep 
part"); - FutexYield(0, 0, 0, big_sleep); - now_ns = TimeNs(); + __profn("Sleep timer"); + Fence *timer_fence = P_GetTimerFence(); + i64 timer_period_ns = P_GetCurrentTimerPeriodNs(); + i64 timer_tolerance_ns = timer_period_ns * 0.5; + i64 target_timer_sleep_ns = target_ns - timer_period_ns - timer_tolerance_ns; + YieldOnFence(timer_fence, target_timer_sleep_ns); } /* Spin */ + now_ns = TimeNs(); + while (now_ns < target_ns) { __profn("Sleep spin"); - while (now_ns < target_ns) - { - _mm_pause(); - now_ns = TimeNs(); - } + _mm_pause(); + now_ns = TimeNs(); } } diff --git a/src/platform/platform_win32/platform_win32.h b/src/platform/platform_win32/platform_win32.h index 291c5f96..7e847ac8 100644 --- a/src/platform/platform_win32/platform_win32.h +++ b/src/platform/platform_win32/platform_win32.h @@ -110,9 +110,11 @@ Struct(P_W32_Sock) }; //////////////////////////////// -//~ State +//~ State types #define P_W32_WindowClassName L"power_play_window_class" +#define P_W32_NumRollingTimerPeriods 500 +#define P_W32_DefaultTimerPeriodNs 50000000 Struct(P_W32_SharedState) { @@ -135,9 +137,11 @@ Struct(P_W32_SharedState) Arena *socks_arena; Mutex socks_mutex; P_W32_Sock *first_free_sock; -}; -extern P_W32_SharedState P_W32_shared_state; + //- Timer + Fence timer_fence; + Atomic64Padded average_timer_period_ns; +} extern P_W32_shared_state; //////////////////////////////// //~ Time operations @@ -172,3 +176,8 @@ LRESULT CALLBACK P_W32_Win32WindowProc(HWND hwnd, UINT msg, WPARAM wparam, LPARA P_W32_Address P_W32_Win32AddressFromPlatformAddress(P_Address addr); P_W32_Address P_W32_ConvertAnyaddrToLocalhost(P_W32_Address addr); P_Address P_W32_PlatformAddressFromWin32Address(P_W32_Address ws_addr); + +//////////////////////////////// +//~ Timer job + +JobDecl(P_W32_TimerJob, EmptySig); diff --git a/src/playback/playback_wasapi/playback_wasapi.c b/src/playback/playback_wasapi/playback_wasapi.c index ede5cb1f..4e17b809 100644 --- a/src/playback/playback_wasapi/playback_wasapi.c 
+++ b/src/playback/playback_wasapi/playback_wasapi.c @@ -15,7 +15,7 @@ void PB_Startup(void) PB_WSP_SharedState *g = &PB_WSP_shared_state; PB_WSP_InitializeWasapi(); /* Start playback job */ - RunJob(1, PB_WSP_PlaybackJob, JobPool_Audio, JobPriority_High, &g->PB_WSP_PlaybackJob_counter, 0); + RunJob(PB_WSP_PlaybackJob, .pool = JobPool_Audio, .counter = &g->shutdown_job_counter); OnExit(&PB_WSP_Shutdown); } @@ -24,7 +24,7 @@ ExitFuncDef(PB_WSP_Shutdown) __prof; PB_WSP_SharedState *g = &PB_WSP_shared_state; Atomic32FetchSet(&g->shutdown, 1); - YieldOnCounter(&g->PB_WSP_PlaybackJob_counter); + YieldOnJobs(&g->shutdown_job_counter); } void PB_WSP_InitializeWasapi(void) diff --git a/src/playback/playback_wasapi/playback_wasapi.h b/src/playback/playback_wasapi/playback_wasapi.h index 053f6993..1c17f848 100644 --- a/src/playback/playback_wasapi/playback_wasapi.h +++ b/src/playback/playback_wasapi/playback_wasapi.h @@ -22,7 +22,7 @@ Struct(PB_WSP_Buff) }; //////////////////////////////// -//~ State +//~ State types Struct(PB_WSP_SharedState) { @@ -32,10 +32,8 @@ Struct(PB_WSP_SharedState) IAudioRenderClient *playback; WAVEFORMATEX *buffer_format; u32 buffer_frames; - Counter PB_WSP_PlaybackJob_counter; -}; - -extern PB_WSP_SharedState PB_WSP_shared_state; + JobCounter shutdown_job_counter; +} extern PB_WSP_shared_state; //////////////////////////////// //~ Wasapi startup diff --git a/src/pp/pp.c b/src/pp/pp.c index b282b157..98cb5109 100644 --- a/src/pp/pp.c +++ b/src/pp/pp.c @@ -48,8 +48,8 @@ void StartupUser(void) P_ShowWindow(g->window); /* Start jobs */ - RunJob(1, UpdateUserJob, JobPool_User, JobPriority_High, &g->shutdown_job_counters, 0); - RunJob(1, SimJob, JobPool_Sim, JobPriority_High, &g->shutdown_job_counters, 0); + RunJob(UpdateUserJob, .pool = JobPool_User, .counter = &g->shutdown_job_counter); + RunJob(SimJob, .pool = JobPool_Sim, .counter = &g->shutdown_job_counter); OnExit(&ShutdownUser); } @@ -61,7 +61,7 @@ ExitFuncDef(ShutdownUser) __prof; 
SharedUserState *g = &shared_user_state; Atomic32FetchSet(&g->shutdown, 1); - YieldOnCounter(&g->shutdown_job_counters); + YieldOnJobs(&g->shutdown_job_counter); P_ReleaseWindow(g->window); } @@ -2310,10 +2310,10 @@ void UpdateUser(P_Window *window) __profn("Shade pass"); GPU_ProfN(cl, Lit("Shade pass")); - u32 shade_flags = K_SHADE_FLAG_NONE; + u32 shade_flags = ShadeFlag_None; if (effects_disabled) { - shade_flags |= K_SHADE_FLAG_DISABLE_EFFECTS; + shade_flags |= ShadeFlag_DisableEffects; } ShadeSig sig = ZI; sig.flags = shade_flags; diff --git a/src/pp/pp.h b/src/pp/pp.h index 38714965..623e6570 100644 --- a/src/pp/pp.h +++ b/src/pp/pp.h @@ -139,7 +139,7 @@ Struct(DecodeQueue) }; //////////////////////////////// -//~ State +//~ State types Struct(BindState) { @@ -153,7 +153,7 @@ Struct(BindState) Struct(SharedUserState) { Atomic32 shutdown; - Counter shutdown_job_counters; + JobCounter shutdown_job_counter; P_Window *window; GPU_Swapchain *swapchain; @@ -266,9 +266,7 @@ Struct(SharedUserState) Vec2 world_cursor; Vec2 focus_send; -}; - -extern SharedUserState shared_user_state; +} extern shared_user_state; //////////////////////////////// //~ Startup diff --git a/src/pp/pp_draw.gpu b/src/pp/pp_draw.gpu index fb3c6e98..103ca84e 100644 --- a/src/pp/pp_draw.gpu +++ b/src/pp/pp_draw.gpu @@ -277,7 +277,7 @@ void CSDef(ShadeCS, Semantic(uint3, SV_DispatchThreadID)) color *= albedo_tex[id]; /* Apply lighting */ - if (!(sig.flags & K_SHADE_FLAG_DISABLE_EFFECTS)) + if (!(sig.flags & ShadeFlag_DisableEffects)) { color.rgb *= ColorFromPos(id); } diff --git a/src/pp/pp_draw.h b/src/pp/pp_draw.h index c1182dfb..a516a0a9 100644 --- a/src/pp/pp_draw.h +++ b/src/pp/pp_draw.h @@ -60,8 +60,8 @@ AssertRootConst(FloodSig, 8); //////////////////////////////// //~ Shade types -#define K_SHADE_FLAG_NONE (0 << 0) -#define K_SHADE_FLAG_DISABLE_EFFECTS (1 << 0) +#define ShadeFlag_None (0 << 0) +#define ShadeFlag_DisableEffects (1 << 0) Struct(ShadeSig) { diff --git a/src/pp/pp_sim.h 
b/src/pp/pp_sim.h index 80f4a3bb..dce320af 100644 --- a/src/pp/pp_sim.h +++ b/src/pp/pp_sim.h @@ -227,7 +227,7 @@ Inline Snapshot *NilSnapshot(void) } //////////////////////////////// -//~ State +//~ State types #define ClientLookupBinsCount 127 #define TickLookupBinsCount 127 diff --git a/src/sound/sound.c b/src/sound/sound.c index e431a0dd..d5a1a9a7 100644 --- a/src/sound/sound.c +++ b/src/sound/sound.c @@ -98,11 +98,14 @@ AC_Asset *SND_LoadAsset(Resource resource, SND_SoundFlag flags, b32 wait) if (is_first_touch) { AC_MarkLoading(asset); - SND_LoadJob_Desc *desc = PushJobDesc(SND_LoadJob, .pool = JobPool_Background, .priority = JobPriority_Low, .counter = &asset->counter); - desc->sig->resource = resource; - desc->sig->asset = asset; - desc->sig->flags = flags; - RunJobEx((GenericJobDesc *)desc); + { + Job *job = OpenJob(SND_LoadJob, JobPool_Background); + SND_LoadJob_Sig *sig = PushStruct(job->arena, SND_LoadJob_Sig); + sig->resource = resource; + sig->asset = asset; + sig->flags = flags; + CloseJob(job); + } if (wait) { AC_YieldOnAssetReady(asset); diff --git a/src/sprite/sprite.c b/src/sprite/sprite.c index 2f9a462a..2150f796 100644 --- a/src/sprite/sprite.c +++ b/src/sprite/sprite.c @@ -286,13 +286,13 @@ S_Entry *S_FetchEntry(Resource resource, JobPool pool, S_FetchFlag flags) && !Atomic32Fetch(&entry->texture_touched) && !Atomic32FetchTestSet(&entry->texture_touched, 0, 1)) { - RunJob(1, S_LoadTextureJob, pool, JobPriority_Inherit, 0, .entry = entry); + RunJob(S_LoadTextureJob, .pool = pool, .sig.entry = entry); } if ((flags & S_FetchFlag_Sheet) && !Atomic32Fetch(&entry->sheet_touched) && !Atomic32FetchTestSet(&entry->sheet_touched, 0, 1)) { - RunJob(1, S_LoadSheetJob, pool, JobPriority_Inherit, 0, .entry = entry); + RunJob(S_LoadSheetJob, .pool = pool, .sig.entry = entry); } return entry; } diff --git a/src/sprite/sprite.h b/src/sprite/sprite.h index 95e164a5..9980cb53 100644 --- a/src/sprite/sprite.h +++ b/src/sprite/sprite.h @@ -121,16 +121,14 @@ 
Struct(S_EntryBin) }; //////////////////////////////// -//~ State +//~ State types #define S_EntryBinsCount 1024 Struct(S_SharedState) { S_EntryBin entry_bins[S_EntryBinsCount]; -}; - -extern S_SharedState S_shared_state; +} extern S_shared_state; //////////////////////////////// //~ Load jobs diff --git a/src/tar/tar.h b/src/tar/tar.h index 7fdc959d..4fc8138f 100644 --- a/src/tar/tar.h +++ b/src/tar/tar.h @@ -12,7 +12,7 @@ Struct(TAR_Entry) b32 is_dir; TAR_Entry *next; TAR_Entry *next_child; /* If entry is dir, points to first child. Otherwise points to next sibling. */ -}; +} extern Readonly TAR_nil_entry; Struct(TAR_Archive) { @@ -20,8 +20,6 @@ Struct(TAR_Archive) TAR_Entry *head; }; -extern Readonly TAR_Entry TAR_nil_entry; - //////////////////////////////// //~ Header types diff --git a/src/ttf/ttf_dwrite/ttf_dwrite.c b/src/ttf/ttf_dwrite/ttf_dwrite.c index fb919f7d..3bbd8650 100644 --- a/src/ttf/ttf_dwrite/ttf_dwrite.c +++ b/src/ttf/ttf_dwrite/ttf_dwrite.c @@ -30,8 +30,7 @@ void TTF_Startup(void) #endif if (error != S_OK) { - /* FIXME: Enable this */ - //Panic(Lit("Error creating DWrite factory")); + Panic(Lit("Error creating DWrite factory")); (*(volatile int *)0) = 0; } } diff --git a/src/ttf/ttf_dwrite/ttf_dwrite.h b/src/ttf/ttf_dwrite/ttf_dwrite.h index aa800dca..7462fb60 100644 --- a/src/ttf/ttf_dwrite/ttf_dwrite.h +++ b/src/ttf/ttf_dwrite/ttf_dwrite.h @@ -140,7 +140,7 @@ static inline UINT32 IDWriteGdiInterop_Release EXTERN_C HRESULT DECLSPEC_IMPORT WINAPI DWriteCreateFactory (DWRITE_FACTORY_TYPE factoryType, const GUID* iid, void** factory) WIN_NOEXCEPT; //////////////////////////////// -//~ State +//~ State types /* TODO: Determine font dpi dynamically */ #define TTF_DW_Dpi (96.0f) @@ -148,6 +148,4 @@ EXTERN_C HRESULT DECLSPEC_IMPORT WINAPI DWriteCreateFactory (DWRITE_FACTORY_TYPE Struct(TTF_DW_SharedState) { struct IDWriteFactory5 *factory; -}; - -extern TTF_DW_SharedState TTF_DW_shared_state; +} extern TTF_DW_shared_state;