From 9317d7e3884e5fe8c28c957419ecb0cc8e97040c Mon Sep 17 00:00:00 2001 From: youngSSS Date: Tue, 4 Nov 2025 20:32:22 +0900 Subject: [PATCH 1/3] jitter: Add automatic buffer reset on large RTP discontinuities This commit improves the resilience of the jitter buffer by automatically detecting and handling large RTP discontinuities in both sequence numbers and timestamps. New features: - Detect large sequence number jumps (>1000) and timestamp jumps (>30s) - Automatically reset buffer state when discontinuities are detected - Add warning logs for operational visibility when jumps occur - Track previous timestamp to enable timestamp discontinuity detection Bug fixes: - Prevent buffer from staying in invalid state after large gaps - Improve packet expiration handling after discontinuities to reduce premature drops and delayed emission The reset mechanism clears all buffered packets and reinitializes the buffer state, ensuring clean recovery from stream interruptions. Tests: - Add TestLargeSequenceJump to verify sequence discontinuity handling - Add TestLargeTimestampJump to verify timestamp discontinuity handling - Add TestSequenceWraparound to ensure wraparound boundaries work correctly --- jitter/buffer.go | 84 +++++++++++++++++++++++++++++++++- jitter/buffer_test.go | 104 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 187 insertions(+), 1 deletion(-) diff --git a/jitter/buffer.go b/jitter/buffer.go index fa0864e..2b910f3 100644 --- a/jitter/buffer.go +++ b/jitter/buffer.go @@ -43,6 +43,7 @@ type Buffer struct { initialized bool prevSN uint16 + prevTS uint32 head *packet tail *packet @@ -187,6 +188,75 @@ func (b *Buffer) Close() { b.closed.Break() } +func (b *Buffer) isLargeTimestampJump(current, prev uint32) bool { + const MAX_TIMESTAMP_JUMP = 8000 * 30 // 30 seconds at 8kHz + + if !b.initialized { + return false + } + + cur := int64(current) + prv := int64(prev) + + forwardDiff := cur - prv + if forwardDiff < 0 { + forwardDiff += (1 << 32) // handle 32-bit wrap-around + } + + backwardDiff := prv - cur + if backwardDiff < 0 { + backwardDiff += (1 << 32) // handle 32-bit wrap-around + } + + return min(backwardDiff, forwardDiff) > MAX_TIMESTAMP_JUMP +} + +func (b *Buffer) isLargeSequenceJump(current, prev uint16) bool { + const MAX_SEQUENCE_JUMP = 1000 + + if !b.initialized { + return false + } + + cur := int32(current) + prv := int32(prev) + + forwardDiff := cur - prv + if forwardDiff < 0 { + forwardDiff += 65536 // handle wrap-around + } + + backwardDiff := prv - cur + if backwardDiff < 0 { + backwardDiff += 65536 // handle wrap-around + } + + return min(backwardDiff, forwardDiff) > MAX_SEQUENCE_JUMP +} + +func (b *Buffer) reset() { + b.logger.Infow("resetting jitter buffer due to RTP discontinuity") + + for b.head != nil { + next := b.head.next + b.free(b.head) + b.head = next + } + b.tail = nil + + b.initialized = false + b.prevSN = 0 + b.prevTS = 0 + + if !b.timer.Stop() { + select { + case <-b.timer.C: + default: + } + } + b.timer.Reset(b.latency) +} + // push adds a packet to the buffer func (b *Buffer) push(pkt *rtp.Packet, receivedAt time.Time) { b.stats.PacketsPushed++ @@ -197,8 +267,19 @@ func (b *Buffer) push(pkt *rtp.Packet, receivedAt time.Time) { } } + if b.isLargeTimestampJump(pkt.Timestamp, b.prevTS) || + b.isLargeSequenceJump(pkt.SequenceNumber, b.prevSN) { + b.logger.Infow("large RTP discontinuity detected", + "current_ts", pkt.Timestamp, + "prev_ts", b.prevTS, + "current_sn", pkt.SequenceNumber, + "prev_sn", b.prevSN, + ) + b.reset() + } + if b.initialized && before(pkt.SequenceNumber, b.prevSN) { - // packet expired + // packet expired (not after discontinuity reset) if !pkt.Padding { b.stats.PacketsDropped++ if b.onPacketLoss != nil { @@ -354,6 +435,7 @@ func (b *Buffer) popSample() []ExtPacket { func (b *Buffer) popHead() *packet { c := b.head b.prevSN = c.extPacket.SequenceNumber + b.prevTS = c.extPacket.Timestamp b.head = c.next if b.head == nil { b.tail = nil diff --git a/jitter/buffer_test.go b/jitter/buffer_test.go index aa73735..c3db2ad 100644 --- a/jitter/buffer_test.go +++ b/jitter/buffer_test.go @@ -232,6 +232,80 @@ func TestDroppedPackets(t *testing.T) { }) } +func TestLargeSequenceJump(t *testing.T) { + out := make(chan []ExtPacket, 100) + b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out)) + s := newTestStream() + + // push some normal packets + for i := 0; i < 10; i++ { + b.Push(s.gen(true, true)) + checkSample(t, out, 1) + } + + // simulate large sequence jump (should trigger reset) + s.largeSeqJump() + + // buffer should reset and accept new packets + for i := 0; i < 10; i++ { + b.Push(s.gen(true, true)) + checkSample(t, out, 1) + } + + stats := b.Stats() + require.Equal(t, uint64(20), stats.PacketsPushed) + require.Equal(t, uint64(20), stats.PacketsPopped) + require.Equal(t, uint64(20), stats.SamplesPopped) +} + +func TestLargeTimestampJump(t *testing.T) { + out := make(chan []ExtPacket, 100) + b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out)) + s := ×tampStream{ + seq: uint16(rand.Uint32()), + ts: uint32(rand.Uint32()), + } + + // push some normal packets + for i := 0; i < 10; i++ { + b.Push(s.gen(true, true)) + checkSample(t, out, 1) + } + + // simulate large timestamp jump (should trigger reset) + s.largeTimestampJump() + + // buffer should reset and accept new packets + for i := 0; i < 10; i++ { + b.Push(s.gen(true, true)) + checkSample(t, out, 1) + } + + stats := b.Stats() + require.Equal(t, uint64(20), stats.PacketsPushed) + require.Equal(t, uint64(20), stats.PacketsPopped) + require.Equal(t, uint64(20), stats.SamplesPopped) +} + +func TestSequenceWraparound(t *testing.T) { + out := make(chan []ExtPacket, 100) + b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out)) + s := &stream{ + seq: math.MaxUint16 - 5, // start near wrap point + } + + // push packets across wraparound boundary + for i := 0; i < 15; i++ { + b.Push(s.gen(true, true)) + checkSample(t, out, 1) + } + + stats := b.Stats() + require.Equal(t, uint64(15), stats.PacketsPushed) + require.Equal(t, uint64(15), stats.PacketsPopped) + require.Equal(t, uint64(0), stats.PacketsLost) +} + func checkSample(t *testing.T, out chan []ExtPacket, expected int) { select { case sample := <-out: @@ -285,6 +359,36 @@ func (s *stream) discont() { s.seq += math.MaxUint16 / 2 } +func (s *stream) largeSeqJump() { + s.seq += 2000 // more than MAX_SEQUENCE_JUMP (1000) +} + +type timestampStream struct { + seq uint16 + ts uint32 +} + +func (s *timestampStream) gen(head, tail bool) *rtp.Packet { + p := &rtp.Packet{ + Header: rtp.Header{ + Marker: tail, + SequenceNumber: s.seq, + Timestamp: s.ts, + }, + Payload: make([]byte, defaultPacketSize), + } + if head { + copy(p.Payload, headerBytes) + } + s.seq++ + s.ts += 160 // typical increment for 20ms at 8kHz + return p +} + +func (s *timestampStream) largeTimestampJump() { + s.ts += 8000 * 60 // 60 seconds worth of samples (more than MAX_TIMESTAMP_JUMP) +} + const defaultPacketSize = 200 var headerBytes = []byte{0xaa, 0xaa} From 6c29e15c200295febcb1f7985517d211a55b9ae7 Mon Sep 17 00:00:00 2001 From: youngSSS Date: Tue, 4 Nov 2025 20:38:53 +0900 Subject: [PATCH 2/3] change sample rate to 480000 --- jitter/buffer.go | 2 +- jitter/buffer_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/jitter/buffer.go b/jitter/buffer.go index 2b910f3..990f177 100644 --- a/jitter/buffer.go +++ b/jitter/buffer.go @@ -189,7 +189,7 @@ func (b *Buffer) Close() { } func (b *Buffer) isLargeTimestampJump(current, prev uint32) bool { - const MAX_TIMESTAMP_JUMP = 8000 * 30 // 30 seconds at 8kHz + const MAX_TIMESTAMP_JUMP = 48000 * 30 // 30 seconds at 48kHz if !b.initialized { return false diff --git a/jitter/buffer_test.go b/jitter/buffer_test.go index c3db2ad..c25b708 100644 --- a/jitter/buffer_test.go +++ b/jitter/buffer_test.go @@ -245,7 +245,7 @@ func TestLargeSequenceJump(t *testing.T) { // simulate large sequence jump (should trigger reset) s.largeSeqJump() - + // buffer should reset and accept new packets for i := 0; i < 10; i++ { b.Push(s.gen(true, true)) @@ -274,7 +274,7 @@ func TestLargeTimestampJump(t *testing.T) { // simulate large timestamp jump (should trigger reset) s.largeTimestampJump() - + // buffer should reset and accept new packets for i := 0; i < 10; i++ { b.Push(s.gen(true, true)) @@ -381,7 +381,7 @@ func (s *timestampStream) gen(head, tail bool) *rtp.Packet { copy(p.Payload, headerBytes) } s.seq++ - s.ts += 160 // typical increment for 20ms at 8kHz + s.ts += 960 // typical increment for 20ms at 48kHz return p } From 7be278b0a46acf7ece5f3210fcf4da3ba1ed8eea Mon Sep 17 00:00:00 2001 From: youngSSS Date: Wed, 5 Nov 2025 11:21:35 +0900 Subject: [PATCH 3/3] apply review: update to set jump value if needed --- jitter/buffer.go | 51 +++++++++++++++++++++++++++++++++---------- jitter/buffer_test.go | 4 ++-- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/jitter/buffer.go b/jitter/buffer.go index 990f177..5c95579 100644 --- a/jitter/buffer.go +++ b/jitter/buffer.go @@ -15,6 +15,7 @@ package jitter import ( + "math" "sync" "time" @@ -52,6 +53,9 @@ type Buffer struct { pool *packet size int + + maxSequenceJump *uint16 + maxTimestampJump *uint32 } type Option func(*Buffer) @@ -113,6 +117,18 @@ func WithPacketLossHandler(handler func()) Option { } } +func WithMaxSequenceJump(max uint16) Option { + return func(b *Buffer) { + b.maxSequenceJump = &max + } +} + +func WithMaxTimestampJump(max uint32) Option { + return func(b *Buffer) { + b.maxTimestampJump = &max + } +} + func (b *Buffer) WithLogger(logger logger.Logger) *Buffer { b.logger = logger return b @@ -189,61 +205,72 @@ func (b *Buffer) Close() { } func (b *Buffer) isLargeTimestampJump(current, prev uint32) bool { - const MAX_TIMESTAMP_JUMP = 48000 * 30 // 30 seconds at 48kHz - - if !b.initialized { + if b.maxTimestampJump == nil || !b.initialized { return false } + maxJump := uint32(*b.maxTimestampJump) + cur := int64(current) prv := int64(prev) forwardDiff := cur - prv if forwardDiff < 0 { - forwardDiff += (1 << 32) // handle 32-bit wrap-around + forwardDiff += int64(math.MaxUint32) + 1 } backwardDiff := prv - cur if backwardDiff < 0 { - backwardDiff += (1 << 32) // handle 32-bit wrap-around + backwardDiff += int64(math.MaxUint32) + 1 } - return min(backwardDiff, forwardDiff) > MAX_TIMESTAMP_JUMP + return min(backwardDiff, forwardDiff) > int64(maxJump) } func (b *Buffer) isLargeSequenceJump(current, prev uint16) bool { - const MAX_SEQUENCE_JUMP = 1000 - - if !b.initialized { + if b.maxSequenceJump == nil || !b.initialized { return false } + maxJump := int32(*b.maxSequenceJump) + cur := int32(current) prv := int32(prev) forwardDiff := cur - prv if forwardDiff < 0 { - forwardDiff += 65536 // handle wrap-around + forwardDiff += int32(math.MaxUint16) + 1 } backwardDiff := prv - cur if backwardDiff < 0 { - backwardDiff += 65536 // handle wrap-around + backwardDiff += int32(math.MaxUint16) + 1 } - return min(backwardDiff, forwardDiff) > MAX_SEQUENCE_JUMP + return min(backwardDiff, forwardDiff) > maxJump } func (b *Buffer) reset() { b.logger.Infow("resetting jitter buffer due to RTP discontinuity") + dropped := 0 for b.head != nil { next := b.head.next + if !b.head.extPacket.Padding { + dropped++ + } b.free(b.head) b.head = next } b.tail = nil + if dropped > 0 { + b.stats.PacketsDropped += uint64(dropped) + if b.onPacketLoss != nil { + b.onPacketLoss() + } + } + b.initialized = false b.prevSN = 0 b.prevTS = 0 diff --git a/jitter/buffer_test.go b/jitter/buffer_test.go index c25b708..0cdc9c4 100644 --- a/jitter/buffer_test.go +++ b/jitter/buffer_test.go @@ -234,7 +234,7 @@ func TestDroppedPackets(t *testing.T) { func TestLargeSequenceJump(t *testing.T) { out := make(chan []ExtPacket, 100) - b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out)) + b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out), WithMaxSequenceJump(1000)) s := newTestStream() // push some normal packets @@ -260,7 +260,7 @@ func TestLargeSequenceJump(t *testing.T) { func TestLargeTimestampJump(t *testing.T) { out := make(chan []ExtPacket, 100) - b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out)) + b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out), WithMaxTimestampJump(48000*30)) s := ×tampStream{ seq: uint16(rand.Uint32()), ts: uint32(rand.Uint32()),