Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions sender/frame_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ type frameWithMeta struct {
// FrameBuffer is a simple in-memory frame buffer that implements VideoSource
// It can be used as a virtual video driver for testing or programmatic frame injection.
type FrameBuffer struct {
frameChan chan frameWithMeta
closeChan chan struct{}
closeOnce sync.Once
width int
height int
id string
initialized bool
lastCaptureTSUs atomic.Int64
frameChan chan frameWithMeta
closeChan chan struct{}
closeOnce sync.Once
width int
height int
id string
initialized bool
lastCaptureTSUs atomic.Int64
lastDequeueWallUs atomic.Int64
}

// NewFrameBuffer creates a new frame buffer with the specified dimensions.
Expand Down Expand Up @@ -133,6 +134,7 @@ func (f *FrameBuffer) Read() (image.Image, func(), error) {
select {
case fm := <-f.frameChan:
f.lastCaptureTSUs.Store(fm.captureTSUs)
f.lastDequeueWallUs.Store(time.Now().UnixMicro())

return fm.img, func() {}, nil
case <-f.closeChan:
Expand All @@ -149,6 +151,7 @@ func (f *FrameBuffer) Read() (image.Image, func(), error) {
select {
case fm := <-f.frameChan:
f.lastCaptureTSUs.Store(fm.captureTSUs)
f.lastDequeueWallUs.Store(time.Now().UnixMicro())

return fm.img, func() {}, nil
case <-f.closeChan:
Expand All @@ -167,6 +170,14 @@ func (f *FrameBuffer) LastCaptureTSUs() int64 {
return f.lastCaptureTSUs.Load()
}

// LastDequeueWallUs returns time.Now().UnixMicro() at the most recent
// real-frame Read (the queue-exit instant), or 0 if no real frame has
// been consumed yet. Intended for the encode loop to read right after
// encodedReader.Read.
func (f *FrameBuffer) LastDequeueWallUs() int64 {
return f.lastDequeueWallUs.Load()
}

// SendFrame adds a frame to the buffer with no capture timestamp.
// If the buffer is full, it drops the oldest frame and adds the new one.
func (f *FrameBuffer) SendFrame(frame image.Image) error {
Expand Down
46 changes: 46 additions & 0 deletions sender/frame_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,52 @@ func TestFrameBuffer_BlackFrame_FirstReadReturnsZero(t *testing.T) {
"first black-frame Read on a fresh buffer should return 0")
}

func TestFrameBuffer_LastDequeueWallUs_AdvancesOnRead(t *testing.T) {
fb := NewFrameBuffer(640, 480)
defer func() { _ = fb.Close() }()
fb.SetInitialized()

assert.Zero(t, fb.LastDequeueWallUs(), "fresh buffer should report zero")

testImg := image.NewRGBA(image.Rect(0, 0, 640, 480))
_, err := fb.SendFrameWithCaptureTS(testImg, 111)
require.NoError(t, err)

before := time.Now().UnixMicro()
_, release, err := fb.Read()
require.NoError(t, err)
release()
after := time.Now().UnixMicro()

got := fb.LastDequeueWallUs()
assert.GreaterOrEqual(t, got, before, "dequeue stamp must be >= pre-Read wall clock")
assert.LessOrEqual(t, got, after, "dequeue stamp must be <= post-Read wall clock")
}

func TestFrameBuffer_LastDequeueWallUs_PreservedOnBlackFrame(t *testing.T) {
fb := NewFrameBuffer(640, 480)
defer func() { _ = fb.Close() }()

testImg := image.NewRGBA(image.Rect(0, 0, 640, 480))
_, err := fb.SendFrameWithCaptureTS(testImg, 555)
require.NoError(t, err)

// First Read consumes the buffered frame and stamps a real dequeue.
_, release, err := fb.Read()
require.NoError(t, err)
release()
stampAfterReal := fb.LastDequeueWallUs()
require.Positive(t, stampAfterReal)

// Second Read hits the 100ms timeout (uninitialized + empty) and
// returns a black frame; LastDequeueWallUs must NOT be clobbered.
_, release, err = fb.Read()
require.NoError(t, err)
release()
assert.Equal(t, stampAfterReal, fb.LastDequeueWallUs(),
"black-frame Read should preserve LastDequeueWallUs")
}

func TestFrameBuffer_SendWithCaptureTs_OverflowReportsEviction(t *testing.T) {
fb := NewFrameBuffer(640, 480)
defer func() { _ = fb.Close() }()
Expand Down
31 changes: 17 additions & 14 deletions sender/rtc_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,17 @@ type EncodedTrack struct {
// encode pass.
type EncodedFrameCallback func(trackID string, data []byte, isKeyframe bool)

// FrameSentCallback fires once per encoded frame after WriteSample
// returns, and once per frame evicted from a full buffer to make room.
// captureTSUs echoes the value passed to SendFrameWithCaptureTS (0 means
// no timestamp; filter when used for latency measurement). sentAtWallUs
// is time.Now().UnixMicro() at WriteSample return (or at eviction).
// dropped is true when WriteSample errored or the frame was evicted.
// Runs on the encode goroutine for completed sends, on the SendFrame
// caller's goroutine for evictions; must not block.
type FrameSentCallback func(trackID string, captureTSUs, sentAtWallUs int64, dropped bool)
// FrameSentCallback fires per encoded frame (after WriteSample) and per
// frame evicted from a full buffer. captureTSUs echoes the value passed
// to SendFrameWithCaptureTS (0 = none). dequeuedAtWallUs / encodeDoneAtWallUs
// / sentAtWallUs are time.Now().UnixMicro() at FrameBuffer.Read return,
// encodedReader.Read return, and WriteSample return respectively. On
// dropped=true (WriteSample error or eviction), dequeuedAtWallUs and
// encodeDoneAtWallUs are 0. Runs on the encode goroutine for completed
// sends, on the SendFrame caller's goroutine for evictions; must not block.
type FrameSentCallback func(trackID string,
captureTSUs, dequeuedAtWallUs, encodeDoneAtWallUs, sentAtWallUs int64,
dropped bool)

// RTCSender is a generic sender that can work with any frame source.
type RTCSender struct {
Expand Down Expand Up @@ -529,7 +531,7 @@ func (s *RTCSender) SendFrameWithCaptureTS(trackID string, frame image.Image, ca

evicted, err := frameBuffer.SendFrameWithCaptureTS(frame, captureTSUs)
if evicted && s.onFrameSent != nil {
s.onFrameSent(trackID, 0, time.Now().UnixMicro(), true)
s.onFrameSent(trackID, 0, 0, 0, time.Now().UnixMicro(), true)
}

return err
Expand Down Expand Up @@ -734,11 +736,11 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo
}
tAfterRead := time.Now()

// Capture timestamp for the just-encoded frame; non-FrameBuffer
// sources return 0.
var captureTSUs int64
// Per-frame stamps; non-FrameBuffer sources report 0.
var captureTSUs, dequeuedAtWallUs int64
if cfs, ok := track.videoSource.(*FrameBuffer); ok {
captureTSUs = cfs.LastCaptureTSUs()
dequeuedAtWallUs = cfs.LastDequeueWallUs()
}

track.bitrateTracker.AddFrame(len(encoded.Data), tAfterRead)
Expand Down Expand Up @@ -766,7 +768,8 @@ func (s *RTCSender) encodeAndSendTrack(trackID string, track *EncodedTrack) (boo
}

if s.onFrameSent != nil {
s.onFrameSent(trackID, captureTSUs, tAfterWrite.UnixMicro(), writeErr != nil)
s.onFrameSent(trackID, captureTSUs, dequeuedAtWallUs,
tAfterRead.UnixMicro(), tAfterWrite.UnixMicro(), writeErr != nil)
}

release()
Expand Down
31 changes: 24 additions & 7 deletions sender/rtc_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,19 +733,26 @@ func TestRTCSender_OnFrameSent_FiresWithCaptureTs(t *testing.T) {
require.NoError(t, err)

type sentEvent struct {
trackID string
captureTSUs int64
sentAtWallUs int64
dropped bool
trackID string
captureTSUs int64
dequeuedAtWallUs int64
encodeDoneAtWallUs int64
sentAtWallUs int64
dropped bool
}
var (
mu sync.Mutex
events []sentEvent
)
sender.SetOnFrameSent(func(trackID string, captureTSUs, sentAtWallUs int64, dropped bool) {
sender.SetOnFrameSent(func(trackID string,
captureTSUs, dequeuedAtWallUs, encodeDoneAtWallUs, sentAtWallUs int64,
dropped bool,
) {
mu.Lock()
defer mu.Unlock()
events = append(events, sentEvent{trackID, captureTSUs, sentAtWallUs, dropped})
events = append(events, sentEvent{
trackID, captureTSUs, dequeuedAtWallUs, encodeDoneAtWallUs, sentAtWallUs, dropped,
})
})

// libvpx has a one-frame encoder lookahead, so a single SendFrame +
Expand Down Expand Up @@ -782,8 +789,14 @@ func TestRTCSender_OnFrameSent_FiresWithCaptureTs(t *testing.T) {
require.NotNil(t, got, "OnFrameSent never echoed the supplied captureTSUs within 2s")
assert.Equal(t, "cam-0", got.trackID)
assert.Equal(t, wantCaptureTS, got.captureTSUs)
assert.Positive(t, got.dequeuedAtWallUs, "dequeuedAtWallUs should be set on success path")
assert.Positive(t, got.encodeDoneAtWallUs, "encodeDoneAtWallUs should be set on success path")
assert.Positive(t, got.sentAtWallUs, "sentAtWallUs should be a real wall-clock microsecond")
assert.False(t, got.dropped, "WriteSample should succeed for an in-spec frame")
assert.LessOrEqual(t, got.dequeuedAtWallUs, got.encodeDoneAtWallUs,
"dequeue must precede encode-done")
assert.LessOrEqual(t, got.encodeDoneAtWallUs, got.sentAtWallUs,
"encode-done must precede send")
}

func TestRTCSender_OnFrameSent_FiresWithDroppedOnEviction(t *testing.T) {
Expand All @@ -804,11 +817,15 @@ func TestRTCSender_OnFrameSent_FiresWithDroppedOnEviction(t *testing.T) {
dropped int
nonDrop int
)
sender.SetOnFrameSent(func(_ string, _ int64, _ int64, isDrop bool) {
sender.SetOnFrameSent(func(_ string, _, dequeued, encodeDone, _ int64, isDrop bool) {
mu.Lock()
defer mu.Unlock()
if isDrop {
dropped++
// Evictions must report zero for the intermediate timestamps;
// no encode happened.
assert.Zero(t, dequeued, "evictions should report dequeuedAtWallUs=0")
assert.Zero(t, encodeDone, "evictions should report encodeDoneAtWallUs=0")
} else {
nonDrop++
}
Expand Down
Loading