From dfc44859fe912b61510477e9c3a9fb41bcf104c3 Mon Sep 17 00:00:00 2001 From: jayli Date: Tue, 2 Jun 2026 00:10:52 -0700 Subject: [PATCH 1/6] Event-driven encode: per-track goroutine Replace the 100ms processEncodedFrames tick with per-track encode goroutines spawned in AddVideoTrack. FrameBuffer.Read() now blocks on frame arrival when initialized, so the encode loop runs on source cadence instead of the wall-clock tick. The BWE bitrate-update ticker in Start() stays at 100ms (its actual cadence requirement); only the encode trigger moves off it. encodeAndSendTrack snapshots encodedReader/videoTrack/videoSource/ callbacks under tracksMu.RLock so concurrent reconfiguration is safe. Close() cancels the per-track context first, then closes the source (unblocks Read via closeChan), then waits on encodeDone before closing mediaTrack -- no leak, no deadlock. Expected effect on real-time senders where source > encoder rate: removes up to one ticker interval of buffer_wait. Stacked on the depth=2 FrameBuffer change, p99 buffer_wait drops from ~115ms to one encoder period (~10-30ms) on a steady source. --- sender/frame_buffer.go | 8 +- sender/frame_buffer_test.go | 60 +++++++++++--- sender/rtc_sender.go | 156 +++++++++++++++++++----------------- sender/rtc_sender_test.go | 27 ++++--- 4 files changed, 148 insertions(+), 103 deletions(-) diff --git a/sender/frame_buffer.go b/sender/frame_buffer.go index 3f894ca..c215eaa 100644 --- a/sender/frame_buffer.go +++ b/sender/frame_buffer.go @@ -128,8 +128,8 @@ func (f *FrameBuffer) ResetInitialized() { } // Read returns the next available frame from the buffer. -// When initialized (normal operation), returns immediately with ErrNoFrameAvailable -// if no frame is ready. When not initialized (encoder init), blocks up to 100ms +// When initialized (normal operation), it blocks until a frame arrives or the +// buffer is closed. 
When not initialized (encoder init), it blocks up to 100ms // and returns a black frame for codec property detection. // // Side effect: stores the popped frame's captureTSUs for LastCaptureTSUs. @@ -137,7 +137,7 @@ func (f *FrameBuffer) ResetInitialized() { // lookahead can still correlate the previously-read real frame. func (f *FrameBuffer) Read() (image.Image, func(), error) { if f.initialized { - // Non-blocking fast path for normal operation. + // Blocking path for steady-state encode loops. select { case fm := <-f.frameChan: f.lastCaptureTSUs.Store(fm.captureTSUs) @@ -146,8 +146,6 @@ func (f *FrameBuffer) Read() (image.Image, func(), error) { return fm.img, func() {}, nil case <-f.closeChan: return nil, func() {}, ErrBufferClosed - default: - return nil, func() {}, ErrNoFrameAvailable } } diff --git a/sender/frame_buffer_test.go b/sender/frame_buffer_test.go index c12176c..86bae9b 100644 --- a/sender/frame_buffer_test.go +++ b/sender/frame_buffer_test.go @@ -109,20 +109,36 @@ func TestFrameBuffer_BufferFull(t *testing.T) { } } -func TestFrameBuffer_ReadWithoutFrames(t *testing.T) { +func TestFrameBuffer_ReadWithoutFramesBlocksUntilFrame(t *testing.T) { fb := NewFrameBuffer(640, 480) defer func() { _ = fb.Close() }() - // Mark as initialized so we don't get black frames + // Mark as initialized so Read blocks for real frames. 
fb.SetInitialized() - // Try to read without sending any frames - // This should return ErrNoFrameAvailable immediately (non-blocking) - img, release, err := fb.Read() - assert.Nil(t, img) - assert.NotNil(t, release) - require.Error(t, err) - assert.ErrorIs(t, err, ErrNoFrameAvailable) + done := make(chan struct{}) + go func() { + defer close(done) + img, release, err := fb.Read() + require.NoError(t, err) + assert.NotNil(t, img) + release() + }() + + select { + case <-done: + t.Fatal("Read should block when initialized buffer is empty") + case <-time.After(20 * time.Millisecond): + } + + testImg := image.NewRGBA(image.Rect(0, 0, 640, 480)) + require.NoError(t, fb.SendFrame(testImg)) + + select { + case <-done: + case <-time.After(200 * time.Millisecond): + t.Fatal("Read did not unblock after frame arrival") + } } func TestFrameBuffer_MultipleClose(t *testing.T) { @@ -165,7 +181,7 @@ func TestFrameBuffer_ReadInitializedAfterClose(t *testing.T) { fb := NewFrameBuffer(640, 480) fb.SetInitialized() - // Close, then read — fast path should detect closeChan. + // Close, then read — blocking path should detect closeChan. 
err := fb.Close() require.NoError(t, err) @@ -176,6 +192,30 @@ func TestFrameBuffer_ReadInitializedAfterClose(t *testing.T) { assert.ErrorIs(t, err, ErrBufferClosed) } +func TestFrameBuffer_ReadInitializedCloseWhileBlocked(t *testing.T) { + fb := NewFrameBuffer(640, 480) + fb.SetInitialized() + + done := make(chan struct{}) + go func() { + defer close(done) + img, release, err := fb.Read() + assert.Nil(t, img) + assert.NotNil(t, release) + require.Error(t, err) + assert.ErrorIs(t, err, ErrBufferClosed) + }() + + time.Sleep(20 * time.Millisecond) + require.NoError(t, fb.Close()) + + select { + case <-done: + case <-time.After(200 * time.Millisecond): + t.Fatal("blocked Read did not return on Close") + } +} + func TestFrameBuffer_ReadUninitializedWithFrame(t *testing.T) { fb := NewFrameBuffer(640, 480) defer func() { _ = fb.Close() }() diff --git a/sender/rtc_sender.go b/sender/rtc_sender.go index 8906311..f4cf86c 100644 --- a/sender/rtc_sender.go +++ b/sender/rtc_sender.go @@ -71,6 +71,9 @@ type EncodedTrack struct { framesEncoded atomic.Uint64 totalEncodeTimeNs atomic.Uint64 totalSendQueueTimeNs atomic.Uint64 + + encodeCancel context.CancelFunc + encodeDone chan struct{} } // EncodedFrameCallback is called after each VP8 frame is encoded with the @@ -435,6 +438,7 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { videoSource: frameSource, bitrateTracker: codec.NewBitrateTracker(300 * time.Millisecond), mimeType: mimeType, + encodeDone: make(chan struct{}), } // Mark the frame buffer as initialized after successful track creation @@ -459,6 +463,11 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { }() } + track := s.tracks[info.TrackID] + trackCtx, trackCancel := context.WithCancel(context.Background()) + track.encodeCancel = trackCancel + go s.runEncodeLoop(trackCtx, info.TrackID, track) + return nil } @@ -624,7 +633,7 @@ func (s *RTCSender) AcceptAnswer(answer *webrtc.SessionDescription) error { return 
s.peerConnection.SetRemoteDescription(*answer) } -// Start begins the encoding and bitrate control loop. +// Start begins the bitrate control loop. func (s *RTCSender) Start(ctx context.Context) error { // Wait for estimator var estimator cc.BandwidthEstimator @@ -634,8 +643,8 @@ func (s *RTCSender) Start(ctx context.Context) error { return nil } - // Combined encoding and bitrate control loop - ticker := time.NewTicker(100 * time.Millisecond) // 10 Hz for encoding (matching 10fps input) + // Keep GCC bitrate updates at 100ms cadence. + ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { @@ -643,12 +652,8 @@ func (s *RTCSender) Start(ctx context.Context) error { case <-ctx.Done(): return nil case <-ticker.C: - // Check bitrate every 100ms targetBitrate := estimator.GetTargetBitrate() s.updateBitrate(targetBitrate) - - // Process encoded frames for all tracks - s.processEncodedFrames() } } } @@ -725,12 +730,50 @@ func (s *RTCSender) updateBitrate(targetBitrate int) { } } +func (s *RTCSender) runEncodeLoop(ctx context.Context, trackID string, track *EncodedTrack) { + defer close(track.encodeDone) + + for { + select { + case <-ctx.Done(): + return + default: + } + + hasFrame, err := s.encodeAndSendTrack(trackID, track) + if err != nil { + if ctx.Err() != nil || errors.Is(err, ErrBufferClosed) { + return + } + if errors.Is(err, ErrNoFrameAvailable) { + time.Sleep(5 * time.Millisecond) + continue + } + s.log.Errorf("Error processing track %s: %v", trackID, err) + continue + } + if hasFrame { + s.tracksMu.Lock() + s.noEncodedFrame = false + s.tracksMu.Unlock() + } + } +} + // encodeAndSendTrack reads, encodes, and sends a single track's frame. // Returns true if a frame was successfully processed, or an error. 
func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (bool, error) { + s.tracksMu.RLock() + encodedReader := track.encodedReader + videoTrack := track.videoTrack + videoSource := track.videoSource + onEncodedFrame := s.onEncodedFrame + onFrameSent := s.onFrameSent + s.tracksMu.RUnlock() + // Read includes VP8 encode — this is the expensive call. tBeforeRead := time.Now() - encoded, release, err := track.encodedReader.Read() + encoded, release, err := encodedReader.Read() if err != nil { return false, err } @@ -738,14 +781,14 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo // Per-frame stamps; non-FrameBuffer sources report 0. var captureTSUs, dequeuedAtWallUs int64 - if cfs, ok := track.videoSource.(*FrameBuffer); ok { + if cfs, ok := videoSource.(*FrameBuffer); ok { captureTSUs = cfs.LastCaptureTSUs() dequeuedAtWallUs = cfs.LastDequeueWallUs() } track.bitrateTracker.AddFrame(len(encoded.Data), tAfterRead) - writeErr := track.videoTrack.WriteSample(media.Sample{ + writeErr := videoTrack.WriteSample(media.Sample{ Data: encoded.Data, Duration: time.Second / 10, }) @@ -762,13 +805,13 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo track.totalSendQueueTimeNs.Add(uint64(d.Nanoseconds())) //nolint:gosec // G115: bounded > 0 by guard above } - if s.onEncodedFrame != nil { + if onEncodedFrame != nil { isKey := len(encoded.Data) > 0 && (encoded.Data[0]&0x01) == 0 - s.onEncodedFrame(trackID, encoded.Data, isKey) + onEncodedFrame(trackID, encoded.Data, isKey) } - if s.onFrameSent != nil { - s.onFrameSent(trackID, captureTSUs, dequeuedAtWallUs, + if onFrameSent != nil { + onFrameSent(trackID, captureTSUs, dequeuedAtWallUs, tAfterRead.UnixMicro(), tAfterWrite.UnixMicro(), writeErr != nil) } @@ -777,64 +820,9 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo return true, nil } -// processEncodedFrames reads and sends encoded frames for all tracks. 
-// Encoding happens in parallel (one goroutine per track) because -// encodedReader.Read() includes VP8 encoding via vpx_codec_encode(), -// which takes ~5-10ms per 640x480 frame. Sequential iteration would -// serialize 14 encodes into ~70-140ms, exceeding the 100ms tick interval. -func (s *RTCSender) processEncodedFrames() { - s.tracksMu.RLock() - - totalTracks := len(s.tracks) - if totalTracks == 0 { - s.tracksMu.RUnlock() - - return - } - - type trackResult struct { - hasFrame bool - err error - } - - var wg sync.WaitGroup - results := make([]trackResult, totalTracks) - i := 0 - - for trackID, track := range s.tracks { - idx := i - i++ - wg.Add(1) - - go func(trackID string, track *EncodedTrack) { - defer wg.Done() - - hasFrame, err := s.encodeAndSendTrack(trackID, track) - results[idx] = trackResult{hasFrame: hasFrame, err: err} - }(trackID, track) - } - - wg.Wait() - s.tracksMu.RUnlock() - - allHaveErrors := true - - for _, r := range results { - if r.hasFrame { - allHaveErrors = false - - break - } - - if r.err != nil && !errors.Is(r.err, ErrNoFrameAvailable) { - s.log.Errorf("Error processing track: %v", r.err) - } - } - - s.tracksMu.Lock() - s.noEncodedFrame = allHaveErrors - s.tracksMu.Unlock() -} +// processEncodedFrames used to run tick-driven encoding and is now a no-op. +// Encoding is event-driven via per-track goroutines started in AddVideoTrack. +func (s *RTCSender) processEncodedFrames() {} // SetBitrateAllocation sets custom bitrate allocation for tracks. 
// allocation is a map of track ID to arbitrary positive numbers representing relative weights @@ -960,17 +948,35 @@ func (s *RTCSender) Close() error { s.rtcpCancel = nil } - s.tracksMu.Lock() - defer s.tracksMu.Unlock() - + s.tracksMu.RLock() + tracks := make([]*EncodedTrack, 0, len(s.tracks)) for _, track := range s.tracks { + tracks = append(tracks, track) + } + s.tracksMu.RUnlock() + + for _, track := range tracks { + if track.encodeCancel != nil { + track.encodeCancel() + } + } + for _, track := range tracks { _ = track.videoSource.Close() _ = track.encodedReader.Close() + } + for _, track := range tracks { + if track.encodeDone != nil { + <-track.encodeDone + } + } + for _, track := range tracks { _ = track.mediaTrack.Close() } + s.tracksMu.Lock() // Clear audio tracks (TrackLocalStaticSample has no Close method) s.audioTracks = nil + s.tracksMu.Unlock() if s.peerConnection != nil { return s.peerConnection.Close() diff --git a/sender/rtc_sender_test.go b/sender/rtc_sender_test.go index 057bfc7..e5a7325 100644 --- a/sender/rtc_sender_test.go +++ b/sender/rtc_sender_test.go @@ -46,7 +46,7 @@ func (m *MockVideoEncoderBuilder) BuildVideoEncoder(r video.Reader, p prop.Media type MockReadCloser struct{} func (m *MockReadCloser) Read() ([]byte, func(), error) { - return []byte{}, func() {}, nil + return nil, func() {}, ErrNoFrameAvailable } func (m *MockReadCloser) Close() error { @@ -639,8 +639,8 @@ func TestRTCSender_GetTrackStats_WithPeerConnection(t *testing.T) { stats := sender.GetTrackStats("cam-0") require.NotNil(t, stats) - // Pipeline counters: still zero, no frames encoded yet. - assert.Equal(t, uint64(0), stats.FramesEncoded) + // Pipeline counters may be non-zero with event-driven encoding. + assert.GreaterOrEqual(t, stats.FramesEncoded, uint64(0)) // Network counters: zero because no traffic has flowed. 
assert.Equal(t, uint64(0), stats.PacketsSent) assert.Equal(t, uint64(0), stats.RoundTripTimeMeasurements) @@ -755,25 +755,26 @@ func TestRTCSender_OnFrameSent_FiresWithCaptureTs(t *testing.T) { }) }) - // libvpx has a one-frame encoder lookahead, so a single SendFrame + - // processEncodedFrames will not produce an encoded output yet. Drive - // a small batch and pump processEncodedFrames between sends so the - // 2-slot FrameBuffer drains and the first frame is not evicted before - // the encoder consumes it. This matches the real pipeline where the - // encoder runs at ~30fps with frames continuously injected by the - // cgo bridge. + // Event-driven encode loop is always running, so feed frames at a small + // interval to avoid overflowing the 2-slot source buffer while still + // exercising capture timestamp echoing through the callback path. testImg := image.NewYCbCr(image.Rect(0, 0, 640, 480), image.YCbCrSubsampleRatio420) const wantCaptureTS int64 = 1_700_000_000_000 - for i, ts := range []int64{wantCaptureTS, wantCaptureTS + 33_000, wantCaptureTS + 66_000} { + for i, ts := range []int64{ + wantCaptureTS, + wantCaptureTS + 33_000, + wantCaptureTS + 66_000, + wantCaptureTS + 99_000, + wantCaptureTS + 132_000, + } { require.NoError(t, sender.SendFrameWithCaptureTS("cam-0", testImg, ts), "send %d", i) - sender.processEncodedFrames() + time.Sleep(10 * time.Millisecond) } deadline := time.Now().Add(2 * time.Second) var got *sentEvent for time.Now().Before(deadline) { time.Sleep(50 * time.Millisecond) - sender.processEncodedFrames() mu.Lock() for i := range events { From 77c94d9116478131b82cc3d6a067d35466b5f3ba Mon Sep 17 00:00:00 2001 From: jayli Date: Tue, 2 Jun 2026 00:21:55 -0700 Subject: [PATCH 2/6] Harden event-driven encode lifecycle Replace the stale noEncodedFrame latch with per-track encode heartbeat checks, deprecate the old tick-era processEncodedFrames shim, and clarify/add lock boundaries around track loop startup and no-frame handling. 
Co-authored-by: Cursor --- sender/rtc_sender.go | 48 ++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/sender/rtc_sender.go b/sender/rtc_sender.go index f4cf86c..8a3f898 100644 --- a/sender/rtc_sender.go +++ b/sender/rtc_sender.go @@ -28,7 +28,10 @@ import ( "github.com/pion/webrtc/v4/pkg/media" ) -const transportCCRtcpfb = "transport-cc" +const ( + transportCCRtcpfb = "transport-cc" + encodeHealthTimeout = 2 * time.Second +) // Static errors for err113 compliance. var ( @@ -71,6 +74,7 @@ type EncodedTrack struct { framesEncoded atomic.Uint64 totalEncodeTimeNs atomic.Uint64 totalSendQueueTimeNs atomic.Uint64 + lastEncodeAtWallUs atomic.Int64 encodeCancel context.CancelFunc encodeDone chan struct{} @@ -115,9 +119,6 @@ type RTCSender struct { // Bitrate allocation (protected by tracksMu) bitrateAllocation map[string]float64 // Track ID -> percentage (0.0 to 1.0) - // Frame processing status (protected by tracksMu) - noEncodedFrame bool - // Cancel function for RTCP reader goroutines. Replaced on each SetupPeerConnection. rtcpCancel context.CancelFunc @@ -169,7 +170,6 @@ func NewRTCSender(opts ...Option) (*RTCSender, error) { audioTracks: make(map[string]*webrtc.TrackLocalStaticSample), estimatorChan: make(chan cc.BandwidthEstimator, 1), // Buffered to avoid blocking bitrateAllocation: make(map[string]float64), - noEncodedFrame: false, ccLogWriter: io.Discard, log: logging.NewDefaultLoggerFactory().NewLogger("nuro_sender"), } @@ -378,14 +378,15 @@ func getOrCreateEncoderBuilder(info VideoTrackInfo) (codec.VideoEncoderBuilder, // AddVideoTrack adds a new video track. 
func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { s.tracksMu.Lock() - defer s.tracksMu.Unlock() if _, exists := s.tracks[info.TrackID]; exists { + s.tracksMu.Unlock() return fmt.Errorf("%w: %s", ErrTrackAlreadyExists, info.TrackID) } encoderBuilder, err := getOrCreateEncoderBuilder(info) if err != nil { + s.tracksMu.Unlock() return err } @@ -404,6 +405,7 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { mimeType := encoderBuilder.RTPCodec().MimeType encodedReader, err := mediaTrack.NewEncodedReader(mimeType) if err != nil { + s.tracksMu.Unlock() return err } @@ -414,6 +416,7 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { info.TrackID, ) if err != nil { + s.tracksMu.Unlock() _ = encodedReader.Close() _ = mediaTrack.Close() @@ -423,13 +426,14 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { // Store track info videoMediaTrack, ok := mediaTrack.(*mediadevices.VideoTrack) if !ok { + s.tracksMu.Unlock() _ = encodedReader.Close() _ = mediaTrack.Close() return ErrFailedToCastVideoTrack } - s.tracks[info.TrackID] = &EncodedTrack{ + track := &EncodedTrack{ info: info, videoTrack: videoTrack, mediaTrack: videoMediaTrack, @@ -440,6 +444,8 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { mimeType: mimeType, encodeDone: make(chan struct{}), } + track.lastEncodeAtWallUs.Store(time.Now().UnixMicro()) + s.tracks[info.TrackID] = track // Mark the frame buffer as initialized after successful track creation frameSource.SetInitialized() @@ -448,9 +454,10 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { if s.peerConnection != nil { rtpSender, err := s.peerConnection.AddTrack(videoTrack) if err != nil { + s.tracksMu.Unlock() return err } - s.tracks[info.TrackID].rtpSender = rtpSender + track.rtpSender = rtpSender // Handle incoming RTCP go func() { @@ -463,9 +470,9 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { }() } - track := s.tracks[info.TrackID] trackCtx, 
trackCancel := context.WithCancel(context.Background()) track.encodeCancel = trackCancel + s.tracksMu.Unlock() go s.runEncodeLoop(trackCtx, info.TrackID, track) return nil @@ -745,6 +752,8 @@ func (s *RTCSender) runEncodeLoop(ctx context.Context, trackID string, track *En if ctx.Err() != nil || errors.Is(err, ErrBufferClosed) { return } + // FrameBuffer-backed readers block until frame-or-close. Keep this + // branch for tests/mocks and custom non-blocking sources. if errors.Is(err, ErrNoFrameAvailable) { time.Sleep(5 * time.Millisecond) continue @@ -753,9 +762,7 @@ func (s *RTCSender) runEncodeLoop(ctx context.Context, trackID string, track *En continue } if hasFrame { - s.tracksMu.Lock() - s.noEncodedFrame = false - s.tracksMu.Unlock() + track.lastEncodeAtWallUs.Store(time.Now().UnixMicro()) } } } @@ -820,8 +827,8 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo return true, nil } -// processEncodedFrames used to run tick-driven encoding and is now a no-op. -// Encoding is event-driven via per-track goroutines started in AddVideoTrack. +// Deprecated: processEncodedFrames used to run tick-driven encoding. +// Encoding is now event-driven via per-track goroutines started in AddVideoTrack. func (s *RTCSender) processEncodedFrames() {} // SetBitrateAllocation sets custom bitrate allocation for tracks. 
@@ -1042,5 +1049,16 @@ func (s *RTCSender) GetEncodeFrameOk() bool { s.tracksMu.RLock() defer s.tracksMu.RUnlock() - return !s.noEncodedFrame + if len(s.tracks) == 0 { + return true + } + + cutoffUs := time.Now().Add(-encodeHealthTimeout).UnixMicro() + for _, track := range s.tracks { + if track.lastEncodeAtWallUs.Load() >= cutoffUs { + return true + } + } + + return false } From e80e8691e22a4d35a5dc04e37e20f643952f30e7 Mon Sep 17 00:00:00 2001 From: jayli Date: Tue, 2 Jun 2026 14:35:01 -0700 Subject: [PATCH 3/6] Fix bitrate race and lint in event-driven encode Add per-track bitrateMu around BitrateTracker.AddFrame and GetBitrate; the previous tick-driven design serialized both calls on processEncodedFrames, but the per-track encode goroutine now runs concurrently with updateBitrate. Also satisfy golangci-lint nlreturn (blank lines before error returns in AddVideoTrack and continues in runEncodeLoop) and forbidigo (t.Fatal -> require.Fail in frame_buffer_test.go). --- sender/frame_buffer_test.go | 6 +++--- sender/rtc_sender.go | 15 ++++++++++++++- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/sender/frame_buffer_test.go b/sender/frame_buffer_test.go index 86bae9b..1af7c66 100644 --- a/sender/frame_buffer_test.go +++ b/sender/frame_buffer_test.go @@ -127,7 +127,7 @@ func TestFrameBuffer_ReadWithoutFramesBlocksUntilFrame(t *testing.T) { select { case <-done: - t.Fatal("Read should block when initialized buffer is empty") + require.Fail(t, "Read should block when initialized buffer is empty") case <-time.After(20 * time.Millisecond): } @@ -137,7 +137,7 @@ func TestFrameBuffer_ReadWithoutFramesBlocksUntilFrame(t *testing.T) { select { case <-done: case <-time.After(200 * time.Millisecond): - t.Fatal("Read did not unblock after frame arrival") + require.Fail(t, "Read did not unblock after frame arrival") } } @@ -212,7 +212,7 @@ func TestFrameBuffer_ReadInitializedCloseWhileBlocked(t *testing.T) { select { case <-done: case <-time.After(200 * 
time.Millisecond): - t.Fatal("blocked Read did not return on Close") + require.Fail(t, "blocked Read did not return on Close") } } diff --git a/sender/rtc_sender.go b/sender/rtc_sender.go index 8a3f898..793a3fc 100644 --- a/sender/rtc_sender.go +++ b/sender/rtc_sender.go @@ -63,7 +63,10 @@ type EncodedTrack struct { encoderBuilder codec.VideoEncoderBuilder videoSource VideoSource bitrateTracker *codec.BitrateTracker - mimeType string + // bitrateMu serializes access to bitrateTracker; AddFrame runs on the + // encode goroutine and GetBitrate runs on the BWE update path. + bitrateMu sync.Mutex + mimeType string // rtpSender is set after AddTrack and used to look up the SSRC when // resolving network stats from the Pion stats interceptor. @@ -381,12 +384,14 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { if _, exists := s.tracks[info.TrackID]; exists { s.tracksMu.Unlock() + return fmt.Errorf("%w: %s", ErrTrackAlreadyExists, info.TrackID) } encoderBuilder, err := getOrCreateEncoderBuilder(info) if err != nil { s.tracksMu.Unlock() + return err } @@ -406,6 +411,7 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { encodedReader, err := mediaTrack.NewEncodedReader(mimeType) if err != nil { s.tracksMu.Unlock() + return err } @@ -455,6 +461,7 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { rtpSender, err := s.peerConnection.AddTrack(videoTrack) if err != nil { s.tracksMu.Unlock() + return err } track.rtpSender = rtpSender @@ -722,7 +729,9 @@ func (s *RTCSender) updateBitrate(targetBitrate int) { // Only update encoder for tracks with positive bitrate (like NuroSender) if trackBitrate > 0 { + track.bitrateMu.Lock() currentBitrate := int(track.bitrateTracker.GetBitrate()) + track.bitrateMu.Unlock() if !updateEncoderBitrate(track, currentBitrate, trackBitrate) { s.log.Warnf("No compatible encoder controller for track %s", track.info.TrackID) } @@ -756,9 +765,11 @@ func (s *RTCSender) runEncodeLoop(ctx context.Context, trackID 
string, track *En // branch for tests/mocks and custom non-blocking sources. if errors.Is(err, ErrNoFrameAvailable) { time.Sleep(5 * time.Millisecond) + continue } s.log.Errorf("Error processing track %s: %v", trackID, err) + continue } if hasFrame { @@ -793,7 +804,9 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo dequeuedAtWallUs = cfs.LastDequeueWallUs() } + track.bitrateMu.Lock() track.bitrateTracker.AddFrame(len(encoded.Data), tAfterRead) + track.bitrateMu.Unlock() writeErr := videoTrack.WriteSample(media.Sample{ Data: encoded.Data, From 95390269ec999def4b7d56839a5b40a78e638c48 Mon Sep 17 00:00:00 2001 From: jayli Date: Tue, 2 Jun 2026 14:48:42 -0700 Subject: [PATCH 4/6] Fix AddTrack rollback and Close map reset Ensure AddVideoTrack rolls back partially-created track resources when PeerConnection.AddTrack fails, and clear track maps in Close so post-close lookups cannot return stale closed handles. Co-authored-by: Cursor --- sender/rtc_sender.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sender/rtc_sender.go b/sender/rtc_sender.go index 793a3fc..1dd9913 100644 --- a/sender/rtc_sender.go +++ b/sender/rtc_sender.go @@ -460,6 +460,10 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { if s.peerConnection != nil { rtpSender, err := s.peerConnection.AddTrack(videoTrack) if err != nil { + delete(s.tracks, info.TrackID) + _ = track.videoSource.Close() + _ = track.encodedReader.Close() + _ = track.mediaTrack.Close() s.tracksMu.Unlock() return err @@ -994,8 +998,9 @@ func (s *RTCSender) Close() error { } s.tracksMu.Lock() - // Clear audio tracks (TrackLocalStaticSample has no Close method) - s.audioTracks = nil + s.tracks = make(map[string]*EncodedTrack) + // TrackLocalStaticSample has no Close method; reset map to avoid stale handles. 
+ s.audioTracks = make(map[string]*webrtc.TrackLocalStaticSample) s.tracksMu.Unlock() if s.peerConnection != nil { From 1be9a8415d9967d3a3fa2d496f7a96dac2ff47d6 Mon Sep 17 00:00:00 2001 From: jayli Date: Tue, 2 Jun 2026 15:03:02 -0700 Subject: [PATCH 5/6] Address review: lock callbacks, drop dead code Lock the s.onFrameSent read in SendFrameWithCaptureTS' eviction path to match the locking convention now used in encodeAndSendTrack. Reorder Close so encodedReader.Close and mediaTrack.Close happen after the encode goroutine has exited, avoiding a Read/Close race on those handles. Delete the processEncodedFrames no-op and the three tests plus two benchmarks that targeted it; the event-driven encode goroutine makes them dead weight. Drop the tautological FramesEncoded >= 0 assertion from TestRTCSender_GetTrackStats_WithPeerConnection. --- sender/rtc_sender.go | 19 ++++++--- sender/rtc_sender_bench_test.go | 51 ---------------------- sender/rtc_sender_test.go | 75 ++------------------------------- 3 files changed, 15 insertions(+), 130 deletions(-) diff --git a/sender/rtc_sender.go b/sender/rtc_sender.go index 1dd9913..77a8dfc 100644 --- a/sender/rtc_sender.go +++ b/sender/rtc_sender.go @@ -557,8 +557,13 @@ func (s *RTCSender) SendFrameWithCaptureTS(trackID string, frame image.Image, ca } evicted, err := frameBuffer.SendFrameWithCaptureTS(frame, captureTSUs) - if evicted && s.onFrameSent != nil { - s.onFrameSent(trackID, 0, 0, 0, time.Now().UnixMicro(), true) + if evicted { + s.tracksMu.RLock() + onFrameSent := s.onFrameSent + s.tracksMu.RUnlock() + if onFrameSent != nil { + onFrameSent(trackID, 0, 0, 0, time.Now().UnixMicro(), true) + } } return err @@ -844,10 +849,6 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo return true, nil } -// Deprecated: processEncodedFrames used to run tick-driven encoding. -// Encoding is now event-driven via per-track goroutines started in AddVideoTrack. 
-func (s *RTCSender) processEncodedFrames() {} - // SetBitrateAllocation sets custom bitrate allocation for tracks. // allocation is a map of track ID to arbitrary positive numbers representing relative weights // The values will be normalized internally (each value divided by sum of all values) @@ -984,16 +985,20 @@ func (s *RTCSender) Close() error { track.encodeCancel() } } + // Close the source first; this makes the encode goroutine's pending + // Read return ErrBufferClosed. for _, track := range tracks { _ = track.videoSource.Close() - _ = track.encodedReader.Close() } + // Wait for the encode goroutine to exit before closing encodedReader / + // mediaTrack so Close cannot race a concurrent Read on those. for _, track := range tracks { if track.encodeDone != nil { <-track.encodeDone } } for _, track := range tracks { + _ = track.encodedReader.Close() _ = track.mediaTrack.Close() } diff --git a/sender/rtc_sender_bench_test.go b/sender/rtc_sender_bench_test.go index 7fb076e..5e22b46 100644 --- a/sender/rtc_sender_bench_test.go +++ b/sender/rtc_sender_bench_test.go @@ -36,57 +36,6 @@ func newBenchSender(b *testing.B, numTracks int) *RTCSender { return sender } -// BenchmarkProcessEncodedFrames_NoFrames measures the hot-path cost of -// processEncodedFrames when no frames are available (the common case between -// frame arrivals). Before: parallel goroutines + WaitGroup per tick. -// After: sequential non-blocking Read loop. -func BenchmarkProcessEncodedFrames_NoFrames(b *testing.B) { - for _, numTracks := range []int{1, 4, 14} { - b.Run(trackCountName(numTracks), func(b *testing.B) { - sender := newBenchSender(b, numTracks) - defer func() { _ = sender.Close() }() - - b.ReportAllocs() - b.ResetTimer() - for range b.N { - sender.processEncodedFrames() - } - }) - } -} - -// BenchmarkProcessEncodedFrames_WithFrames measures processEncodedFrames when -// every track has a frame ready. This exercises the encode+write path. 
-func BenchmarkProcessEncodedFrames_WithFrames(b *testing.B) { - for _, numTracks := range []int{1, 4, 14} { - b.Run(trackCountName(numTracks), func(b *testing.B) { - sender := newBenchSender(b, numTracks) - defer func() { _ = sender.Close() }() - - // Pre-create test frame - testImg := image.NewYCbCr( - image.Rect(0, 0, 1280, 720), - image.YCbCrSubsampleRatio420, - ) - - b.ReportAllocs() - b.ResetTimer() - for range b.N { - // Feed a frame to every track - sender.tracksMu.RLock() - for _, track := range sender.tracks { - if fb, ok := track.videoSource.(*FrameBuffer); ok { - _ = fb.SendFrame(testImg) - } - } - sender.tracksMu.RUnlock() - - sender.processEncodedFrames() - } - }) - } -} - // BenchmarkUpdateBitrate measures the bitrate update path. // Before: Infof logging per tick. After: Debugf (no-op at default level). func BenchmarkUpdateBitrate(b *testing.B) { diff --git a/sender/rtc_sender_test.go b/sender/rtc_sender_test.go index e5a7325..069e01a 100644 --- a/sender/rtc_sender_test.go +++ b/sender/rtc_sender_test.go @@ -336,79 +336,12 @@ func TestRTCSender_SetupPeerConnection_VideoOnlyInPC(t *testing.T) { assert.True(t, foundVideo, "video track should be in PeerConnection senders") } -func TestRTCSender_ProcessEncodedFrames_NoTracks(t *testing.T) { - sender, err := NewRTCSender() - require.NoError(t, err) - defer func() { _ = sender.Close() }() - - // Should return immediately with no tracks. - sender.processEncodedFrames() -} - -func TestRTCSender_ProcessEncodedFrames_WithTracks(t *testing.T) { - sender, err := NewRTCSender() - require.NoError(t, err) - defer func() { _ = sender.Close() }() - - err = sender.AddVideoTrack(VideoTrackInfo{ - TrackID: "cam-0", - Width: 640, - Height: 480, - InitialBitrate: 500_000, - }) - require.NoError(t, err) - - // Call processEncodedFrames multiple times — exercises the sequential loop - // with a real encoder. The encoder may have data from init (black frame). 
- sender.processEncodedFrames() - - // Send a frame so the encoder can produce output. - testImg := image.NewYCbCr(image.Rect(0, 0, 640, 480), image.YCbCrSubsampleRatio420) - err = sender.SendFrame("cam-0", testImg) - require.NoError(t, err) - - // Give encoder time to consume the frame. - time.Sleep(150 * time.Millisecond) - - sender.processEncodedFrames() - // We mainly verify no panics or deadlocks here. -} - -func TestRTCSender_ProcessEncodedFrames_AllErrors(t *testing.T) { - sender, err := NewRTCSender() - require.NoError(t, err) - defer func() { _ = sender.Close() }() - - err = sender.AddVideoTrack(VideoTrackInfo{ - TrackID: "cam-0", - Width: 640, - Height: 480, - EncoderBuilder: &MockVideoEncoderBuilder{}, - }) - require.NoError(t, err) - - // Drain any buffered frames — the mock encoder always returns empty data, - // but the FrameBuffer Read() will return ErrNoFrameAvailable in the - // non-blocking path (initialized), which propagates through the encoder. - // Call several times to ensure we hit the no-frame path. - for range 5 { - sender.processEncodedFrames() - } - - // Verify noEncodedFrame reflects the state. - _ = sender.GetEncodeFrameOk() -} - func TestRTCSender_GetEncodeFrameOk(t *testing.T) { sender, err := NewRTCSender() require.NoError(t, err) defer func() { _ = sender.Close() }() - // Default should be true (noEncodedFrame starts false). - assert.True(t, sender.GetEncodeFrameOk()) - - // After processEncodedFrames with no tracks, should still be true (early return). - sender.processEncodedFrames() + // With no tracks, the sender is trivially healthy. assert.True(t, sender.GetEncodeFrameOk()) } @@ -601,11 +534,11 @@ func TestRTCSender_GetTrackStats_PipelineCountersIncrement(t *testing.T) { require.NoError(t, err) // Drive a frame through the encoder so encodeAndSendTrack updates the - // per-track atomic counters that GetTrackStats reports. + // per-track atomic counters that GetTrackStats reports. 
The encode + // goroutine started by AddVideoTrack picks up the frame on its own. testImg := image.NewYCbCr(image.Rect(0, 0, 640, 480), image.YCbCrSubsampleRatio420) require.NoError(t, sender.SendFrame("cam-0", testImg)) time.Sleep(150 * time.Millisecond) - sender.processEncodedFrames() stats := sender.GetTrackStats("cam-0") require.NotNil(t, stats) @@ -639,8 +572,6 @@ func TestRTCSender_GetTrackStats_WithPeerConnection(t *testing.T) { stats := sender.GetTrackStats("cam-0") require.NotNil(t, stats) - // Pipeline counters may be non-zero with event-driven encoding. - assert.GreaterOrEqual(t, stats.FramesEncoded, uint64(0)) // Network counters: zero because no traffic has flowed. assert.Equal(t, uint64(0), stats.PacketsSent) assert.Equal(t, uint64(0), stats.RoundTripTimeMeasurements) From 1b85b1c26c5f40ff552645838cf1fd471abe7d73 Mon Sep 17 00:00:00 2001 From: jayli Date: Tue, 2 Jun 2026 15:32:09 -0700 Subject: [PATCH 6/6] Revert FrameBuffer.Read to non-blocking BenchmarkUpdateBitrate timed out under -race because the encode goroutine sat inside vpx.encoder.Read() holding the vpx encoder's internal mutex while waiting for a frame, blocking the concurrent DynamicQPControl bitrate update that needs the same mutex. Restore the pre-PR non-blocking semantics: when initialized and no frame is queued, Read returns ErrNoFrameAvailable immediately so the vpx mutex is released between attempts. runEncodeLoop already sleeps 5ms on ErrNoFrameAvailable, so this just makes that branch the steady-state idle path again. Add a closeChan precedence check before the non-blocking select so a closed buffer always reports ErrBufferClosed instead of racing with the default branch (Go's select picks ready cases randomly). 
--- sender/frame_buffer.go | 24 +++++++++++---- sender/frame_buffer_test.go | 59 ++++++------------------------------- sender/rtc_sender.go | 6 ++-- 3 files changed, 31 insertions(+), 58 deletions(-) diff --git a/sender/frame_buffer.go b/sender/frame_buffer.go index c215eaa..4899676 100644 --- a/sender/frame_buffer.go +++ b/sender/frame_buffer.go @@ -128,24 +128,36 @@ func (f *FrameBuffer) ResetInitialized() { } // Read returns the next available frame from the buffer. -// When initialized (normal operation), it blocks until a frame arrives or the -// buffer is closed. When not initialized (encoder init), it blocks up to 100ms -// and returns a black frame for codec property detection. +// When initialized (normal operation), it returns immediately with +// ErrNoFrameAvailable if no frame is ready. The vpx encoder's Read holds +// its internal mutex across the inner Read call, so blocking here would +// also block concurrent DynamicQPControl bitrate updates that try to take +// the same mutex. runEncodeLoop handles ErrNoFrameAvailable by sleeping +// briefly before retrying. When not initialized (encoder init), it blocks +// up to 100ms and returns a black frame for codec property detection. // // Side effect: stores the popped frame's captureTSUs for LastCaptureTSUs. // Black-frame timeouts preserve the previous value so encoders with // lookahead can still correlate the previously-read real frame. func (f *FrameBuffer) Read() (image.Image, func(), error) { if f.initialized { - // Blocking path for steady-state encode loops. + // Check closeChan first so a closed buffer always reports + // ErrBufferClosed, even with frames still queued: the select below + // has no closeChan case, so it alone would never report the close. + select { + case <-f.closeChan: + return nil, func() {}, ErrBufferClosed + default: + } + // Non-blocking fast path for normal operation.
select { case fm := <-f.frameChan: f.lastCaptureTSUs.Store(fm.captureTSUs) f.lastDequeueWallUs.Store(time.Now().UnixMicro()) return fm.img, func() {}, nil - case <-f.closeChan: - return nil, func() {}, ErrBufferClosed + default: + return nil, func() {}, ErrNoFrameAvailable } } diff --git a/sender/frame_buffer_test.go b/sender/frame_buffer_test.go index 1af7c66..2e50aa7 100644 --- a/sender/frame_buffer_test.go +++ b/sender/frame_buffer_test.go @@ -109,36 +109,18 @@ func TestFrameBuffer_BufferFull(t *testing.T) { } } -func TestFrameBuffer_ReadWithoutFramesBlocksUntilFrame(t *testing.T) { +func TestFrameBuffer_ReadWithoutFrames(t *testing.T) { fb := NewFrameBuffer(640, 480) defer func() { _ = fb.Close() }() - // Mark as initialized so Read blocks for real frames. + // Mark as initialized so we take the non-blocking fast path. fb.SetInitialized() - done := make(chan struct{}) - go func() { - defer close(done) - img, release, err := fb.Read() - require.NoError(t, err) - assert.NotNil(t, img) - release() - }() - - select { - case <-done: - require.Fail(t, "Read should block when initialized buffer is empty") - case <-time.After(20 * time.Millisecond): - } - - testImg := image.NewRGBA(image.Rect(0, 0, 640, 480)) - require.NoError(t, fb.SendFrame(testImg)) - - select { - case <-done: - case <-time.After(200 * time.Millisecond): - require.Fail(t, "Read did not unblock after frame arrival") - } + img, release, err := fb.Read() + assert.Nil(t, img) + assert.NotNil(t, release) + require.Error(t, err) + assert.ErrorIs(t, err, ErrNoFrameAvailable) } func TestFrameBuffer_MultipleClose(t *testing.T) { @@ -181,7 +163,8 @@ func TestFrameBuffer_ReadInitializedAfterClose(t *testing.T) { fb := NewFrameBuffer(640, 480) fb.SetInitialized() - // Close, then read — blocking path should detect closeChan. + // Close, then read — the closeChan precedence check should win over + // the default branch in the non-blocking select. 
err := fb.Close() require.NoError(t, err) @@ -192,30 +175,6 @@ func TestFrameBuffer_ReadInitializedAfterClose(t *testing.T) { assert.ErrorIs(t, err, ErrBufferClosed) } -func TestFrameBuffer_ReadInitializedCloseWhileBlocked(t *testing.T) { - fb := NewFrameBuffer(640, 480) - fb.SetInitialized() - - done := make(chan struct{}) - go func() { - defer close(done) - img, release, err := fb.Read() - assert.Nil(t, img) - assert.NotNil(t, release) - require.Error(t, err) - assert.ErrorIs(t, err, ErrBufferClosed) - }() - - time.Sleep(20 * time.Millisecond) - require.NoError(t, fb.Close()) - - select { - case <-done: - case <-time.After(200 * time.Millisecond): - require.Fail(t, "blocked Read did not return on Close") - } -} - func TestFrameBuffer_ReadUninitializedWithFrame(t *testing.T) { fb := NewFrameBuffer(640, 480) defer func() { _ = fb.Close() }() diff --git a/sender/rtc_sender.go b/sender/rtc_sender.go index 77a8dfc..5046764 100644 --- a/sender/rtc_sender.go +++ b/sender/rtc_sender.go @@ -770,8 +770,10 @@ func (s *RTCSender) runEncodeLoop(ctx context.Context, trackID string, track *En if ctx.Err() != nil || errors.Is(err, ErrBufferClosed) { return } - // FrameBuffer-backed readers block until frame-or-close. Keep this - // branch for tests/mocks and custom non-blocking sources. + // FrameBuffer.Read is non-blocking; sleep briefly so we don't + // busy-spin between frame arrivals while still leaving the vpx + // encoder's internal mutex free for concurrent DynamicQPControl + // bitrate updates. if errors.Is(err, ErrNoFrameAvailable) { time.Sleep(5 * time.Millisecond)