From b0666872c81259ec11228805bd4b976e726cbcfa Mon Sep 17 00:00:00 2001 From: jayli Date: Wed, 27 May 2026 16:17:19 -0700 Subject: [PATCH] Add dequeue + encode-done stamps to OnFrameSent Lets callers split FrameBuffer queue wait from encode + send time. - FrameBuffer.LastDequeueWallUs() returns time.Now().UnixMicro() at the most recent successful Read; black-frame timeouts preserve the prior value. - FrameSentCallback gains dequeuedAtWallUs and encodeDoneAtWallUs between captureTSUs and sentAtWallUs. - encodeAndSendTrack populates both from LastDequeueWallUs and tAfterRead. Eviction path passes 0 (no encode occurred). Tests cover LastDequeueWallUs advancing per Read, preservation on black-frame timeout, and timestamp ordering on the success path. --- sender/frame_buffer.go | 27 +++++++++++++++------- sender/frame_buffer_test.go | 46 +++++++++++++++++++++++++++++++++++++ sender/rtc_sender.go | 31 ++++++++++++++----------- sender/rtc_sender_test.go | 31 +++++++++++++++++++------ 4 files changed, 106 insertions(+), 29 deletions(-) diff --git a/sender/frame_buffer.go b/sender/frame_buffer.go index 01090ad..8d83650 100644 --- a/sender/frame_buffer.go +++ b/sender/frame_buffer.go @@ -71,14 +71,15 @@ type frameWithMeta struct { // FrameBuffer is a simple in-memory frame buffer that implements VideoSource // It can be used as a virtual video driver for testing or programmatic frame injection. type FrameBuffer struct { - frameChan chan frameWithMeta - closeChan chan struct{} - closeOnce sync.Once - width int - height int - id string - initialized bool - lastCaptureTSUs atomic.Int64 + frameChan chan frameWithMeta + closeChan chan struct{} + closeOnce sync.Once + width int + height int + id string + initialized bool + lastCaptureTSUs atomic.Int64 + lastDequeueWallUs atomic.Int64 } // NewFrameBuffer creates a new frame buffer with the specified dimensions. @@ -133,6 +134,7 @@ func (f *FrameBuffer) Read() (image.Image, func(), error) { select { case fm := <-f.frameChan: f.lastCaptureTSUs.Store(fm.captureTSUs) + f.lastDequeueWallUs.Store(time.Now().UnixMicro()) return fm.img, func() {}, nil case <-f.closeChan: @@ -149,6 +151,7 @@ func (f *FrameBuffer) Read() (image.Image, func(), error) { select { case fm := <-f.frameChan: f.lastCaptureTSUs.Store(fm.captureTSUs) + f.lastDequeueWallUs.Store(time.Now().UnixMicro()) return fm.img, func() {}, nil case <-f.closeChan: @@ -167,6 +170,14 @@ func (f *FrameBuffer) LastCaptureTSUs() int64 { return f.lastCaptureTSUs.Load() } +// LastDequeueWallUs returns time.Now().UnixMicro() at the most recent +// real-frame Read (the queue-exit instant), or 0 if no real frame has +// been consumed yet. Intended for the encode loop to read right after +// encodedReader.Read. +func (f *FrameBuffer) LastDequeueWallUs() int64 { + return f.lastDequeueWallUs.Load() +} + // SendFrame adds a frame to the buffer with no capture timestamp. // If the buffer is full, it drops the oldest frame and adds the new one. func (f *FrameBuffer) SendFrame(frame image.Image) error { diff --git a/sender/frame_buffer_test.go b/sender/frame_buffer_test.go index fca38d0..af9421a 100644 --- a/sender/frame_buffer_test.go +++ b/sender/frame_buffer_test.go @@ -324,6 +324,52 @@ func TestFrameBuffer_BlackFrame_FirstReadReturnsZero(t *testing.T) { "first black-frame Read on a fresh buffer should return 0") } +func TestFrameBuffer_LastDequeueWallUs_AdvancesOnRead(t *testing.T) { + fb := NewFrameBuffer(640, 480) + defer func() { _ = fb.Close() }() + fb.SetInitialized() + + assert.Zero(t, fb.LastDequeueWallUs(), "fresh buffer should report zero") + + testImg := image.NewRGBA(image.Rect(0, 0, 640, 480)) + _, err := fb.SendFrameWithCaptureTS(testImg, 111) + require.NoError(t, err) + + before := time.Now().UnixMicro() + _, release, err := fb.Read() + require.NoError(t, err) + release() + after := time.Now().UnixMicro() + + got := fb.LastDequeueWallUs() + assert.GreaterOrEqual(t, got, before, "dequeue stamp must be >= pre-Read wall clock") + assert.LessOrEqual(t, got, after, "dequeue stamp must be <= post-Read wall clock") +} + +func TestFrameBuffer_LastDequeueWallUs_PreservedOnBlackFrame(t *testing.T) { + fb := NewFrameBuffer(640, 480) + defer func() { _ = fb.Close() }() + + testImg := image.NewRGBA(image.Rect(0, 0, 640, 480)) + _, err := fb.SendFrameWithCaptureTS(testImg, 555) + require.NoError(t, err) + + // First Read consumes the buffered frame and stamps a real dequeue. + _, release, err := fb.Read() + require.NoError(t, err) + release() + stampAfterReal := fb.LastDequeueWallUs() + require.Positive(t, stampAfterReal) + + // Second Read hits the 100ms timeout (uninitialized + empty) and + // returns a black frame; LastDequeueWallUs must NOT be clobbered. + _, release, err = fb.Read() + require.NoError(t, err) + release() + assert.Equal(t, stampAfterReal, fb.LastDequeueWallUs(), + "black-frame Read should preserve LastDequeueWallUs") +} + func TestFrameBuffer_SendWithCaptureTs_OverflowReportsEviction(t *testing.T) { fb := NewFrameBuffer(640, 480) defer func() { _ = fb.Close() }() diff --git a/sender/rtc_sender.go b/sender/rtc_sender.go index fd0da3a..8906311 100644 --- a/sender/rtc_sender.go +++ b/sender/rtc_sender.go @@ -79,15 +79,17 @@ type EncodedTrack struct { // encode pass. type EncodedFrameCallback func(trackID string, data []byte, isKeyframe bool) -// FrameSentCallback fires once per encoded frame after WriteSample -// returns, and once per frame evicted from a full buffer to make room. -// captureTSUs echoes the value passed to SendFrameWithCaptureTS (0 means -// no timestamp; filter when used for latency measurement). sentAtWallUs -// is time.Now().UnixMicro() at WriteSample return (or at eviction). -// dropped is true when WriteSample errored or the frame was evicted. -// Runs on the encode goroutine for completed sends, on the SendFrame -// caller's goroutine for evictions; must not block. -type FrameSentCallback func(trackID string, captureTSUs, sentAtWallUs int64, dropped bool) +// FrameSentCallback fires per encoded frame (after WriteSample) and per +// frame evicted from a full buffer. captureTSUs echoes the value passed +// to SendFrameWithCaptureTS (0 = none). dequeuedAtWallUs / encodeDoneAtWallUs +// / sentAtWallUs are time.Now().UnixMicro() at FrameBuffer.Read return, +// encodedReader.Read return, and WriteSample return respectively. On +// dropped=true (WriteSample error or eviction), dequeuedAtWallUs and +// encodeDoneAtWallUs are 0. Runs on the encode goroutine for completed +// sends, on the SendFrame caller's goroutine for evictions; must not block. +type FrameSentCallback func(trackID string, + captureTSUs, dequeuedAtWallUs, encodeDoneAtWallUs, sentAtWallUs int64, + dropped bool) // RTCSender is a generic sender that can work with any frame source. type RTCSender struct { @@ -529,7 +531,7 @@ func (s *RTCSender) SendFrameWithCaptureTS(trackID string, frame image.Image, ca evicted, err := frameBuffer.SendFrameWithCaptureTS(frame, captureTSUs) if evicted && s.onFrameSent != nil { - s.onFrameSent(trackID, 0, time.Now().UnixMicro(), true) + s.onFrameSent(trackID, 0, 0, 0, time.Now().UnixMicro(), true) } return err @@ -734,11 +736,11 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo } tAfterRead := time.Now() - // Capture timestamp for the just-encoded frame; non-FrameBuffer - // sources return 0. - var captureTSUs int64 + // Per-frame stamps; non-FrameBuffer sources report 0. + var captureTSUs, dequeuedAtWallUs int64 if cfs, ok := track.videoSource.(*FrameBuffer); ok { captureTSUs = cfs.LastCaptureTSUs() + dequeuedAtWallUs = cfs.LastDequeueWallUs() } track.bitrateTracker.AddFrame(len(encoded.Data), tAfterRead) @@ -766,7 +768,8 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo } if s.onFrameSent != nil { - s.onFrameSent(trackID, captureTSUs, tAfterWrite.UnixMicro(), writeErr != nil) + s.onFrameSent(trackID, captureTSUs, dequeuedAtWallUs, + tAfterRead.UnixMicro(), tAfterWrite.UnixMicro(), writeErr != nil) } release() diff --git a/sender/rtc_sender_test.go b/sender/rtc_sender_test.go index 0c6fd6a..8ee8e0b 100644 --- a/sender/rtc_sender_test.go +++ b/sender/rtc_sender_test.go @@ -733,19 +733,26 @@ func TestRTCSender_OnFrameSent_FiresWithCaptureTs(t *testing.T) { require.NoError(t, err) type sentEvent struct { - trackID string - captureTSUs int64 - sentAtWallUs int64 - dropped bool + trackID string + captureTSUs int64 + dequeuedAtWallUs int64 + encodeDoneAtWallUs int64 + sentAtWallUs int64 + dropped bool } var ( mu sync.Mutex events []sentEvent ) - sender.SetOnFrameSent(func(trackID string, captureTSUs, sentAtWallUs int64, dropped bool) { + sender.SetOnFrameSent(func(trackID string, + captureTSUs, dequeuedAtWallUs, encodeDoneAtWallUs, sentAtWallUs int64, + dropped bool, + ) { mu.Lock() defer mu.Unlock() - events = append(events, sentEvent{trackID, captureTSUs, sentAtWallUs, dropped}) + events = append(events, sentEvent{ + trackID, captureTSUs, dequeuedAtWallUs, encodeDoneAtWallUs, sentAtWallUs, dropped, + }) }) // libvpx has a one-frame encoder lookahead, so a single SendFrame + @@ -782,8 +789,14 @@ func TestRTCSender_OnFrameSent_FiresWithCaptureTs(t *testing.T) { require.NotNil(t, got, "OnFrameSent never echoed the supplied captureTSUs within 2s") assert.Equal(t, "cam-0", got.trackID) assert.Equal(t, wantCaptureTS, got.captureTSUs) + assert.Positive(t, got.dequeuedAtWallUs, "dequeuedAtWallUs should be set on success path") + assert.Positive(t, got.encodeDoneAtWallUs, "encodeDoneAtWallUs should be set on success path") assert.Positive(t, got.sentAtWallUs, "sentAtWallUs should be a real wall-clock microsecond") assert.False(t, got.dropped, "WriteSample should succeed for an in-spec frame") + assert.LessOrEqual(t, got.dequeuedAtWallUs, got.encodeDoneAtWallUs, + "dequeue must precede encode-done") + assert.LessOrEqual(t, got.encodeDoneAtWallUs, got.sentAtWallUs, + "encode-done must precede send") } func TestRTCSender_OnFrameSent_FiresWithDroppedOnEviction(t *testing.T) { @@ -804,11 +817,15 @@ func TestRTCSender_OnFrameSent_FiresWithDroppedOnEviction(t *testing.T) { dropped int nonDrop int ) - sender.SetOnFrameSent(func(_ string, _ int64, _ int64, isDrop bool) { + sender.SetOnFrameSent(func(_ string, _, dequeued, encodeDone, _ int64, isDrop bool) { mu.Lock() defer mu.Unlock() if isDrop { dropped++ + // Evictions must report zero for the intermediate timestamps; + // no encode happened. + assert.Zero(t, dequeued, "evictions should report dequeuedAtWallUs=0") + assert.Zero(t, encodeDone, "evictions should report encodeDoneAtWallUs=0") } else { nonDrop++ }