Commit bc52ffb

MaxHeimbrock and claude authored
Reduce main thread dispatch if not needed (#269)
* Dispatch raw-safe one-shot completions on the FFI thread

Adds an opt-in `rawSafe` flag to `RegisterPendingCallback`. When set, `FFICallback` resolves the pending callback directly on the FFI callback thread instead of marshaling through Unity's main-thread SynchronizationContext. Saves up to one frame of latency on async ops whose completion only mutates volatile YieldInstruction state.

`YieldInstruction.IsDone`/`IsError` are converted to volatile-backed properties so the release semantics of the completion's volatile write make any preceding state mutations visible to the main-thread reader once it observes IsDone == true.

Opted in: the three generic instruction classes (`FfiInstruction<T>`, `FfiStreamInstruction<T>`, `FfiStreamResultInstruction<T,U>`) which only set IsDone/IsError/Error/result and do not touch Unity APIs. This covers SetLocalName/Metadata/Attributes, UnpublishTrack, all stream Read/Write/Close/WriteToFile.

Not opted in: bespoke completion handlers (Connect, PublishTrack, GetStats, CaptureAudioFrame, PerformRpc, SendText, SendFile, TextStreamOpen, ByteStreamOpen, PublishDataTrack). Those create C# objects and/or fire user events and need a separate audit before they can move off the main thread.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Dispatch ByteStreamReaderEvent on the FFI thread

Short-circuits ByteStreamReaderEvent in FFICallback so chunk-received events land in the incremental read buffer without waiting for the next main-thread frame drain. Mirrors the AudioStreamEvent fast path.

The internal subscriber (ByteStreamReader.ReadIncrementalInstruction) forwards into ReadIncrementalInstructionBase, which now serializes mutations of the chunk queue, latest chunk, error, and the IsEos / IsCurrentReadDone flags under a single lock. The flag fields on StreamYieldInstruction are made volatile so the main-thread coroutine's keepWaiting poll observes the FFI thread's writes without acquiring the lock.

Only ReadIncremental consumers benefit; ReadAll still goes through FfiStreamResultInstruction (already raw-safe via the previous commit).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Dispatch TextStreamReaderEvent on the FFI thread

Mirrors the byte-stream fast path. TextStreamReader.ReadIncrementalInstruction shares ReadIncrementalInstructionBase<TContent> with the byte reader, so the lock added in the previous commit already covers text chunk mutations — only the FFICallback short-circuit needs to be added.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Add threading tests for raw-safe dispatch and chunk lock

Three new EditMode tests in DataStreamIncrementalReadTests covering the ReadIncrementalInstructionBase lock added for raw chunk dispatch:

- producer/consumer with 5000 chunks, asserts FIFO and no loss
- OnEos racing with chunks, asserts consistent post-race state
- Reset racing with OnChunk, asserts no chunk lost (200 trials)

YieldInstructionThreadingTests is a new file with three tests that guard the volatile semantics of YieldInstruction.IsDone / IsError:

- background thread sets IsDone, foreground spin observes it
- keepWaiting flips after a background completion
- IsError write before IsDone is visible after the volatile read

Each test has a Timeout attribute so a regression that breaks the volatile read or the lock fails the runner instead of hanging it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Drop NUnit [Timeout] from threading tests

The [Timeout] attribute does not behave reliably in our CI runner. Removed it from all six threading tests and replaced the only unbounded Task.Wait() with a Wait(TimeSpan) + Assert.Fail pair so a genuine deadlock still produces a clean test failure rather than hanging until the outer CI wall-clock kills the runner.

The in-test DateTime.UtcNow deadlines on the spin loops already convert hang regressions into descriptive Assert.Fail messages, so the [Timeout] attribute was redundant safety on top of those.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Fix Reset/OnChunk race in ReadIncrementalInstructionBase

base.Reset() writes IsCurrentReadDone=false outside the chunk lock because StreamYieldInstruction owns the field but doesn't know about the lock the subclass added. That left a window where a producer's OnChunk could acquire the lock between base.Reset() and Reset()'s re-acquisition, write _latestChunk and set IsCurrentReadDone=true, and have its chunk immediately overwritten by Reset()'s subsequent queue dequeue. The producer's chunk was lost.

Empirically the race fired ~4% of the time under stress (150-300 chunks lost out of 5000 across 5 iterations of OnChunk_ConcurrentProducerAndConsumer_AllChunksObservedInOrder).

Fix: move base.Reset() inside the lock so the entire IsCurrentReadDone false-then-maybe-true sequence is atomic against OnChunk.

After fix: 5/5 iterations × 6 tests = 30/30 passing via Scripts~/run_unity.sh test -m EditMode -n 5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Opt remaining FfiStream*Instruction classes into raw-safe dispatch

The earlier raw-safe commit only flipped FfiInstruction<T> and missed the two sibling generic classes despite the commit message claiming all three were opted in. They follow the same pattern: only mutate volatile YieldInstruction state plus an Error / _result reference set strictly before IsDone, no Unity APIs touched.

Adds rawSafe: true to:

- FfiStreamInstruction<TCallback> — covers ByteStreamWriter.WriteInstruction / CloseInstruction and TextStreamWriter.WriteInstruction / CloseInstruction
- FfiStreamResultInstruction<TCallback,TResult> — covers ByteStreamReader.ReadAllInstruction / WriteToFileInstruction and TextStreamReader.ReadAllInstruction

Verified: 5/5 iterations × 6 threading tests = 30/30 passing via Scripts~/run_unity.sh test -m EditMode -n 5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Test that rawSafe dispatch actually moves off the main thread

The existing threading tests verify cross-thread state visibility (volatile IsDone, lock around chunk mutations) but cannot detect a regression that silently routed raw-safe completions back through SynchronizationContext.Post — the volatile/lock work is correct in either case.

Adds three EditMode tests that prove the FFI-thread fast path is actually taken:

- RawSafeTrueCallback_RunsOnDispatchingBackgroundThread: Registers a rawSafe:true pending callback, dispatches via TryDispatchRawSafe from a freshly-created Thread, asserts the completion captured the dispatcher's ManagedThreadId — not the test main thread's. This is the load-bearing test.
- RawSafeFalseCallback_TryDispatchRawSafe_ReturnsFalseAndDoesNotComplete: Negative control: TryDispatchRawSafe returns false for entries registered with rawSafe:false and does not invoke the completion.
- TryDispatchPendingCallback_RunsCompletionSynchronouslyOnCallerThread: Sanity check that the underlying race-proof dispatch is synchronous on the caller's thread regardless of rawSafe — what makes the rawSafe path "raw" is that FFICallback (the FFI thread) is the caller, not the dispatch mechanism itself.

Two FfiClient methods (TryDispatchRawSafe, TryDispatchPendingCallback) go from private to internal so the tests can invoke them directly. The race-semantics comments are unchanged.

Verified: 3 tests x 5 iterations = 15/15 passing via Scripts~/run_unity.sh test -m EditMode -n 5 -f RawSafeDispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Test rawSafe wiring end-to-end via RouteFfiEvent extraction

The earlier RawSafeDispatchTests called TryDispatchRawSafe directly, which proves the inner method is synchronous on the caller's thread but does not prove FFICallback actually invokes it for raw-safe entries. Commenting out the if-block in FFICallback would not have made any of those tests fail.

Refactor: extract everything in FFICallback after the byte parse into RouteFfiEvent(FfiEvent). FFICallback shrinks to disposed-check, parse, RouteFfiEvent. Production traffic still always lands in FFICallback; tests can now drive RouteFfiEvent directly from a chosen thread without P/Invoke pointer juggling.

Two new integration tests:

- RouteFfiEvent_RawSafeTrue_CompletesOnDispatchingThread: Calls RouteFfiEvent from a fresh Thread, asserts the rawSafe completion captured the dispatcher thread's id (not the test main thread). Verified to fail with a clear message when the TryDispatchRawSafe call is removed from RouteFfiEvent.
- RouteFfiEvent_RawSafeFalse_PostsToSynchronizationContext_AndDrainsCompletion: Swaps FfiClient.Instance._context for a RecordingSynchronizationContext, dispatches a non-raw entry, asserts the completion was NOT run on the dispatcher thread, asserts exactly one item was posted to the SC, then drains the queue from the test thread and asserts the completion runs on the drainer thread.

Verified: 5 tests x 5 iterations = 25/25 passing via Scripts~/run_unity.sh test -m EditMode -n 5 -f RawSafeDispatch.

Negative control: with the rawSafe block in RouteFfiEvent commented out, the new integration test fails with: "Completion did not run — RouteFfiEvent may be missing the TryDispatchRawSafe call for rawSafe entries."

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Remove unreachable switch cases for fast-path events in DispatchEvent

AudioStreamEvent, ByteStreamReaderEvent, and TextStreamReaderEvent all early-return inside RouteFfiEvent before reaching the SynchronizationContext.Post that drains into DispatchEvent. The switch arms for those three events were therefore unreachable — DispatchEvent is private and only the post lambda calls it. Replaced with a single comment pointing readers at the fast-path returns.

AudioStreamEvent's case had been dead since the audio fast-path was added (predates this branch); the other two became dead in cc2ed4f / 967d0b9.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Dispatch Logs on the FFI callback thread

Adds Logs to RouteFfiEvent's fast-path early-returns alongside the audio and stream-reader events. UnityEngine.Debug.unityLogger is one of the few Unity APIs documented thread-safe — it queues to the console drain internally — so calling Utils.HandleLogBatch from the FFI thread is safe.

Skipping the main-thread post matters most during error storms, panics, and LK_VERBOSE noise where Rust may emit log batches faster than Unity drains its post queue. Logs now reach the console without a one-frame delay even if the main thread is busy or wedged.

There are no public subscribers for the Logs event, so this has no effect on user-facing surfaces. The unreachable Logs case in DispatchEvent's switch is removed (now covered by the comment that already documents the other fast-path events).

Verified: 11 tests x 5 iterations = 55/55 passing via Scripts~/run_unity.sh test -m EditMode -n 5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Rename rawSafe to dispatchToMainThread / TrySkipDispatch

The rawSafe flag described a capability ("safe to run on the FFI thread") but its name was opaque, and TryDispatchRawSafe was a misnomer — the method doesn't dispatch anything; it runs the completion inline on the caller's thread.

Rename for clarity:

- rawSafe: false (default) → dispatchToMainThread: true (default)
- rawSafe: true → dispatchToMainThread: false
- TryDispatchRawSafe → TrySkipDispatch
- PendingCallbackBase.RawSafe → DispatchToMainThread

The default flips polarity but the meaning is the same: by default, completions go through SynchronizationContext.Post (safe for any handler that touches Unity APIs); the four generic FfiInstruction* classes opt out by passing dispatchToMainThread:false because their onComplete only mutates volatile state.

TrySkipDispatch reads correctly at the call site:

    if (Instance.TrySkipDispatch(...)) return;
    // fall through to dispatch
    Instance._context?.Post(...);

— "try to skip the dispatch; if we can't, dispatch normally."

TryDispatchPendingCallback keeps its name: "dispatch" there means function-dispatch (route the event to its registered handler), a different sense of the word and at a different abstraction level.

Test file renamed RawSafeDispatchTests.cs → SkipDispatchTests.cs (GUID preserved so Unity sees the rename, not delete+add). Test class and method names updated to match the new vocabulary.

Verified: 11 tests x 5 iterations = 55/55 passing via Scripts~/run_unity.sh test -m EditMode -n 5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Cleanup

* Adds the EventThreadingModel.md to document FFI event dispatch behaviour

* Adding .meta file for new doc file

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6d78387 commit bc52ffb

11 files changed

Lines changed: 746 additions & 44 deletions

EventThreadingModel.md

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
## Summary

FFI events whose handlers don't touch Unity APIs run directly on the FFI callback thread instead of being marshaled to Unity's main thread via `SynchronizationContext.Post`.

`FFICallback` previously routed Rust events (except `AudioStreamEvent`) through `_context.Post` to Unity's main thread. That's the safe default for handlers that touch Unity APIs (`Texture2D`, `GameObject`, `Transform`, …), but it costs up to one frame of latency for handlers that don't. Four categories of events can skip that:

1. **Audio stream events** that are written to the audio stream ring buffer and consumed on the audio thread.
2. **One-shot async completions** that only flip `IsDone` on a `YieldInstruction`: `SetMetadata`, `UnpublishTrack`, all stream `Read/Write/Close` ops.
3. **Stream reader chunk events** that just append bytes/strings to an internal buffer.
4. **Log batches**: `UnityEngine.Debug.unityLogger` is documented thread-safe; the post hop adds latency without benefit, especially during error storms or `LK_VERBOSE` noise.

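Category 2 relies on the volatile-backed `IsDone`/`IsError` properties described in the commit message. A minimal sketch of that publish pattern follows, assuming a hypothetical `SketchInstruction` class that stands in for the SDK's `YieldInstruction`; none of these names come from the SDK, and the string `Error` payload is a simplification.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the SDK's YieldInstruction; only the
// volatile-backed flags and one plain payload field are modeled.
public class SketchInstruction
{
    private volatile bool _isDone;
    private volatile bool _isError;

    public bool IsDone { get => _isDone; protected set => _isDone = value; }
    public bool IsError { get => _isError; protected set => _isError = value; }

    // Plain (non-volatile) payload, written strictly before IsDone.
    public string Error { get; protected set; }

    // Intended to be called from a non-main thread in this sketch.
    public void CompleteWithError(string message)
    {
        Error = message;  // 1. plain write
        IsError = true;   // 2. volatile write
        IsDone = true;    // 3. volatile write (release): publishes 1 and 2
    }
}

public static class VolatilePublishSketch
{
    // Completes an instruction on a background thread and polls IsDone from
    // the calling thread, similar to a coroutine's keepWaiting check.
    public static SketchInstruction CompleteOnBackgroundThread(TimeSpan timeout)
    {
        var instruction = new SketchInstruction();
        var worker = new Thread(() => instruction.CompleteWithError("Canceled"));
        worker.Start();

        var deadline = DateTime.UtcNow + timeout;
        while (!instruction.IsDone)  // volatile read (acquire)
        {
            if (DateTime.UtcNow > deadline)
                throw new TimeoutException("IsDone was never observed");
            Thread.Yield();
        }

        worker.Join();
        return instruction;
    }
}
```

Because `IsDone` is written last and read first, a reader that observes `IsDone == true` should also observe the earlier `Error` and `IsError` writes. The in-loop deadline mirrors the approach the commit message describes for the threading tests, so a visibility regression fails instead of hanging.
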
## Logic in code:

```csharp
internal static void RouteFfiEvent(FfiEvent response)
{
    if (_isDisposed) return;

    // 1. Per-event-type fast paths — invoke handler directly on FFI thread.
    if (response.MessageCase == FfiEvent.MessageOneofCase.AudioStreamEvent) { ...; return; }
    if (response.MessageCase == FfiEvent.MessageOneofCase.Logs) { ...; return; }
    if (response.MessageCase == FfiEvent.MessageOneofCase.ByteStreamReaderEvent) { ...; return; }
    if (response.MessageCase == FfiEvent.MessageOneofCase.TextStreamReaderEvent) { ...; return; }

    // 2. One-shot completion fast path — opted-in pending callbacks complete inline.
    var requestAsyncId = ExtractRequestAsyncId(response);
    if (requestAsyncId.HasValue && Instance.TrySkipDispatch(requestAsyncId.Value, response))
        return;

    // 3. Fallback — post to Unity's main-thread sync context.
    Instance._context?.Post(static (resp) =>
    {
        var r = resp as FfiEvent;
        if (r == null) return;
        DispatchEvent(r);
    }, response);
}
```

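To make the latency difference between the fast paths and the fallback concrete, here is a small self-contained sketch. It assumes a hypothetical `QueueingContext` that holds posted work until `Drain()` is called, loosely imitating a main thread that runs posted callbacks once per frame. `QueueingContext` and `RoutingSketch` are illustrative names only and are not part of the SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical context that queues posted work until Drain() is called.
public sealed class QueueingContext : SynchronizationContext
{
    private readonly Queue<(SendOrPostCallback, object)> _queue = new();
    private readonly object _gate = new();

    public override void Post(SendOrPostCallback d, object state)
    {
        lock (_gate) _queue.Enqueue((d, state));
    }

    // Runs everything queued so far on the calling thread; returns the count.
    public int Drain()
    {
        var ran = 0;
        while (true)
        {
            (SendOrPostCallback, object) item;
            lock (_gate)
            {
                if (_queue.Count == 0) return ran;
                item = _queue.Dequeue();
            }
            item.Item1(item.Item2);
            ran++;
        }
    }
}

public static class RoutingSketch
{
    // Runs the handler inline when it opted out of the main-thread post;
    // otherwise posts it to the context and returns immediately.
    public static void Route(SynchronizationContext context, bool dispatchToMainThread, Action handler)
    {
        if (!dispatchToMainThread)
        {
            handler();  // fast path: completes on the caller's thread
            return;
        }
        context.Post(_ => handler(), null);  // fallback: waits for the next drain
    }
}
```

In this sketch a handler routed with `dispatchToMainThread: false` has already run when `Route` returns, while a posted handler runs only once something calls `Drain()`, which is the delay the fast paths avoid.
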
## Event Table

| Event | Where it runs | Why |
| --- | --- | --- |
| `AudioStreamEvent` | **FFI thread** (unchanged) | Audio thread consumes the data; main-thread latency would hurt timing |
| `Logs` | **FFI thread** (new) | `Debug.unityLogger` is thread-safe; logs reach console immediately during panics / errors |
| `ByteStreamReaderEvent` | **FFI thread** (new) | Internal buffer is now lock-protected; chunks land without frame delay |
| `TextStreamReaderEvent` | **FFI thread** (new) | Same lock as byte path (shared `ReadIncrementalInstructionBase`) |
| One-shot completions via `FfiInstruction<T>` | **FFI thread** (new) | `SetLocalMetadata`, `SetLocalName`, `SetLocalAttributes`, `UnpublishTrack` — only flip `IsDone`/`IsError` |
| One-shot completions via `FfiStreamInstruction<T>` | **FFI thread** (new) | `ByteStreamWriter.Write/Close`, `TextStreamWriter.Write/Close` |
| One-shot completions via `FfiStreamResultInstruction<T,U>` | **FFI thread** (new) | `ByteStreamReader.ReadAll/WriteToFile`, `TextStreamReader.ReadAll` |
| `RoomEvent` | Main thread | Fires user-facing `ParticipantConnected`, `TrackPublished`, etc. |
| `TrackEvent` | Main thread | (No subscribers today; main-thread default for safety) |
| `RpcMethodInvocation` | Main thread | User RPC handlers commonly touch game state |
| `Disconnect` | Main thread | UI updates typical |
| `VideoStreamEvent` | Main thread | Internal buffering is fast; user-facing raw delivery deferred (see follow-ups) |
| `DataTrackStreamEvent` | Main thread | Deferred until a concrete consumer asks |
| `Connect` (one-shot) | Main thread | Bespoke handler fires participant-connected events |
| `PublishTrack` (one-shot) | Main thread | Bespoke handler |
| `GetStats` (one-shot) | Main thread | Bespoke handler |
| `CaptureAudioFrame` (one-shot) | Main thread | Bespoke handler |
| `PerformRpc` (one-shot) | Main thread | Bespoke handler surfaces response |
| `SendText` / `SendFile` (one-shot) | Main thread | Bespoke handlers |
| `TextStreamOpen` / `ByteStreamOpen` (one-shot) | Main thread | Bespoke handlers return writer objects |
| `PublishDataTrack` (one-shot) | Main thread | Bespoke handler |

EventThreadingModel.md.meta

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default.

Runtime/Scripts/DataStream.cs

Lines changed: 39 additions & 17 deletions
@@ -80,6 +80,11 @@ public abstract class ReadIncrementalInstructionBase<TContent> : StreamYieldInst
         private readonly Queue<TContent> _pendingChunks = new();
         private TContent _latestChunk;

+        // Chunk events arrive on the FFI thread; Reset() and the LatestChunk getter
+        // run on the main-thread coroutine. _gate serializes mutations of the queue,
+        // _latestChunk, IsCurrentReadDone, IsEos, and Error across both sides.
+        private readonly object _gate = new();
+
         /// <summary>
         /// Error that occurred on the last read, if any.
         /// </summary>
@@ -94,8 +99,11 @@ protected TContent LatestChunk
         {
             get
             {
-                if (Error != null) throw Error;
-                return _latestChunk;
+                lock (_gate)
+                {
+                    if (Error != null) throw Error;
+                    return _latestChunk;
+                }
             }
         }

@@ -108,34 +116,48 @@ protected ReadIncrementalInstructionBase(FfiHandle readerHandle)

         protected void OnChunk(TContent content)
         {
-            if (IsCurrentReadDone)
-            {
-                // Consumer hasn't yielded since the last chunk; buffer until Reset().
-                _pendingChunks.Enqueue(content);
-            }
-            else
+            lock (_gate)
             {
-                _latestChunk = content;
-                IsCurrentReadDone = true;
+                if (IsCurrentReadDone)
+                {
+                    // Consumer hasn't yielded since the last chunk; buffer until Reset().
+                    _pendingChunks.Enqueue(content);
+                }
+                else
+                {
+                    _latestChunk = content;
+                    IsCurrentReadDone = true;
+                }
             }
         }

         public override void Reset()
         {
-            base.Reset();
-            if (_pendingChunks.Count > 0)
+            // base.Reset() must run under the same lock as OnChunk, otherwise the
+            // window between IsCurrentReadDone=false (from base) and the dequeue
+            // below lets a producer race in, write _latestChunk, and have its
+            // chunk immediately overwritten by the dequeue. That race lost ~4% of
+            // chunks under stress before this fix.
+            lock (_gate)
             {
-                _latestChunk = _pendingChunks.Dequeue();
-                IsCurrentReadDone = true;
+                base.Reset();
+                if (_pendingChunks.Count > 0)
+                {
+                    _latestChunk = _pendingChunks.Dequeue();
+                    IsCurrentReadDone = true;
+                }
             }
         }

         protected void OnEos(Proto.StreamError protoError)
         {
-            IsEos = true;
-            if (protoError != null)
+            lock (_gate)
             {
-                Error = new StreamError(protoError);
+                IsEos = true;
+                if (protoError != null)
+                {
+                    Error = new StreamError(protoError);
+                }
             }
         }
     }

Runtime/Scripts/Internal/FFIClient.cs

Lines changed: 87 additions & 16 deletions
@@ -186,7 +186,8 @@ internal void RegisterPendingCallback<TCallback>(
             ulong requestAsyncId,
             Func<FfiEvent, TCallback?> selector,
             Action<TCallback> onComplete,
-            Action? onCancel = null
+            Action? onCancel = null,
+            bool dispatchToMainThread = true
         ) where TCallback : class
         {
             // Request registration must happen before the request is sent. That ordering is what
@@ -198,7 +199,13 @@ internal void RegisterPendingCallback<TCallback>(
             //
             // Duplicate IDs are treated as a hard error because they would allow two unrelated
             // requests to compete for the same completion slot.
-            var pending = new PendingCallback<TCallback>(selector, onComplete, onCancel);
+            //
+            // dispatchToMainThread defaults to true: completion runs on Unity's main thread via
+            // SynchronizationContext.Post, which is safe for any onComplete that touches Unity
+            // APIs or fires user events. Pass false when the onComplete only mutates volatile
+            // YieldInstruction state — RouteFfiEvent will then run it inline on the FFI callback
+            // thread instead of paying a frame of latency for the post.
+            var pending = new PendingCallback<TCallback>(selector, onComplete, onCancel, dispatchToMainThread);
             if (!pendingCallbacks.TryAdd(requestAsyncId, pending))
             {
                 throw new InvalidOperationException($"Duplicate pending callback for request_async_id={requestAsyncId}");
@@ -278,6 +285,15 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size)

             var respData = new Span<byte>(data.ToPointer()!, (int)size.ToUInt64());
             var response = FfiEvent.Parser!.ParseFrom(respData);
+            RouteFfiEvent(response);
+        }
+
+        // Routing logic split out from FFICallback so tests can drive it from a
+        // chosen thread without going through the P/Invoke entry point. Running
+        // production traffic still always lands here via FFICallback above.
+        internal static void RouteFfiEvent(FfiEvent response)
+        {
+            if (_isDisposed) return;

             // Audio stream events are handled directly on the FFI callback thread
             // to bypass the main thread, since the audio thread consumes the data
@@ -287,6 +303,47 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size)
                 return;
             }

+            // Log batches are forwarded directly. UnityEngine.Debug.unityLogger is
+            // documented thread-safe; Unity's logger queues to its console drain
+            // internally. Skipping the main-thread post means logs reach the
+            // console without a one-frame delay — useful during error storms,
+            // panics, or LK_VERBOSE noise where the post queue could otherwise
+            // back up.
+            if (response.MessageCase == FfiEvent.MessageOneofCase.Logs)
+            {
+                Utils.HandleLogBatch(response.Logs);
+                return;
+            }
+
+            // Byte stream reader events feed an internal incremental-read buffer that
+            // already serializes mutations under its own lock. Skipping the main-thread
+            // post lets chunks land in the buffer immediately rather than waiting for
+            // the next frame drain.
+            if (response.MessageCase == FfiEvent.MessageOneofCase.ByteStreamReaderEvent)
+            {
+                Instance.ByteStreamReaderEventReceived?.Invoke(response.ByteStreamReaderEvent!);
+                return;
+            }
+
+            // Same treatment for text stream readers — they share
+            // ReadIncrementalInstructionBase<TContent> with the byte path, so the
+            // lock added there already protects all state mutations.
+            if (response.MessageCase == FfiEvent.MessageOneofCase.TextStreamReaderEvent)
+            {
+                Instance.TextStreamReaderEventReceived?.Invoke(response.TextStreamReaderEvent!);
+                return;
+            }
+
+            // One-shot completions registered with dispatchToMainThread:false also bypass the
+            // main thread. The pending callback's onComplete only mutates volatile
+            // YieldInstruction fields, so resolving it here saves up to one frame of latency
+            // on async ops like SetMetadata / UnpublishTrack / stream Read/Write/Close.
+            var requestAsyncId = ExtractRequestAsyncId(response);
+            if (requestAsyncId.HasValue && Instance.TrySkipDispatch(requestAsyncId.Value, response))
+            {
+                return;
+            }
+
             // Run on the main thread, the order of execution is guaranteed by Unity
             // It uses a Queue internally
             Instance._context?.Post(static (resp) =>
@@ -316,9 +373,6 @@ private static void DispatchEvent(FfiEvent ffiEvent)

             switch (ffiEvent.MessageCase)
             {
-                case FfiEvent.MessageOneofCase.Logs:
-                    Utils.HandleLogBatch(ffiEvent.Logs);
-                    break;
                 case FfiEvent.MessageOneofCase.PublishData:
                     break;
                 case FfiEvent.MessageOneofCase.RoomEvent:
@@ -338,15 +392,6 @@ private static void DispatchEvent(FfiEvent ffiEvent)
                 case FfiEvent.MessageOneofCase.VideoStreamEvent:
                     Instance.VideoStreamEventReceived?.Invoke(ffiEvent.VideoStreamEvent!);
                     break;
-                case FfiEvent.MessageOneofCase.AudioStreamEvent:
-                    Instance.AudioStreamEventReceived?.Invoke(ffiEvent.AudioStreamEvent!);
-                    break;
-                case FfiEvent.MessageOneofCase.ByteStreamReaderEvent:
-                    Instance.ByteStreamReaderEventReceived?.Invoke(ffiEvent.ByteStreamReaderEvent!);
-                    break;
-                case FfiEvent.MessageOneofCase.TextStreamReaderEvent:
-                    Instance.TextStreamReaderEventReceived?.Invoke(ffiEvent.TextStreamReaderEvent!);
-                    break;
                 case FfiEvent.MessageOneofCase.DataTrackStreamEvent:
                     Instance.DataTrackStreamEventReceived?.Invoke(ffiEvent.DataTrackStreamEvent!);
                     break;
@@ -357,7 +402,7 @@ private static void DispatchEvent(FfiEvent ffiEvent)
             }
         }

-        private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)
+        internal bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)
         {
             // Remove-first dispatch is the key race-proofing step.
             //
@@ -385,6 +430,26 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)
             return false;
         }

+        // Inline-completion fast path for one-shot callbacks whose onComplete only
+        // mutates volatile YieldInstruction state (no Unity APIs, no user-event
+        // invocations). Returning true means the caller has been fully handled here
+        // — no further dispatch needed. Returning false means the caller should fall
+        // through to its normal main-thread post path. Same race model as
+        // TryDispatchPendingCallback: the side that wins TryRemove is the only side
+        // that may invoke the completion.
+        internal bool TrySkipDispatch(ulong requestAsyncId, FfiEvent ffiEvent)
+        {
+            if (!pendingCallbacks.TryGetValue(requestAsyncId, out var pending))
+            {
+                return false;
+            }
+            if (pending.DispatchToMainThread)
+            {
+                return false;
+            }
+            return TryDispatchPendingCallback(requestAsyncId, ffiEvent);
+        }
+
         private static ulong? ExtractRequestAsyncId(FfiEvent ffiEvent)
         {
             // This switch is only concerned with one-shot async completion callbacks that echo
@@ -420,6 +485,7 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)

         private abstract class PendingCallbackBase
         {
+            public abstract bool DispatchToMainThread { get; }
             public abstract bool TryComplete(FfiEvent ffiEvent);
             public abstract void Cancel();
         }
@@ -429,16 +495,21 @@ private sealed class PendingCallback<TCallback> : PendingCallbackBase where TCal
             private readonly Func<FfiEvent, TCallback?> selector;
             private readonly Action<TCallback> onComplete;
             private readonly Action? onCancel;
+            private readonly bool dispatchToMainThread;
+
+            public override bool DispatchToMainThread => dispatchToMainThread;

             public PendingCallback(
                 Func<FfiEvent, TCallback?> selector,
                 Action<TCallback> onComplete,
-                Action? onCancel
+                Action? onCancel,
+                bool dispatchToMainThread
             )
             {
                 this.selector = selector;
                 this.onComplete = onComplete;
                 this.onCancel = onCancel;
+                this.dispatchToMainThread = dispatchToMainThread;
             }

             public override bool TryComplete(FfiEvent ffiEvent)
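
The race model named in the `TrySkipDispatch` comments ("the side that wins TryRemove is the only side that may invoke the completion") can be illustrated in isolation. The following is a rough sketch under stated assumptions: `PendingRegistrySketch` and its members are hypothetical, and the body of the SDK's real `TryDispatchPendingCallback` is not shown in this diff, so this only models the peek-then-remove-first idea using `ConcurrentDictionary`.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical registry modeled on the diff comments, not the SDK's FfiClient.
public sealed class PendingRegistrySketch
{
    private sealed class Entry
    {
        public Action OnComplete;
        public bool DispatchToMainThread;
    }

    private readonly ConcurrentDictionary<ulong, Entry> _pending = new();

    public void Register(ulong id, Action onComplete, bool dispatchToMainThread = true)
    {
        var entry = new Entry { OnComplete = onComplete, DispatchToMainThread = dispatchToMainThread };
        if (!_pending.TryAdd(id, entry))
            throw new InvalidOperationException($"Duplicate pending callback for id={id}");
    }

    // Remove-first: only the caller that wins TryRemove may run the completion,
    // so a completion can never run twice even if two threads race here.
    public bool TryDispatch(ulong id)
    {
        if (!_pending.TryRemove(id, out var entry)) return false;
        entry.OnComplete();
        return true;
    }

    // Peek without removing; complete inline only for entries that opted out
    // of the main-thread post. Entries that want the main thread stay registered.
    public bool TrySkipDispatch(ulong id)
    {
        if (!_pending.TryGetValue(id, out var entry)) return false;
        if (entry.DispatchToMainThread) return false;
        return TryDispatch(id);
    }
}
```

The peek in `TrySkipDispatch` is only a hint. The actual ownership decision is still made by `TryRemove`, so an entry removed between the peek and the dispatch simply makes the call return false.
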

Runtime/Scripts/Internal/FfiInstruction.cs

Lines changed: 6 additions & 3 deletions
@@ -28,7 +28,8 @@ internal FfiInstruction(
                 {
                     IsError = true;
                     IsDone = true;
-                });
+                },
+                dispatchToMainThread: false);
         }
     }

@@ -63,7 +64,8 @@ internal FfiStreamInstruction(
                     Error = new StreamError("Canceled");
                     IsError = true;
                     IsDone = true;
-                });
+                },
+                dispatchToMainThread: false);
         }
     }

@@ -116,7 +118,8 @@ internal FfiStreamResultInstruction(
                     Error = new StreamError("Canceled");
                     IsError = true;
                     IsDone = true;
-                });
+                },
+                dispatchToMainThread: false);
         }
     }
 }
