diff --git a/sender/frame_buffer.go b/sender/frame_buffer.go index 8d83650..3f894ca 100644 --- a/sender/frame_buffer.go +++ b/sender/frame_buffer.go @@ -83,9 +83,16 @@ type FrameBuffer struct { } // NewFrameBuffer creates a new frame buffer with the specified dimensions. +// +// Capacity 2: one in-flight + one staged. Sized for low-latency real-time +// senders where source > encoder rate. When the source is steady (no bursts +// and no encoder slack to drain a backlog), encoder throughput is set by +// encoder CPU capacity regardless of capacity, so a deeper queue only adds +// wait time without delivering more frames. Capacity 2 tolerates one frame +// of arrival/encode jitter; capacity 1 would drop on any same-tick burst. func NewFrameBuffer(width, height int) *FrameBuffer { return &FrameBuffer{ - frameChan: make(chan frameWithMeta, 8), // Increased from 2 to 8 for better buffering + frameChan: make(chan frameWithMeta, 2), closeChan: make(chan struct{}), width: width, height: height, diff --git a/sender/frame_buffer_test.go b/sender/frame_buffer_test.go index af9421a..c12176c 100644 --- a/sender/frame_buffer_test.go +++ b/sender/frame_buffer_test.go @@ -374,15 +374,15 @@ func TestFrameBuffer_SendWithCaptureTs_OverflowReportsEviction(t *testing.T) { fb := NewFrameBuffer(640, 480) defer func() { _ = fb.Close() }() - // Buffer capacity is 8. The first 8 sends fit; the 9th must evict the + // Buffer capacity is 2. The first 2 sends fit; the 3rd must evict the // oldest entry so downstream SLO accounting can see the overload. testImg := image.NewRGBA(image.Rect(0, 0, 640, 480)) - for i := range 8 { + for i := range 2 { evicted, err := fb.SendFrameWithCaptureTS(testImg, int64(i+1)) require.NoError(t, err) require.False(t, evicted, "send %d should not evict before capacity", i) } - evicted, err := fb.SendFrameWithCaptureTS(testImg, 9) + evicted, err := fb.SendFrameWithCaptureTS(testImg, 3) require.NoError(t, err) assert.True(t, evicted, "send beyond capacity should report eviction") } diff --git a/sender/rtc_sender_test.go b/sender/rtc_sender_test.go index 8ee8e0b..057bfc7 100644 --- a/sender/rtc_sender_test.go +++ b/sender/rtc_sender_test.go @@ -757,14 +757,16 @@ func TestRTCSender_OnFrameSent_FiresWithCaptureTs(t *testing.T) { // libvpx has a one-frame encoder lookahead, so a single SendFrame + // processEncodedFrames will not produce an encoded output yet. Drive - // a small batch and pump processEncodedFrames until the callback has - // seen at least one event carrying a non-zero captureTSUs. This - // matches the real pipeline where the encoder runs at ~30fps with - // frames continuously injected by the cgo bridge. + // a small batch and pump processEncodedFrames between sends so the + // 2-slot FrameBuffer drains and the first frame is not evicted before + // the encoder consumes it. This matches the real pipeline where the + // encoder runs at ~30fps with frames continuously injected by the + // cgo bridge. testImg := image.NewYCbCr(image.Rect(0, 0, 640, 480), image.YCbCrSubsampleRatio420) const wantCaptureTS int64 = 1_700_000_000_000 for i, ts := range []int64{wantCaptureTS, wantCaptureTS + 33_000, wantCaptureTS + 66_000} { require.NoError(t, sender.SendFrameWithCaptureTS("cam-0", testImg, ts), "send %d", i) + sender.processEncodedFrames() } deadline := time.Now().Add(2 * time.Second) @@ -831,12 +833,12 @@ func TestRTCSender_OnFrameSent_FiresWithDroppedOnEviction(t *testing.T) { } }) - // Saturate the 8-slot FrameBuffer, then send extras to force eviction. + // Saturate the 2-slot FrameBuffer, then send extras to force eviction. // The encode goroutine has not been driven, so the buffer never drains // and every send beyond capacity must evict and fire the callback with // dropped=true. testImg := image.NewYCbCr(image.Rect(0, 0, 640, 480), image.YCbCrSubsampleRatio420) - for range 8 { + for range 2 { require.NoError(t, sender.SendFrameWithCaptureTS("cam-0", testImg, 1)) } for range 3 {