From 5d33db7ef824324fc3b034165c75b9b152a2008f Mon Sep 17 00:00:00 2001 From: jayli Date: Wed, 3 Jun 2026 13:46:28 -0700 Subject: [PATCH] Serialize recreateEncoder with encode goroutine Per-track encoderMu (sync.RWMutex). The encode goroutine holds RLock from encodedReader.Read through release(); recreateEncoder takes Lock to swap the reader without racing the goroutine. Before this, SetBitrateAllocation calling recreateEncoder on a 0->positive transition could close the old encoder, reset the shared FrameBuffer, or create a second NewEncodedReader while the per-track encode goroutine was inside Read() or holding a release() callback on the old encoder. Lock order: tracksMu (write) then encoderMu (write); readers take them sequentially and never nest. --- sender/rtc_sender.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/sender/rtc_sender.go b/sender/rtc_sender.go index 5046764..203ec96 100644 --- a/sender/rtc_sender.go +++ b/sender/rtc_sender.go @@ -66,6 +66,11 @@ type EncodedTrack struct { // bitrateMu serializes access to bitrateTracker; AddFrame runs on the // encode goroutine and GetBitrate runs on the BWE update path. bitrateMu sync.Mutex + // encoderMu protects the encoder lifecycle. The per-track encode + // goroutine holds RLock from encodedReader.Read through release(); + // recreateEncoder takes Lock to swap the reader without racing the + // goroutine's in-flight release callback. + encoderMu sync.RWMutex mimeType string // rtpSender is set after AddTrack and used to look up the SSRC when @@ -770,10 +775,10 @@ func (s *RTCSender) runEncodeLoop(ctx context.Context, trackID string, track *En if ctx.Err() != nil || errors.Is(err, ErrBufferClosed) { return } - // FrameBuffer.Read is non-blocking; sleep briefly so we don't - // busy-spin between frame arrivals while still leaving the vpx - // encoder's internal mutex free for concurrent DynamicQPControl - // bitrate updates. + // Source had no frame this iteration. Sleep briefly to avoid + // busy-spinning and to release the vpx encoder mutex (held + // during Read) between attempts so concurrent DynamicQPControl + // bitrate updates can acquire it. if errors.Is(err, ErrNoFrameAvailable) { time.Sleep(5 * time.Millisecond) @@ -793,16 +798,20 @@ func (s *RTCSender) runEncodeLoop(ctx context.Context, trackID string, track *En // Returns true if a frame was successfully processed, or an error. func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (bool, error) { s.tracksMu.RLock() - encodedReader := track.encodedReader videoTrack := track.videoTrack videoSource := track.videoSource onEncodedFrame := s.onEncodedFrame onFrameSent := s.onFrameSent s.tracksMu.RUnlock() + // encoderMu.RLock spans Read..release so recreateEncoder cannot + // swap or close the encoder underneath an in-flight read. + track.encoderMu.RLock() + defer track.encoderMu.RUnlock() + // Read includes VP8 encode — this is the expensive call. tBeforeRead := time.Now() - encoded, release, err := encodedReader.Read() + encoded, release, err := track.encodedReader.Read() if err != nil { return false, err } @@ -928,6 +937,13 @@ func (s *RTCSender) recreateEncodersForActivatedTracks(newAllocation map[string] // after a track has been idle for a long time. // Must be called while holding tracksMu.Lock. func (s *RTCSender) recreateEncoder(track *EncodedTrack) error { + // encoderMu.Lock excludes the per-track encode goroutine for the + // duration of the swap. Without this, the goroutine can be inside + // encodedReader.Read or holding a release() callback while we close + // the old reader, racing on encoder state and FrameBuffer access. + track.encoderMu.Lock() + defer track.encoderMu.Unlock() + // Temporarily reset FrameBuffer to uninitialized so NewEncodedReader // can read a black frame for codec property detection during init. // Always restore to initialized afterward, regardless of success or failure.