power_play/src/base/base_async.c

109 lines
2.7 KiB
C

////////////////////////////////////////////////////////////
//~ Bootstrap
void BootstrapAsync(void)
{
// TODO: Dynamic lane counts
DispatchWave(Lit("Async"), 4, AsyncWorkerEntryPoint, 0);
}
////////////////////////////////////////////////////////////
//~ Async ops
void OnAsyncTick(AsyncTickCallbackFunc *func)
{
Arena *perm = PermArena();
AsyncTickCallbackNode *n = PushStruct(perm, AsyncTickCallbackNode);
n->callback.func = func;
Lock lock = LockE(&Base.async.mutex);
{
SllQueuePush(Base.async.first_callback_node, Base.async.last_callback_node, n);
Base.async.callback_nodes_count += 1;
}
Unlock(&lock);
}
void SignalAsyncTick(void)
{
Atomic64FetchAdd(&Base.async.signal.v, 1);
FutexWakeNeq(&Base.async.signal.v);
}
////////////////////////////////////////////////////////////
//~ Async worker
void AsyncWorkerEntryPoint(WaveLaneCtx *lane)
{
AsyncFrameLaneCtx frame = Zi;
frame.arena = AcquireArena(Gibi(64));
// Tick forever
for (;;)
{
AsyncWorkerCtx *w = &Base.async.worker;
//////////////////////////////
//- Begin tick
if (lane->idx == 0)
{
// Wait for signal
{
i64 passive_timeout_ns = NsFromSeconds(0.25);
i64 now_ns = TimeNs();
i64 passive_run_at_ns = now_ns + passive_timeout_ns;
i64 cur_signal = Atomic64Fetch(&Base.async.signal.v);
while (cur_signal <= w->last_seen_signal && (passive_run_at_ns - now_ns) > 1000000)
{
FutexYieldNeq(&Base.async.signal.v, &cur_signal, sizeof(cur_signal), passive_run_at_ns - now_ns);
cur_signal = Atomic64Fetch(&Base.async.signal.v);
now_ns = TimeNs();
}
w->last_seen_signal = cur_signal;
}
// Collect callbacks
{
Lock lock = LockE(&Base.async.mutex);
{
w->callbacks_count = Base.async.callback_nodes_count;
w->callbacks = PushStructsNoZero(frame.arena, AsyncTickCallback, w->callbacks_count);
u64 callback_idx = 0;
for (AsyncTickCallbackNode *n = Base.async.first_callback_node; n; n = n->next)
{
w->callbacks[callback_idx] = n->callback;
++callback_idx;
}
}
Unlock(&lock);
}
}
WaveSync(lane);
//////////////////////////////
//- Run async callbacks
for (u64 callback_idx = 0; callback_idx < w->callbacks_count; ++callback_idx)
{
AsyncTickCallback *callback = &w->callbacks[callback_idx];
callback->func(lane, &frame);
}
//////////////////////////////
//- End tick
WaveSync(lane);
ResetArena(frame.arena);
{
Arena *old_frame_arena = frame.arena;
ZeroStruct(&frame);
frame.arena = old_frame_arena;
}
}
}