Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions sender/rtc_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ type EncodedTrack struct {
// bitrateMu serializes access to bitrateTracker; AddFrame runs on the
// encode goroutine and GetBitrate runs on the BWE update path.
bitrateMu sync.Mutex
// encoderMu protects the encoder lifecycle. The per-track encode
// goroutine holds RLock from encodedReader.Read through release();
// recreateEncoder takes Lock to swap the reader without racing the
// goroutine's in-flight release callback.
encoderMu sync.RWMutex
mimeType string

// rtpSender is set after AddTrack and used to look up the SSRC when
Expand Down Expand Up @@ -770,10 +775,10 @@ func (s *RTCSender) runEncodeLoop(ctx context.Context, trackID string, track *En
if ctx.Err() != nil || errors.Is(err, ErrBufferClosed) {
return
}
// FrameBuffer.Read is non-blocking; sleep briefly so we don't
// busy-spin between frame arrivals while still leaving the vpx
// encoder's internal mutex free for concurrent DynamicQPControl
// bitrate updates.
// Source had no frame this iteration. Sleep briefly to avoid
// busy-spinning and to release the vpx encoder mutex (held
// during Read) between attempts so concurrent DynamicQPControl
// bitrate updates can acquire it.
if errors.Is(err, ErrNoFrameAvailable) {
time.Sleep(5 * time.Millisecond)

Expand All @@ -793,16 +798,20 @@ func (s *RTCSender) runEncodeLoop(ctx context.Context, trackID string, track *En
// Returns true if a frame was successfully processed, or an error.
func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (bool, error) {
s.tracksMu.RLock()
encodedReader := track.encodedReader
videoTrack := track.videoTrack
videoSource := track.videoSource
onEncodedFrame := s.onEncodedFrame
onFrameSent := s.onFrameSent
s.tracksMu.RUnlock()

// encoderMu.RLock spans Read..release so recreateEncoder cannot
// swap or close the encoder underneath an in-flight read.
track.encoderMu.RLock()
defer track.encoderMu.RUnlock()

// Read includes VP8 encode — this is the expensive call.
tBeforeRead := time.Now()
encoded, release, err := encodedReader.Read()
encoded, release, err := track.encodedReader.Read()
if err != nil {
return false, err
}
Expand Down Expand Up @@ -928,6 +937,13 @@ func (s *RTCSender) recreateEncodersForActivatedTracks(newAllocation map[string]
// after a track has been idle for a long time.
// Must be called while holding tracksMu.Lock.
func (s *RTCSender) recreateEncoder(track *EncodedTrack) error {
// encoderMu.Lock excludes the per-track encode goroutine for the
// duration of the swap. Without this, the goroutine can be inside
// encodedReader.Read or holding a release() callback while we close
// the old reader, racing on encoder state and FrameBuffer access.
track.encoderMu.Lock()
defer track.encoderMu.Unlock()

// Temporarily reset FrameBuffer to uninitialized so NewEncodedReader
// can read a black frame for codec property detection during init.
// Always restore to initialized afterward, regardless of success or failure.
Expand Down
Loading