diff --git a/sender/frame_buffer.go b/sender/frame_buffer.go index 3f894ca..4899676 100644 --- a/sender/frame_buffer.go +++ b/sender/frame_buffer.go @@ -128,15 +128,27 @@ func (f *FrameBuffer) ResetInitialized() { } // Read returns the next available frame from the buffer. -// When initialized (normal operation), returns immediately with ErrNoFrameAvailable -// if no frame is ready. When not initialized (encoder init), blocks up to 100ms -// and returns a black frame for codec property detection. +// When initialized (normal operation), it returns immediately with +// ErrNoFrameAvailable if no frame is ready. The vpx encoder's Read holds +// its internal mutex across the inner Read call, so blocking here would +// also block concurrent DynamicQPControl bitrate updates that try to take +// the same mutex. runEncodeLoop handles ErrNoFrameAvailable by sleeping +// briefly before retrying. When not initialized (encoder init), it blocks +// up to 100ms and returns a black frame for codec property detection. // // Side effect: stores the popped frame's captureTSUs for LastCaptureTSUs. // Black-frame timeouts preserve the previous value so encoders with // lookahead can still correlate the previously-read real frame. func (f *FrameBuffer) Read() (image.Image, func(), error) { if f.initialized { + // Check closeChan first so a closed buffer always reports + // ErrBufferClosed instead of racing with the default branch below + // (Go's select picks ready cases pseudo-randomly). + select { + case <-f.closeChan: + return nil, func() {}, ErrBufferClosed + default: + } // Non-blocking fast path for normal operation. select { case fm := <-f.frameChan: @@ -144,8 +156,6 @@ func (f *FrameBuffer) Read() (image.Image, func(), error) { f.lastDequeueWallUs.Store(time.Now().UnixMicro()) return fm.img, func() {}, nil - case <-f.closeChan: - return nil, func() {}, ErrBufferClosed default: return nil, func() {}, ErrNoFrameAvailable } diff --git a/sender/frame_buffer_test.go b/sender/frame_buffer_test.go index c12176c..2e50aa7 100644 --- a/sender/frame_buffer_test.go +++ b/sender/frame_buffer_test.go @@ -113,11 +113,9 @@ func TestFrameBuffer_ReadWithoutFrames(t *testing.T) { fb := NewFrameBuffer(640, 480) defer func() { _ = fb.Close() }() - // Mark as initialized so we don't get black frames + // Mark as initialized so we take the non-blocking fast path. fb.SetInitialized() - // Try to read without sending any frames - // This should return ErrNoFrameAvailable immediately (non-blocking) img, release, err := fb.Read() assert.Nil(t, img) assert.NotNil(t, release) @@ -165,7 +163,8 @@ func TestFrameBuffer_ReadInitializedAfterClose(t *testing.T) { fb := NewFrameBuffer(640, 480) fb.SetInitialized() - // Close, then read — fast path should detect closeChan. + // Close, then read — the closeChan precedence check should win over + // the default branch in the non-blocking select. err := fb.Close() require.NoError(t, err) diff --git a/sender/rtc_sender.go b/sender/rtc_sender.go index 8906311..5046764 100644 --- a/sender/rtc_sender.go +++ b/sender/rtc_sender.go @@ -28,7 +28,10 @@ import ( "github.com/pion/webrtc/v4/pkg/media" ) -const transportCCRtcpfb = "transport-cc" +const ( + transportCCRtcpfb = "transport-cc" + encodeHealthTimeout = 2 * time.Second +) // Static errors for err113 compliance. var ( @@ -60,7 +63,10 @@ type EncodedTrack struct { encoderBuilder codec.VideoEncoderBuilder videoSource VideoSource bitrateTracker *codec.BitrateTracker - mimeType string + // bitrateMu serializes access to bitrateTracker; AddFrame runs on the + // encode goroutine and GetBitrate runs on the BWE update path. + bitrateMu sync.Mutex + mimeType string // rtpSender is set after AddTrack and used to look up the SSRC when // resolving network stats from the Pion stats interceptor. @@ -71,6 +77,10 @@ type EncodedTrack struct { framesEncoded atomic.Uint64 totalEncodeTimeNs atomic.Uint64 totalSendQueueTimeNs atomic.Uint64 + lastEncodeAtWallUs atomic.Int64 + + encodeCancel context.CancelFunc + encodeDone chan struct{} } // EncodedFrameCallback is called after each VP8 frame is encoded with the @@ -112,9 +122,6 @@ type RTCSender struct { // Bitrate allocation (protected by tracksMu) bitrateAllocation map[string]float64 // Track ID -> percentage (0.0 to 1.0) - // Frame processing status (protected by tracksMu) - noEncodedFrame bool - // Cancel function for RTCP reader goroutines. Replaced on each SetupPeerConnection. rtcpCancel context.CancelFunc @@ -166,7 +173,6 @@ func NewRTCSender(opts ...Option) (*RTCSender, error) { audioTracks: make(map[string]*webrtc.TrackLocalStaticSample), estimatorChan: make(chan cc.BandwidthEstimator, 1), // Buffered to avoid blocking bitrateAllocation: make(map[string]float64), - noEncodedFrame: false, ccLogWriter: io.Discard, log: logging.NewDefaultLoggerFactory().NewLogger("nuro_sender"), } @@ -375,14 +381,17 @@ func getOrCreateEncoderBuilder(info VideoTrackInfo) (codec.VideoEncoderBuilder, // AddVideoTrack adds a new video track. func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { s.tracksMu.Lock() - defer s.tracksMu.Unlock() if _, exists := s.tracks[info.TrackID]; exists { + s.tracksMu.Unlock() + return fmt.Errorf("%w: %s", ErrTrackAlreadyExists, info.TrackID) } encoderBuilder, err := getOrCreateEncoderBuilder(info) if err != nil { + s.tracksMu.Unlock() + return err } @@ -401,6 +410,8 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { mimeType := encoderBuilder.RTPCodec().MimeType encodedReader, err := mediaTrack.NewEncodedReader(mimeType) if err != nil { + s.tracksMu.Unlock() + return err } @@ -411,6 +422,7 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { info.TrackID, ) if err != nil { + s.tracksMu.Unlock() _ = encodedReader.Close() _ = mediaTrack.Close() @@ -420,13 +432,14 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { // Store track info videoMediaTrack, ok := mediaTrack.(*mediadevices.VideoTrack) if !ok { + s.tracksMu.Unlock() _ = encodedReader.Close() _ = mediaTrack.Close() return ErrFailedToCastVideoTrack } - s.tracks[info.TrackID] = &EncodedTrack{ + track := &EncodedTrack{ info: info, videoTrack: videoTrack, mediaTrack: videoMediaTrack, @@ -435,7 +448,10 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { videoSource: frameSource, bitrateTracker: codec.NewBitrateTracker(300 * time.Millisecond), mimeType: mimeType, + encodeDone: make(chan struct{}), } + track.lastEncodeAtWallUs.Store(time.Now().UnixMicro()) + s.tracks[info.TrackID] = track // Mark the frame buffer as initialized after successful track creation frameSource.SetInitialized() @@ -444,9 +460,15 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { if s.peerConnection != nil { rtpSender, err := s.peerConnection.AddTrack(videoTrack) if err != nil { + delete(s.tracks, info.TrackID) + _ = track.videoSource.Close() + _ = track.encodedReader.Close() + _ = track.mediaTrack.Close() + s.tracksMu.Unlock() + return err } - s.tracks[info.TrackID].rtpSender = rtpSender + track.rtpSender = rtpSender // Handle incoming RTCP go func() { @@ -459,6 +481,11 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error { }() } + trackCtx, trackCancel := context.WithCancel(context.Background()) + track.encodeCancel = trackCancel + s.tracksMu.Unlock() + go s.runEncodeLoop(trackCtx, info.TrackID, track) + return nil } @@ -530,8 +557,13 @@ func (s *RTCSender) SendFrameWithCaptureTS(trackID string, frame image.Image, ca } evicted, err := frameBuffer.SendFrameWithCaptureTS(frame, captureTSUs) - if evicted && s.onFrameSent != nil { - s.onFrameSent(trackID, 0, 0, 0, time.Now().UnixMicro(), true) + if evicted { + s.tracksMu.RLock() + onFrameSent := s.onFrameSent + s.tracksMu.RUnlock() + if onFrameSent != nil { + onFrameSent(trackID, 0, 0, 0, time.Now().UnixMicro(), true) + } } return err @@ -624,7 +656,7 @@ func (s *RTCSender) AcceptAnswer(answer *webrtc.SessionDescription) error { return s.peerConnection.SetRemoteDescription(*answer) } -// Start begins the encoding and bitrate control loop. +// Start begins the bitrate control loop. func (s *RTCSender) Start(ctx context.Context) error { // Wait for estimator var estimator cc.BandwidthEstimator @@ -634,8 +666,8 @@ func (s *RTCSender) Start(ctx context.Context) error { return nil } - // Combined encoding and bitrate control loop - ticker := time.NewTicker(100 * time.Millisecond) // 10 Hz for encoding (matching 10fps input) + // Keep GCC bitrate updates at 100ms cadence. + ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { @@ -643,12 +675,8 @@ func (s *RTCSender) Start(ctx context.Context) error { case <-ctx.Done(): return nil case <-ticker.C: - // Check bitrate every 100ms targetBitrate := estimator.GetTargetBitrate() s.updateBitrate(targetBitrate) - - // Process encoded frames for all tracks - s.processEncodedFrames() } } } @@ -710,7 +738,9 @@ func (s *RTCSender) updateBitrate(targetBitrate int) { // Only update encoder for tracks with positive bitrate (like NuroSender) if trackBitrate > 0 { + track.bitrateMu.Lock() currentBitrate := int(track.bitrateTracker.GetBitrate()) + track.bitrateMu.Unlock() if !updateEncoderBitrate(track, currentBitrate, trackBitrate) { s.log.Warnf("No compatible encoder controller for track %s", track.info.TrackID) } @@ -725,12 +755,54 @@ func (s *RTCSender) updateBitrate(targetBitrate int) { } } +func (s *RTCSender) runEncodeLoop(ctx context.Context, trackID string, track *EncodedTrack) { + defer close(track.encodeDone) + + for { + select { + case <-ctx.Done(): + return + default: + } + + hasFrame, err := s.encodeAndSendTrack(trackID, track) + if err != nil { + if ctx.Err() != nil || errors.Is(err, ErrBufferClosed) { + return + } + // FrameBuffer.Read is non-blocking; sleep briefly so we don't + // busy-spin between frame arrivals while still leaving the vpx + // encoder's internal mutex free for concurrent DynamicQPControl + // bitrate updates. + if errors.Is(err, ErrNoFrameAvailable) { + time.Sleep(5 * time.Millisecond) + + continue + } + s.log.Errorf("Error processing track %s: %v", trackID, err) + + continue + } + if hasFrame { + track.lastEncodeAtWallUs.Store(time.Now().UnixMicro()) + } + } +} + // encodeAndSendTrack reads, encodes, and sends a single track's frame. // Returns true if a frame was successfully processed, or an error. func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (bool, error) { + s.tracksMu.RLock() + encodedReader := track.encodedReader + videoTrack := track.videoTrack + videoSource := track.videoSource + onEncodedFrame := s.onEncodedFrame + onFrameSent := s.onFrameSent + s.tracksMu.RUnlock() + // Read includes VP8 encode — this is the expensive call. tBeforeRead := time.Now() - encoded, release, err := track.encodedReader.Read() + encoded, release, err := encodedReader.Read() if err != nil { return false, err } @@ -738,14 +810,16 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo // Per-frame stamps; non-FrameBuffer sources report 0. var captureTSUs, dequeuedAtWallUs int64 - if cfs, ok := track.videoSource.(*FrameBuffer); ok { + if cfs, ok := videoSource.(*FrameBuffer); ok { captureTSUs = cfs.LastCaptureTSUs() dequeuedAtWallUs = cfs.LastDequeueWallUs() } + track.bitrateMu.Lock() track.bitrateTracker.AddFrame(len(encoded.Data), tAfterRead) + track.bitrateMu.Unlock() - writeErr := track.videoTrack.WriteSample(media.Sample{ + writeErr := videoTrack.WriteSample(media.Sample{ Data: encoded.Data, Duration: time.Second / 10, }) @@ -762,13 +836,13 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo track.totalSendQueueTimeNs.Add(uint64(d.Nanoseconds())) //nolint:gosec // G115: bounded > 0 by guard above } - if s.onEncodedFrame != nil { + if onEncodedFrame != nil { isKey := len(encoded.Data) > 0 && (encoded.Data[0]&0x01) == 0 - s.onEncodedFrame(trackID, encoded.Data, isKey) + onEncodedFrame(trackID, encoded.Data, isKey) } - if s.onFrameSent != nil { - s.onFrameSent(trackID, captureTSUs, dequeuedAtWallUs, + if onFrameSent != nil { + onFrameSent(trackID, captureTSUs, dequeuedAtWallUs, tAfterRead.UnixMicro(), tAfterWrite.UnixMicro(), writeErr != nil) } @@ -777,65 +851,6 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo return true, nil } -// processEncodedFrames reads and sends encoded frames for all tracks. -// Encoding happens in parallel (one goroutine per track) because -// encodedReader.Read() includes VP8 encoding via vpx_codec_encode(), -// which takes ~5-10ms per 640x480 frame. Sequential iteration would -// serialize 14 encodes into ~70-140ms, exceeding the 100ms tick interval. -func (s *RTCSender) processEncodedFrames() { - s.tracksMu.RLock() - - totalTracks := len(s.tracks) - if totalTracks == 0 { - s.tracksMu.RUnlock() - - return - } - - type trackResult struct { - hasFrame bool - err error - } - - var wg sync.WaitGroup - results := make([]trackResult, totalTracks) - i := 0 - - for trackID, track := range s.tracks { - idx := i - i++ - wg.Add(1) - - go func(trackID string, track *EncodedTrack) { - defer wg.Done() - - hasFrame, err := s.encodeAndSendTrack(trackID, track) - results[idx] = trackResult{hasFrame: hasFrame, err: err} - }(trackID, track) - } - - wg.Wait() - s.tracksMu.RUnlock() - - allHaveErrors := true - - for _, r := range results { - if r.hasFrame { - allHaveErrors = false - - break - } - - if r.err != nil && !errors.Is(r.err, ErrNoFrameAvailable) { - s.log.Errorf("Error processing track: %v", r.err) - } - } - - s.tracksMu.Lock() - s.noEncodedFrame = allHaveErrors - s.tracksMu.Unlock() -} - // SetBitrateAllocation sets custom bitrate allocation for tracks. // allocation is a map of track ID to arbitrary positive numbers representing relative weights // The values will be normalized internally (each value divided by sum of all values) @@ -960,17 +975,40 @@ func (s *RTCSender) Close() error { s.rtcpCancel = nil } - s.tracksMu.Lock() - defer s.tracksMu.Unlock() - + s.tracksMu.RLock() + tracks := make([]*EncodedTrack, 0, len(s.tracks)) for _, track := range s.tracks { + tracks = append(tracks, track) + } + s.tracksMu.RUnlock() + + for _, track := range tracks { + if track.encodeCancel != nil { + track.encodeCancel() + } + } + // Close the source first; this makes the encode goroutine's pending + // Read return ErrBufferClosed. + for _, track := range tracks { _ = track.videoSource.Close() + } + // Wait for the encode goroutine to exit before closing encodedReader / + // mediaTrack so Close cannot race a concurrent Read on those. + for _, track := range tracks { + if track.encodeDone != nil { + <-track.encodeDone + } + } + for _, track := range tracks { _ = track.encodedReader.Close() _ = track.mediaTrack.Close() } - // Clear audio tracks (TrackLocalStaticSample has no Close method) - s.audioTracks = nil + s.tracksMu.Lock() + s.tracks = make(map[string]*EncodedTrack) + // TrackLocalStaticSample has no Close method; reset map to avoid stale handles. + s.audioTracks = make(map[string]*webrtc.TrackLocalStaticSample) + s.tracksMu.Unlock() if s.peerConnection != nil { return s.peerConnection.Close() @@ -1036,5 +1074,16 @@ func (s *RTCSender) GetEncodeFrameOk() bool { s.tracksMu.RLock() defer s.tracksMu.RUnlock() - return !s.noEncodedFrame + if len(s.tracks) == 0 { + return true + } + + cutoffUs := time.Now().Add(-encodeHealthTimeout).UnixMicro() + for _, track := range s.tracks { + if track.lastEncodeAtWallUs.Load() >= cutoffUs { + return true + } + } + + return false } diff --git a/sender/rtc_sender_bench_test.go b/sender/rtc_sender_bench_test.go index 7fb076e..5e22b46 100644 --- a/sender/rtc_sender_bench_test.go +++ b/sender/rtc_sender_bench_test.go @@ -36,57 +36,6 @@ func newBenchSender(b *testing.B, numTracks int) *RTCSender { return sender } -// BenchmarkProcessEncodedFrames_NoFrames measures the hot-path cost of -// processEncodedFrames when no frames are available (the common case between -// frame arrivals). Before: parallel goroutines + WaitGroup per tick. -// After: sequential non-blocking Read loop. -func BenchmarkProcessEncodedFrames_NoFrames(b *testing.B) { - for _, numTracks := range []int{1, 4, 14} { - b.Run(trackCountName(numTracks), func(b *testing.B) { - sender := newBenchSender(b, numTracks) - defer func() { _ = sender.Close() }() - - b.ReportAllocs() - b.ResetTimer() - for range b.N { - sender.processEncodedFrames() - } - }) - } -} - -// BenchmarkProcessEncodedFrames_WithFrames measures processEncodedFrames when -// every track has a frame ready. This exercises the encode+write path. -func BenchmarkProcessEncodedFrames_WithFrames(b *testing.B) { - for _, numTracks := range []int{1, 4, 14} { - b.Run(trackCountName(numTracks), func(b *testing.B) { - sender := newBenchSender(b, numTracks) - defer func() { _ = sender.Close() }() - - // Pre-create test frame - testImg := image.NewYCbCr( - image.Rect(0, 0, 1280, 720), - image.YCbCrSubsampleRatio420, - ) - - b.ReportAllocs() - b.ResetTimer() - for range b.N { - // Feed a frame to every track - sender.tracksMu.RLock() - for _, track := range sender.tracks { - if fb, ok := track.videoSource.(*FrameBuffer); ok { - _ = fb.SendFrame(testImg) - } - } - sender.tracksMu.RUnlock() - - sender.processEncodedFrames() - } - }) - } -} - // BenchmarkUpdateBitrate measures the bitrate update path. // Before: Infof logging per tick. After: Debugf (no-op at default level). func BenchmarkUpdateBitrate(b *testing.B) { diff --git a/sender/rtc_sender_test.go b/sender/rtc_sender_test.go index 057bfc7..069e01a 100644 --- a/sender/rtc_sender_test.go +++ b/sender/rtc_sender_test.go @@ -46,7 +46,7 @@ func (m *MockVideoEncoderBuilder) BuildVideoEncoder(r video.Reader, p prop.Media type MockReadCloser struct{} func (m *MockReadCloser) Read() ([]byte, func(), error) { - return []byte{}, func() {}, nil + return nil, func() {}, ErrNoFrameAvailable } func (m *MockReadCloser) Close() error { @@ -336,79 +336,12 @@ func TestRTCSender_SetupPeerConnection_VideoOnlyInPC(t *testing.T) { assert.True(t, foundVideo, "video track should be in PeerConnection senders") } -func TestRTCSender_ProcessEncodedFrames_NoTracks(t *testing.T) { - sender, err := NewRTCSender() - require.NoError(t, err) - defer func() { _ = sender.Close() }() - - // Should return immediately with no tracks. - sender.processEncodedFrames() -} - -func TestRTCSender_ProcessEncodedFrames_WithTracks(t *testing.T) { - sender, err := NewRTCSender() - require.NoError(t, err) - defer func() { _ = sender.Close() }() - - err = sender.AddVideoTrack(VideoTrackInfo{ - TrackID: "cam-0", - Width: 640, - Height: 480, - InitialBitrate: 500_000, - }) - require.NoError(t, err) - - // Call processEncodedFrames multiple times — exercises the sequential loop - // with a real encoder. The encoder may have data from init (black frame). - sender.processEncodedFrames() - - // Send a frame so the encoder can produce output. - testImg := image.NewYCbCr(image.Rect(0, 0, 640, 480), image.YCbCrSubsampleRatio420) - err = sender.SendFrame("cam-0", testImg) - require.NoError(t, err) - - // Give encoder time to consume the frame. - time.Sleep(150 * time.Millisecond) - - sender.processEncodedFrames() - // We mainly verify no panics or deadlocks here. -} - -func TestRTCSender_ProcessEncodedFrames_AllErrors(t *testing.T) { - sender, err := NewRTCSender() - require.NoError(t, err) - defer func() { _ = sender.Close() }() - - err = sender.AddVideoTrack(VideoTrackInfo{ - TrackID: "cam-0", - Width: 640, - Height: 480, - EncoderBuilder: &MockVideoEncoderBuilder{}, - }) - require.NoError(t, err) - - // Drain any buffered frames — the mock encoder always returns empty data, - // but the FrameBuffer Read() will return ErrNoFrameAvailable in the - // non-blocking path (initialized), which propagates through the encoder. - // Call several times to ensure we hit the no-frame path. - for range 5 { - sender.processEncodedFrames() - } - - // Verify noEncodedFrame reflects the state. - _ = sender.GetEncodeFrameOk() -} - func TestRTCSender_GetEncodeFrameOk(t *testing.T) { sender, err := NewRTCSender() require.NoError(t, err) defer func() { _ = sender.Close() }() - // Default should be true (noEncodedFrame starts false). - assert.True(t, sender.GetEncodeFrameOk()) - - // After processEncodedFrames with no tracks, should still be true (early return). - sender.processEncodedFrames() + // With no tracks, the sender is trivially healthy. assert.True(t, sender.GetEncodeFrameOk()) } @@ -601,11 +534,11 @@ func TestRTCSender_GetTrackStats_PipelineCountersIncrement(t *testing.T) { require.NoError(t, err) // Drive a frame through the encoder so encodeAndSendTrack updates the - // per-track atomic counters that GetTrackStats reports. + // per-track atomic counters that GetTrackStats reports. The encode + // goroutine started by AddVideoTrack picks up the frame on its own. testImg := image.NewYCbCr(image.Rect(0, 0, 640, 480), image.YCbCrSubsampleRatio420) require.NoError(t, sender.SendFrame("cam-0", testImg)) time.Sleep(150 * time.Millisecond) - sender.processEncodedFrames() stats := sender.GetTrackStats("cam-0") require.NotNil(t, stats) @@ -639,8 +572,6 @@ func TestRTCSender_GetTrackStats_WithPeerConnection(t *testing.T) { stats := sender.GetTrackStats("cam-0") require.NotNil(t, stats) - // Pipeline counters: still zero, no frames encoded yet. - assert.Equal(t, uint64(0), stats.FramesEncoded) // Network counters: zero because no traffic has flowed. assert.Equal(t, uint64(0), stats.PacketsSent) assert.Equal(t, uint64(0), stats.RoundTripTimeMeasurements) @@ -755,25 +686,26 @@ func TestRTCSender_OnFrameSent_FiresWithCaptureTs(t *testing.T) { }) }) - // libvpx has a one-frame encoder lookahead, so a single SendFrame + - // processEncodedFrames will not produce an encoded output yet. Drive - // a small batch and pump processEncodedFrames between sends so the - // 2-slot FrameBuffer drains and the first frame is not evicted before - // the encoder consumes it. This matches the real pipeline where the - // encoder runs at ~30fps with frames continuously injected by the - // cgo bridge. + // Event-driven encode loop is always running, so feed frames at a small + // interval to avoid overflowing the 2-slot source buffer while still + // exercising capture timestamp echoing through the callback path. testImg := image.NewYCbCr(image.Rect(0, 0, 640, 480), image.YCbCrSubsampleRatio420) const wantCaptureTS int64 = 1_700_000_000_000 - for i, ts := range []int64{wantCaptureTS, wantCaptureTS + 33_000, wantCaptureTS + 66_000} { + for i, ts := range []int64{ + wantCaptureTS, + wantCaptureTS + 33_000, + wantCaptureTS + 66_000, + wantCaptureTS + 99_000, + wantCaptureTS + 132_000, + } { require.NoError(t, sender.SendFrameWithCaptureTS("cam-0", testImg, ts), "send %d", i) - sender.processEncodedFrames() + time.Sleep(10 * time.Millisecond) } deadline := time.Now().Add(2 * time.Second) var got *sentEvent for time.Now().Before(deadline) { time.Sleep(50 * time.Millisecond) - sender.processEncodedFrames() mu.Lock() for i := range events {