stress test for the new RPC streaming primitives (+ bug fixes)#2828
Conversation
Walkthrough

Adds a test harness for StreamManager and refactors ACK/buffer APIs. New cmd/test-streammanager files provide: a DeliveryPipe simulator (configurable delay/skew, data/ack queues, delivery loops, metrics), WriterBridge/ReaderBridge for routing stream commands, a deterministic TestDataGenerator, a thread-safe Metrics collector, a Verifier for data integrity, and a Cobra-based integration CLI that wires these pieces together. pkg/jobmanager replaces blocking Write/WriteCtx with a non-blocking WriteAvailable API. pkg/jobmanager/streammanager and pkg/streamclient switch from single-value ACK tracking to tuple-based tracking (maxAckedSeq, maxAckedRwnd) to filter stale ACKs.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks: 2 passed, 1 failed (warning)
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@cmd/test-streammanager/deliverypipe.go`:
- Around line 161-182: The callback in DeliveryPipe.dataDeliveryLoop accesses
and updates dp.lastDataSeqNum without holding dp.lock, causing a data race;
modify the deliver closure used in deliveryLoop so that any read/write of
dp.lastDataSeqNum (and the corresponding metrics.AddOOOPacket() decision) is
done while holding dp.lock — i.e., acquire dp.lock before comparing
pkt.dataPk.Seq to dp.lastDataSeqNum and before assigning dp.lastDataSeqNum, then
release the lock; apply the same pattern in ackDeliveryLoop to protect
dp.lastAckSeqNum (wrap the check, metrics call, and update in dp.lock).
In `@cmd/test-streammanager/main-test-streammanager.go`:
- Around line 134-135: The inline step comment before the completion wait is
mis-numbered: update the comment "// 15. Wait for completion" immediately above
the statement "err = <-done" (in the main test stream manager flow) to read "//
16. Wait for completion" so the step numbering is sequential with the earlier
"// 15." comment; adjust only the comment text.
In `@cmd/test-streammanager/metrics.go`:
- Around line 87-105: The throughput calculation in Metrics.Report can divide by
zero or use a negative duration; guard against this by computing duration :=
m.endTime.Sub(m.startTime) and if duration <= 0 (or duration.Seconds() == 0) set
throughput to 0.0 instead of performing the division, then continue using
throughput in the fmt.Sprintf; update the function Metrics.Report to use this
guarded throughput calculation referencing m.startTime, m.endTime, m.totalBytes
and the Report method.
In `@cmd/test-streammanager/verifier.go`:
- Around line 29-39: The code currently ignores the return values from
v.expectedGen.Read when filling expected, causing comparisons against unwritten
bytes; change the read to capture (n, err) := v.expectedGen.Read(expected),
handle err (treat io.EOF as a valid partial read or propagate unexpected
errors), and only compare the first n bytes of expected to p (and adjust how
v.totalReceived / v.firstMismatch are computed to use i within the n range);
ensure you update v.mismatches and v.firstMismatch only for indices 0..n-1 and
handle the case where n < len(p) appropriately.
🧹 Nitpick comments (3)
cmd/test-streammanager/deliverypipe.go (1)
98-99: Consider using a heap for pending packets.

Calling `sortPending` after every `EnqueueData`/`EnqueueAck` is O(n log n) per insertion. For a stress-test tool that may enqueue many packets, using `container/heap` would reduce this to O(log n) per operation. This is acceptable for a test tool but may become a bottleneck under heavy load; a sketch follows.
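A minimal sketch of the heap approach, assuming the pending slice holds `taggedPacket` values ordered by a delivery deadline (the `deliverAt` field name is hypothetical, standing in for whatever timestamp the pipe computes from delay/skew):

```go
// pendingHeap keeps the earliest-due packet at the root, so inserts and
// removals via container/heap are O(log n) instead of re-sorting.
type pendingHeap []taggedPacket

func (h pendingHeap) Len() int           { return len(h) }
func (h pendingHeap) Less(i, j int) bool { return h[i].deliverAt.Before(h[j].deliverAt) }
func (h pendingHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *pendingHeap) Push(x any)        { *h = append(*h, x.(taggedPacket)) }
func (h *pendingHeap) Pop() any {
	old := *h
	pkt := old[len(old)-1]
	*h = old[:len(old)-1]
	return pkt
}

// Usage (assuming dp.dataPending is declared as pendingHeap):
//   heap.Push(&dp.dataPending, pkt)                  // on enqueue
//   next := heap.Pop(&dp.dataPending).(taggedPacket) // when the root is due
```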
cmd/test-streammanager/main-test-streammanager.go (1)
197-207: SlowReader rate limiting is coarse-grained.

The implementation sleeps a full second before every read, regardless of the bytes actually read. If the underlying reader returns fewer bytes than `bytesPerSec` (e.g., near EOF, or when the destination buffer is smaller), the effective rate will be lower than intended. For a test tool this is acceptable, but consider a more precise approach: a token bucket, or a sleep proportional to the bytes actually read; the sketch below shows the latter.
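A sketch of the proportional-sleep variant, keeping the fields the PR's `SlowReader` already has and assuming `sr.bytesPerSec > 0`:

```go
// Read at most bytesPerSec bytes, then sleep in proportion to the bytes
// actually read, so the average rate tracks bytesPerSec even when the
// underlying reader returns short reads near EOF.
func (sr *SlowReader) Read(p []byte) (int, error) {
	readSize := sr.bytesPerSec
	if readSize > len(p) {
		readSize = len(p)
	}
	n, err := sr.reader.Read(p[:readSize])
	if n > 0 {
		time.Sleep(time.Duration(float64(n) / float64(sr.bytesPerSec) * float64(time.Second)))
	}
	return n, err
}
```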
pkg/jobmanager/cirbuf.go (1)
77-98: Consider: the byte-by-byte loop could be optimized with a bulk copy.

The current implementation iterates one byte at a time. For a 4KB packet, this executes 4096 loop iterations. While functionally correct, bulk copying (handling wrap-around) would improve throughput. Since this is supporting a test tool, the current approach is acceptable. If this path becomes latency-sensitive in production, consider refactoring to calculate contiguous segments and use `copy()` for bulk transfers.

♻️ Sketch of bulk copy optimization

```go
// Calculate how many bytes fit before the window limit or wrap-around.
available := cb.windowSize - cb.count
if cb.syncMode && available <= 0 {
	// handle blocking case
}
toWrite := min(len(data), available)

// Handle wrap-around with at most two copy() calls.
firstChunk := min(toWrite, len(cb.buf)-cb.writePos)
copy(cb.buf[cb.writePos:], data[:firstChunk])
if firstChunk < toWrite {
	copy(cb.buf[0:], data[firstChunk:toWrite])
}
// Update positions and counters.
```
```go
func (dp *DeliveryPipe) dataDeliveryLoop() {
	defer dp.wg.Done()
	dp.deliveryLoop(
		func() *[]taggedPacket { return &dp.dataPending },
		func(pkt taggedPacket) {
			if dp.dataTarget != nil {
				// Track out-of-order packets
				if dp.metrics != nil && dp.lastDataSeqNum != -1 {
					if pkt.dataPk.Seq < dp.lastDataSeqNum {
						dp.metrics.AddOOOPacket()
					}
				}
				dp.lastDataSeqNum = pkt.dataPk.Seq
				dp.dataTarget(pkt.dataPk)

				dp.lock.Lock()
				dp.currentBytes -= int64(pkt.dataSize)
				dp.lock.Unlock()
			}
		},
	)
}
```
Data race on lastDataSeqNum access.
The deliver callback accesses and modifies dp.lastDataSeqNum (lines 169, 173) outside of the mutex lock. Since the deliver function runs after dp.lock.Unlock() in deliveryLoop, this creates a data race if multiple goroutines or iterations access these fields concurrently.
The same issue exists in ackDeliveryLoop with lastAckSeqNum (lines 192, 196).
🔒 Proposed fix to protect lastDataSeqNum access
```diff
 func (dp *DeliveryPipe) dataDeliveryLoop() {
 	defer dp.wg.Done()
 	dp.deliveryLoop(
 		func() *[]taggedPacket { return &dp.dataPending },
 		func(pkt taggedPacket) {
 			if dp.dataTarget != nil {
+				dp.lock.Lock()
 				// Track out-of-order packets
 				if dp.metrics != nil && dp.lastDataSeqNum != -1 {
 					if pkt.dataPk.Seq < dp.lastDataSeqNum {
 						dp.metrics.AddOOOPacket()
 					}
 				}
 				dp.lastDataSeqNum = pkt.dataPk.Seq
+				dp.currentBytes -= int64(pkt.dataSize)
+				dp.lock.Unlock()
+
 				dp.dataTarget(pkt.dataPk)
-
-				dp.lock.Lock()
-				dp.currentBytes -= int64(pkt.dataSize)
-				dp.lock.Unlock()
 			}
 		},
 	)
 }
```

Apply the same pattern to ackDeliveryLoop for lastAckSeqNum.
```go
	// 15. Wait for completion
	err = <-done
```
Duplicate step number in comment.
Line 134 uses `// 15.`, but the previous step (line 125) already uses `// 15.`. This should be `// 16.` for consistency.
📝 Proposed fix
```diff
-	// 15. Wait for completion
+	// 16. Wait for completion
 	err = <-done
```
err = <-done🤖 Prompt for AI Agents
In `@cmd/test-streammanager/main-test-streammanager.go` around lines 134 - 135,
The inline step comment before the completion wait is mis-numbered: update the
comment "// 15. Wait for completion" immediately above the statement "err =
<-done" (in the main test stream manager flow) to read "// 16. Wait for
completion" so the step numbering is sequential with the earlier "// 15."
comment; adjust only the comment text.
```go
func (m *Metrics) Report() string {
	m.lock.Lock()
	defer m.lock.Unlock()

	duration := m.endTime.Sub(m.startTime)
	throughput := float64(m.totalBytes) / duration.Seconds() / 1024 / 1024

	return fmt.Sprintf(`
StreamManager Integration Test Results
======================================
Duration: %v
Total Bytes: %d
Throughput: %.2f MB/s
Data Packets: %d
Ack Packets: %d
OOO Packets: %d
Pipe High Water: %d bytes (%.2f KB)
`, duration, m.totalBytes, throughput, m.dataPackets, m.ackPackets, m.oooPackets,
		m.pipeHighWaterMark, float64(m.pipeHighWaterMark)/1024)
}
```
Potential division by zero in throughput calculation.
If Start() and End() are called in quick succession (or End() is called before Start()), duration.Seconds() could be zero or negative, causing a division by zero or producing +Inf/NaN.
🛡️ Proposed fix to guard against zero duration
```diff
 func (m *Metrics) Report() string {
 	m.lock.Lock()
 	defer m.lock.Unlock()

 	duration := m.endTime.Sub(m.startTime)
-	throughput := float64(m.totalBytes) / duration.Seconds() / 1024 / 1024
+	var throughput float64
+	if duration.Seconds() > 0 {
+		throughput = float64(m.totalBytes) / duration.Seconds() / 1024 / 1024
+	}
```
```go
	expected := make([]byte, len(p))
	v.expectedGen.Read(expected)

	for i := 0; i < len(p); i++ {
		if p[i] != expected[i] {
			v.mismatches++
			if v.firstMismatch == -1 {
				v.firstMismatch = v.totalReceived + int64(i)
			}
		}
	}
```
Return value from expectedGen.Read() is ignored, causing potential false mismatches.
The read from expectedGen may return fewer bytes than len(p) (especially near EOF), but the comparison loop always iterates over all len(p) bytes. This would compare incoming data against zero/uninitialized bytes in expected, producing spurious mismatch reports.
🐛 Proposed fix to handle partial reads
```diff
 func (v *Verifier) Write(p []byte) (n int, err error) {
 	v.lock.Lock()
 	defer v.lock.Unlock()

 	expected := make([]byte, len(p))
-	v.expectedGen.Read(expected)
+	expectedN, _ := v.expectedGen.Read(expected)

-	for i := 0; i < len(p); i++ {
+	for i := 0; i < expectedN; i++ {
 		if p[i] != expected[i] {
 			v.mismatches++
 			if v.firstMismatch == -1 {
 				v.firstMismatch = v.totalReceived + int64(i)
 			}
 		}
 	}

 	v.totalReceived += int64(len(p))
 	return len(p), nil
 }
```
…to handle out of order acks
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@cmd/test-streammanager/main-test-streammanager.go`:
- Around line 183-188: In runWriterMode, when
writerBroker.CreateStreamWriter(streamMeta) returns an error, don’t return nil;
instead create and return a channel that immediately sends that error so callers
(like runTest) can wait and handle it; update the error handling in
runWriterMode (where CreateStreamWriter is called) to construct a chan error,
send the err on it, close the channel, and return it so the failure is
propagated to runTest.
- Around line 243-253: The SlowReader.Read method unconditionally calls
log.Printf on every read, ignoring the global verbosity flag; update the logging
to respect config.Verbose (or the chosen verbose logger) by either gating the
log.Printf inside SlowReader.Read with an if config.Verbose check or by
configuring the logger used by SlowReader in runTest so it no-ops when verbose
is false; specifically, modify SlowReader.Read (the Read method on type
SlowReader) to only emit the "SlowReader: read ..." message when config.Verbose
is true or ensure runTest injects a verbose/no-op logger into SlowReader so the
verbose flag controls output.
- Around line 162-181: runStreamManagerMode currently returns nil on setup
errors which hides failures; change it to create a buffered error channel (e.g.,
errCh := make(chan error, 1)), and on any error from
streamManager.ClientConnected or streamManager.AttachReader send the error into
errCh (errCh <- err) and return errCh instead of nil; ensure the success path
returns a nil or a closed channel per the caller's expectation, but at minimum
propagate setup failures by returning the error channel from
runStreamManagerMode so callers can fail fast.
```go
func runStreamManagerMode(config TestConfig, writerBroker *streamclient.Broker, streamMeta *wshrpc.StreamMeta) chan error {
	streamManager := jobmanager.MakeStreamManagerWithSizes(config.WindowSize, 2*1024*1024)
	writerBroker.AttachStreamWriter(streamMeta, streamManager)

	dataSender := &BrokerDataSender{broker: writerBroker}
	startSeq, err := streamManager.ClientConnected(streamMeta.Id, dataSender, config.WindowSize, 0)
	if err != nil {
		fmt.Printf("failed to connect stream manager: %v\n", err)
		return nil
	}
	fmt.Printf("  Stream connected, startSeq: %d\n", startSeq)

	generator := NewTestDataGenerator(config.DataSize)
	if err := streamManager.AttachReader(generator); err != nil {
		fmt.Printf("failed to attach reader: %v\n", err)
		return nil
	}

	return nil
}
```
Propagate setup failures instead of returning nil.
On errors (Line 167/175), this returns nil, so runTest will continue and may hang or falsely pass. Return a channel containing the error so the caller can fail fast.
🛠️ Suggested fix
```diff
 func runStreamManagerMode(config TestConfig, writerBroker *streamclient.Broker, streamMeta *wshrpc.StreamMeta) chan error {
 	streamManager := jobmanager.MakeStreamManagerWithSizes(config.WindowSize, 2*1024*1024)
 	writerBroker.AttachStreamWriter(streamMeta, streamManager)

 	dataSender := &BrokerDataSender{broker: writerBroker}
 	startSeq, err := streamManager.ClientConnected(streamMeta.Id, dataSender, config.WindowSize, 0)
 	if err != nil {
 		fmt.Printf("failed to connect stream manager: %v\n", err)
-		return nil
+		done := make(chan error, 1)
+		done <- err
+		return done
 	}
 	fmt.Printf("  Stream connected, startSeq: %d\n", startSeq)

 	generator := NewTestDataGenerator(config.DataSize)
 	if err := streamManager.AttachReader(generator); err != nil {
 		fmt.Printf("failed to attach reader: %v\n", err)
-		return nil
+		done := make(chan error, 1)
+		done <- err
+		return done
 	}

 	return nil
 }
```
```go
func runWriterMode(config TestConfig, writerBroker *streamclient.Broker, streamMeta *wshrpc.StreamMeta) chan error {
	writer, err := writerBroker.CreateStreamWriter(streamMeta)
	if err != nil {
		fmt.Printf("failed to create stream writer: %v\n", err)
		return nil
	}
```
Propagate writer creation failures.
If CreateStreamWriter fails (Line 184–186), returning nil skips waiting and hides the error. Return a channel with the error so runTest can handle it.
🛠️ Suggested fix
```diff
 func runWriterMode(config TestConfig, writerBroker *streamclient.Broker, streamMeta *wshrpc.StreamMeta) chan error {
 	writer, err := writerBroker.CreateStreamWriter(streamMeta)
 	if err != nil {
 		fmt.Printf("failed to create stream writer: %v\n", err)
-		return nil
+		done := make(chan error, 1)
+		done <- err
+		return done
 	}
 	fmt.Printf("  Stream writer created\n")
```
```go
func (sr *SlowReader) Read(p []byte) (n int, err error) {
	time.Sleep(1 * time.Second)

	readSize := sr.bytesPerSec
	if readSize > len(p) {
		readSize = len(p)
	}

	n, err = sr.reader.Read(p[:readSize])
	log.Printf("SlowReader: read %d bytes, err=%v", n, err)
	return n, err
}
```
Verbose flag isn’t applied to SlowReader logging.
Line 252 logs on every read regardless of config.Verbose, so the flag has no effect on verbosity. Consider gating this log or configuring log output based on the flag.
🛠️ One option (configure in runTest)

```diff
 func runTest(config TestConfig) error {
+	if !config.Verbose {
+		log.SetOutput(io.Discard)
+	}
```
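Alternatively, the flag can travel with the reader itself; here `verbose` is a hypothetical field populated from `config.Verbose` when the `SlowReader` is constructed:

```go
type SlowReader struct {
	reader      io.Reader
	bytesPerSec int
	verbose     bool // hypothetical field, set from config.Verbose
}

// inside Read, gate the log call:
if sr.verbose {
	log.Printf("SlowReader: read %d bytes, err=%v", n, err)
}
```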
This pull request introduces a new integration test tool for the StreamManager streaming system, adding a standalone test binary with supporting modules for simulating and verifying high-throughput data transfer. The changes include a test driver, a configurable in-memory delivery pipe for simulating network conditions, a data generator, a verifier for end-to-end integrity, and a metrics tracker. Additionally, several improvements are made to the circular buffer and StreamManager for better handling of blocking writes and out-of-order acknowledgments.
New StreamManager Integration Test Tool
- Adds cmd/test-streammanager with a main driver (main-test-streammanager.go) that orchestrates end-to-end streaming tests, including configuration for data size, delivery delay/skew, window size, slow-reader simulation, and verbose logging.
- A DeliveryPipe (deliverypipe.go) simulates network delivery with delay and skew, supporting separate data and ack channels, out-of-order delivery, and high water mark tracking.
- WriterBridge and ReaderBridge modules interface between brokers and the delivery pipe, enforcing correct directionality of data and acks.
- A deterministic data generator (generator.go) and a verifier (verifier.go) check end-to-end data integrity and report mismatches.
- A metrics tracker (metrics.go) records throughput, packet counts, out-of-order events, and pipe usage, with a summary report at test completion.
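To illustrate the generator/verifier pairing, a self-contained sketch of the underlying idea: instantiate a deterministic reader twice, once as the data source and once to reproduce the expected bytes for comparison. `patternReader` here is a stand-in, not the PR's TestDataGenerator:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// patternReader emits a deterministic byte sequence; a second instance
// with the same size reproduces the expected stream exactly.
type patternReader struct{ pos, size int }

func (r *patternReader) Read(p []byte) (int, error) {
	if r.pos >= r.size {
		return 0, io.EOF
	}
	n := 0
	for ; n < len(p) && r.pos < r.size; n++ {
		p[n] = byte(r.pos % 251) // prime modulus avoids short repeats
		r.pos++
	}
	return n, nil
}

func main() {
	var received bytes.Buffer
	io.Copy(&received, &patternReader{size: 1 << 20}) // stands in for the full stream path

	want, _ := io.ReadAll(&patternReader{size: 1 << 20})
	fmt.Println("match:", bytes.Equal(received.Bytes(), want))
}
```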
- Refactored the circular buffer (pkg/jobmanager/cirbuf.go) to replace blocking writes with a non-blocking WriteAvailable method, returning a wait channel for buffer-full scenarios, and removed context-based cancellation logic.
- Updated the StreamManager (pkg/jobmanager/streammanager.go) to track the maximum acknowledged sequence/rwnd tuple, ignoring stale or out-of-order ACKs, and resetting this state on disconnect.
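Two hedged sketches of how these changes might be used; beyond the maxAckedSeq/maxAckedRwnd names from the walkthrough, the signatures and fields below are assumptions, not the PR's actual API:

```go
// 1. Caller loop for a non-blocking WriteAvailable: write what fits,
//    then wait on the returned channel until buffer space frees up.
func writeAll(cb *CirBuf, data []byte) {
	for len(data) > 0 {
		n, waitCh := cb.WriteAvailable(data) // assumed signature
		data = data[n:]
		if len(data) > 0 {
			<-waitCh // signaled when the reader drains some bytes
		}
	}
}

// 2. Tuple-based ACK filtering: apply an ACK only if it advances the
//    (maxAckedSeq, maxAckedRwnd) pair, so reordered or duplicate ACKs
//    are dropped instead of regressing the window.
func (sm *StreamManager) applyAck(seq, rwnd int64) bool {
	if seq < sm.maxAckedSeq {
		return false // older sequence: reordered ACK
	}
	if seq == sm.maxAckedSeq && rwnd <= sm.maxAckedRwnd {
		return false // duplicate or stale window update
	}
	sm.maxAckedSeq, sm.maxAckedRwnd = seq, rwnd
	return true
}
```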
Minor cleanup in cirbuf.go and streambroker.go.