Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions cmd/test-streammanager/bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2026, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0

package main

import (
"fmt"

"github.com/wavetermdev/waveterm/pkg/wshrpc"
)

// WriterBridge - used by the writer broker
// Sends data to the pipe, receives acks from the pipe
type WriterBridge struct {
pipe *DeliveryPipe
}

func (b *WriterBridge) StreamDataCommand(data wshrpc.CommandStreamData, opts *wshrpc.RpcOpts) error {
b.pipe.EnqueueData(data)
return nil
}

func (b *WriterBridge) StreamDataAckCommand(ack wshrpc.CommandStreamAckData, opts *wshrpc.RpcOpts) error {
return fmt.Errorf("writer bridge should not send acks")
}

// ReaderBridge - used by the reader broker
// Sends acks to the pipe, receives data from the pipe
type ReaderBridge struct {
pipe *DeliveryPipe
}

func (b *ReaderBridge) StreamDataCommand(data wshrpc.CommandStreamData, opts *wshrpc.RpcOpts) error {
return fmt.Errorf("reader bridge should not send data")
}

func (b *ReaderBridge) StreamDataAckCommand(ack wshrpc.CommandStreamAckData, opts *wshrpc.RpcOpts) error {
b.pipe.EnqueueAck(ack)
return nil
}
249 changes: 249 additions & 0 deletions cmd/test-streammanager/deliverypipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
// Copyright 2026, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0

package main

import (
"encoding/base64"
"math/rand"
"sort"
"sync"
"time"

"github.com/wavetermdev/waveterm/pkg/wshrpc"
)

type DeliveryConfig struct {
Delay time.Duration
Skew time.Duration
}

type taggedPacket struct {
seq uint64
deliveryTime time.Time
isData bool
dataPk wshrpc.CommandStreamData
ackPk wshrpc.CommandStreamAckData
dataSize int
}

type DeliveryPipe struct {
lock sync.Mutex
config DeliveryConfig

// Sequence counters (separate for data and ack)
dataSeq uint64
ackSeq uint64

// Pending packets sorted by (deliveryTime, seq)
dataPending []taggedPacket
ackPending []taggedPacket

// Delivery targets
dataTarget func(wshrpc.CommandStreamData)
ackTarget func(wshrpc.CommandStreamAckData)

// Control
closed bool
wg sync.WaitGroup

// Metrics
metrics *Metrics
lastDataSeqNum int64
lastAckSeqNum int64

// Byte tracking for high water mark
currentBytes int64
}

func NewDeliveryPipe(config DeliveryConfig, metrics *Metrics) *DeliveryPipe {
return &DeliveryPipe{
config: config,
metrics: metrics,
lastDataSeqNum: -1,
lastAckSeqNum: -1,
}
}

func (dp *DeliveryPipe) SetDataTarget(fn func(wshrpc.CommandStreamData)) {
dp.lock.Lock()
defer dp.lock.Unlock()
dp.dataTarget = fn
}

func (dp *DeliveryPipe) SetAckTarget(fn func(wshrpc.CommandStreamAckData)) {
dp.lock.Lock()
defer dp.lock.Unlock()
dp.ackTarget = fn
}

func (dp *DeliveryPipe) EnqueueData(pkt wshrpc.CommandStreamData) {
dp.lock.Lock()
defer dp.lock.Unlock()

if dp.closed {
return
}

dataSize := base64.StdEncoding.DecodedLen(len(pkt.Data64))
dp.dataSeq++
tagged := taggedPacket{
seq: dp.dataSeq,
deliveryTime: dp.computeDeliveryTime(),
isData: true,
dataPk: pkt,
dataSize: dataSize,
}

dp.dataPending = append(dp.dataPending, tagged)
dp.sortPending(&dp.dataPending)

dp.currentBytes += int64(dataSize)
if dp.metrics != nil {
dp.metrics.AddDataPacket()
dp.metrics.UpdatePipeHighWaterMark(dp.currentBytes)
}
}

func (dp *DeliveryPipe) EnqueueAck(pkt wshrpc.CommandStreamAckData) {
dp.lock.Lock()
defer dp.lock.Unlock()

if dp.closed {
return
}

dp.ackSeq++
tagged := taggedPacket{
seq: dp.ackSeq,
deliveryTime: dp.computeDeliveryTime(),
isData: false,
ackPk: pkt,
}

dp.ackPending = append(dp.ackPending, tagged)
dp.sortPending(&dp.ackPending)

if dp.metrics != nil {
dp.metrics.AddAckPacket()
}
}

func (dp *DeliveryPipe) computeDeliveryTime() time.Time {
base := time.Now().Add(dp.config.Delay)

if dp.config.Skew == 0 {
return base
}

// Random skew: -skew to +skew
skewNs := dp.config.Skew.Nanoseconds()
randomSkew := time.Duration(rand.Int63n(2*skewNs+1) - skewNs)
return base.Add(randomSkew)
}

func (dp *DeliveryPipe) sortPending(pending *[]taggedPacket) {
sort.Slice(*pending, func(i, j int) bool {
pi, pj := (*pending)[i], (*pending)[j]
if pi.deliveryTime.Equal(pj.deliveryTime) {
return pi.seq < pj.seq
}
return pi.deliveryTime.Before(pj.deliveryTime)
})
}

func (dp *DeliveryPipe) Start() {
dp.wg.Add(2)
go dp.dataDeliveryLoop()
go dp.ackDeliveryLoop()
}

func (dp *DeliveryPipe) dataDeliveryLoop() {
defer dp.wg.Done()
dp.deliveryLoop(
func() *[]taggedPacket { return &dp.dataPending },
func(pkt taggedPacket) {
if dp.dataTarget != nil {
// Track out-of-order packets
if dp.metrics != nil && dp.lastDataSeqNum != -1 {
if pkt.dataPk.Seq < dp.lastDataSeqNum {
dp.metrics.AddOOOPacket()
}
}
dp.lastDataSeqNum = pkt.dataPk.Seq
dp.dataTarget(pkt.dataPk)

dp.lock.Lock()
dp.currentBytes -= int64(pkt.dataSize)
dp.lock.Unlock()
}
},
)
}
Comment on lines +161 to +182
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Data race on lastDataSeqNum access.

The deliver callback accesses and modifies dp.lastDataSeqNum (lines 169, 173) outside of the mutex lock. Since the deliver function runs after dp.lock.Unlock() in deliveryLoop, this creates a data race if multiple goroutines or iterations access these fields concurrently.

The same issue exists in ackDeliveryLoop with lastAckSeqNum (lines 192, 196).

🔒 Proposed fix to protect lastDataSeqNum access
 func (dp *DeliveryPipe) dataDeliveryLoop() {
 	defer dp.wg.Done()
 	dp.deliveryLoop(
 		func() *[]taggedPacket { return &dp.dataPending },
 		func(pkt taggedPacket) {
 			if dp.dataTarget != nil {
+				dp.lock.Lock()
 				// Track out-of-order packets
 				if dp.metrics != nil && dp.lastDataSeqNum != -1 {
 					if pkt.dataPk.Seq < dp.lastDataSeqNum {
 						dp.metrics.AddOOOPacket()
 					}
 				}
 				dp.lastDataSeqNum = pkt.dataPk.Seq
+				dp.currentBytes -= int64(pkt.dataSize)
+				dp.lock.Unlock()
+
 				dp.dataTarget(pkt.dataPk)
-
-				dp.lock.Lock()
-				dp.currentBytes -= int64(pkt.dataSize)
-				dp.lock.Unlock()
 			}
 		},
 	)
 }

Apply the same pattern to ackDeliveryLoop for lastAckSeqNum.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (dp *DeliveryPipe) dataDeliveryLoop() {
defer dp.wg.Done()
dp.deliveryLoop(
func() *[]taggedPacket { return &dp.dataPending },
func(pkt taggedPacket) {
if dp.dataTarget != nil {
// Track out-of-order packets
if dp.metrics != nil && dp.lastDataSeqNum != -1 {
if pkt.dataPk.Seq < dp.lastDataSeqNum {
dp.metrics.AddOOOPacket()
}
}
dp.lastDataSeqNum = pkt.dataPk.Seq
dp.dataTarget(pkt.dataPk)
dp.lock.Lock()
dp.currentBytes -= int64(pkt.dataSize)
dp.lock.Unlock()
}
},
)
}
func (dp *DeliveryPipe) dataDeliveryLoop() {
defer dp.wg.Done()
dp.deliveryLoop(
func() *[]taggedPacket { return &dp.dataPending },
func(pkt taggedPacket) {
if dp.dataTarget != nil {
dp.lock.Lock()
// Track out-of-order packets
if dp.metrics != nil && dp.lastDataSeqNum != -1 {
if pkt.dataPk.Seq < dp.lastDataSeqNum {
dp.metrics.AddOOOPacket()
}
}
dp.lastDataSeqNum = pkt.dataPk.Seq
dp.currentBytes -= int64(pkt.dataSize)
dp.lock.Unlock()
dp.dataTarget(pkt.dataPk)
}
},
)
}
🤖 Prompt for AI Agents
In `@cmd/test-streammanager/deliverypipe.go` around lines 161 - 182, The callback
in DeliveryPipe.dataDeliveryLoop accesses and updates dp.lastDataSeqNum without
holding dp.lock, causing a data race; modify the deliver closure used in
deliveryLoop so that any read/write of dp.lastDataSeqNum (and the corresponding
metrics.AddOOOPacket() decision) is done while holding dp.lock — i.e., acquire
dp.lock before comparing pkt.dataPk.Seq to dp.lastDataSeqNum and before
assigning dp.lastDataSeqNum, then release the lock; apply the same pattern in
ackDeliveryLoop to protect dp.lastAckSeqNum (wrap the check, metrics call, and
update in dp.lock).


func (dp *DeliveryPipe) ackDeliveryLoop() {
defer dp.wg.Done()
dp.deliveryLoop(
func() *[]taggedPacket { return &dp.ackPending },
func(pkt taggedPacket) {
if dp.ackTarget != nil {
// Track out-of-order acks
if dp.metrics != nil && dp.lastAckSeqNum != -1 {
if pkt.ackPk.Seq < dp.lastAckSeqNum {
dp.metrics.AddOOOPacket()
}
}
dp.lastAckSeqNum = pkt.ackPk.Seq
dp.ackTarget(pkt.ackPk)
}
},
)
}

func (dp *DeliveryPipe) deliveryLoop(
getPending func() *[]taggedPacket,
deliver func(taggedPacket),
) {
for {
dp.lock.Lock()
if dp.closed {
dp.lock.Unlock()
return
}

pending := getPending()
now := time.Now()

// Find all packets ready for delivery (deliveryTime <= now)
readyCount := 0
for _, pkt := range *pending {
if pkt.deliveryTime.After(now) {
break
}
readyCount++
}

// Extract ready packets
ready := make([]taggedPacket, readyCount)
copy(ready, (*pending)[:readyCount])
*pending = (*pending)[readyCount:]

dp.lock.Unlock()

// Deliver all ready packets (outside lock)
for _, pkt := range ready {
deliver(pkt)
}

// Always sleep 1ms - simple busy loop
time.Sleep(1 * time.Millisecond)
}
}

func (dp *DeliveryPipe) Close() {
dp.lock.Lock()
dp.closed = true
dp.lock.Unlock()

dp.wg.Wait()
}
40 changes: 40 additions & 0 deletions cmd/test-streammanager/generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2026, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0

package main

import (
"io"
)

// Base64 charset: all printable, easy to inspect manually
const Base64Chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

type TestDataGenerator struct {
totalBytes int64
generated int64
}

func NewTestDataGenerator(totalBytes int64) *TestDataGenerator {
return &TestDataGenerator{totalBytes: totalBytes}
}

func (g *TestDataGenerator) Read(p []byte) (n int, err error) {
if g.generated >= g.totalBytes {
return 0, io.EOF
}

remaining := g.totalBytes - g.generated
toRead := int64(len(p))
if toRead > remaining {
toRead = remaining
}

// Sequential pattern using base64 chars (0-63 cycling)
for i := int64(0); i < toRead; i++ {
p[i] = Base64Chars[(g.generated+i)%64]
}

g.generated += toRead
return int(toRead), nil
}
Loading
Loading