Skip to content

stress test for the new RPC streaming primitives (+ bug fixes)#2828

Merged
sawka merged 8 commits intomainfrom
sawka/stream-test
Feb 5, 2026
Merged

stress test for the new RPC streaming primitives (+ bug fixes)#2828
sawka merged 8 commits intomainfrom
sawka/stream-test

Conversation

@sawka
Copy link
Member

@sawka sawka commented Feb 5, 2026

This pull request introduces a new integration test tool for the StreamManager streaming system, adding a standalone test binary with supporting modules for simulating and verifying high-throughput data transfer. The changes include a test driver, a configurable in-memory delivery pipe for simulating network conditions, a data generator, a verifier for end-to-end integrity, and a metrics tracker. Additionally, several improvements are made to the circular buffer and StreamManager for better handling of blocking writes and out-of-order acknowledgments.

New StreamManager Integration Test Tool

  • Added a new test binary cmd/test-streammanager with a main driver (main-test-streammanager.go) that orchestrates end-to-end streaming tests, including configuration for data size, delivery delay/skew, window size, slow reader simulation, and verbose logging.
  • Implemented a configurable DeliveryPipe (deliverypipe.go) for simulating network delivery with delay and skew, supporting separate data and ack channels, out-of-order delivery, and high water mark tracking.
  • Added WriterBridge and ReaderBridge modules for interfacing between brokers and the delivery pipe, enforcing correct directionality of data and acks.
  • Created a sequential test data generator (generator.go) and a verifier (verifier.go) for checking data integrity and reporting mismatches. [1] [2]
  • Introduced a metrics module (metrics.go) for tracking throughput, packet counts, out-of-order events, and pipe usage, with a summary report at test completion.

StreamManager and CirBuf Improvements

  • Refactored circular buffer (pkg/jobmanager/cirbuf.go) to replace blocking writes with a non-blocking WriteAvailable method, returning a wait channel for buffer-full scenarios, and removed context-based cancellation logic.
  • Updated StreamManager (pkg/jobmanager/streammanager.go) to track the maximum acknowledged sequence/rwnd tuple, ignoring stale or out-of-order ACKs, and resetting this state on disconnect.
  • Modified StreamManager's data handling to use the new non-blocking buffer write logic, ensuring correct signaling and waiting for space when needed.

Minor Cleanup

  • Removed unused context import from cirbuf.go.
  • Minor whitespace cleanup in streambroker.go.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 5, 2026

Walkthrough

Adds a test harness for StreamManager and refactors ACK/buffer APIs. New cmd/test-streammanager files provide: a DeliveryPipe simulator (configurable delay/skew, data/ack queues, delivery loops, metrics), WriterBridge/ReaderBridge for routing stream commands, a deterministic TestDataGenerator, a thread-safe Metrics collector, a Verifier for data integrity, and a Cobra-based integration CLI that wires these pieces together. pkg/jobmanager replaces blocking Write/WriteCtx with a non-blocking WriteAvailable API. pkg/jobmanager/streammanager and pkg/streamclient switch from single-value ACK tracking to tuple-based tracking (maxAckedSeq, maxAckedRwnd) to filter stale ACKs.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main changes: introducing a stress test tool for RPC streaming primitives along with associated bug fixes to StreamManager and CirBuf.
Description check ✅ Passed The description comprehensively covers all aspects of the changeset: the new test tool components, delivery pipe, bridges, generator, verifier, metrics, and the improvements to StreamManager and circular buffer.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch sawka/stream-test

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@cmd/test-streammanager/deliverypipe.go`:
- Around line 161-182: The callback in DeliveryPipe.dataDeliveryLoop accesses
and updates dp.lastDataSeqNum without holding dp.lock, causing a data race;
modify the deliver closure used in deliveryLoop so that any read/write of
dp.lastDataSeqNum (and the corresponding metrics.AddOOOPacket() decision) is
done while holding dp.lock — i.e., acquire dp.lock before comparing
pkt.dataPk.Seq to dp.lastDataSeqNum and before assigning dp.lastDataSeqNum, then
release the lock; apply the same pattern in ackDeliveryLoop to protect
dp.lastAckSeqNum (wrap the check, metrics call, and update in dp.lock).

In `@cmd/test-streammanager/main-test-streammanager.go`:
- Around line 134-135: The inline step comment before the completion wait is
mis-numbered: update the comment "// 15. Wait for completion" immediately above
the statement "err = <-done" (in the main test stream manager flow) to read "//
16. Wait for completion" so the step numbering is sequential with the earlier
"// 15." comment; adjust only the comment text.

In `@cmd/test-streammanager/metrics.go`:
- Around line 87-105: The throughput calculation in Metrics.Report can divide by
zero or use a negative duration; guard against this by computing duration :=
m.endTime.Sub(m.startTime) and if duration <= 0 (or duration.Seconds() == 0) set
throughput to 0.0 instead of performing the division, then continue using
throughput in the fmt.Sprintf; update the function Metrics.Report to use this
guarded throughput calculation referencing m.startTime, m.endTime, m.totalBytes
and the Report method.

In `@cmd/test-streammanager/verifier.go`:
- Around line 29-39: The code currently ignores the return values from
v.expectedGen.Read when filling expected, causing comparisons against unwritten
bytes; change the read to capture (n, err) := v.expectedGen.Read(expected),
handle err (treat io.EOF as a valid partial read or propagate unexpected
errors), and only compare the first n bytes of expected to p (and adjust how
v.totalReceived / v.firstMismatch are computed to use i within the n range);
ensure you update v.mismatches and v.firstMismatch only for indices 0..n-1 and
handle the case where n < len(p) appropriately.
🧹 Nitpick comments (3)
cmd/test-streammanager/deliverypipe.go (1)

98-99: Consider using a heap for pending packets.

Calling sortPending after every EnqueueData/EnqueueAck is O(n log n) per insertion. For a stress test tool that may enqueue many packets, using container/heap would reduce this to O(log n) per operation.

This is acceptable for a test tool but may become a bottleneck under heavy load.

cmd/test-streammanager/main-test-streammanager.go (1)

197-207: SlowReader rate limiting is coarse-grained.

The implementation sleeps a full second before every read, regardless of actual bytes read. If the underlying reader returns fewer bytes (e.g., near EOF or buffer smaller than bytesPerSec), the effective rate will be lower than intended.

For a test tool this is acceptable, but consider a more precise approach using token bucket or proportional sleep based on actual bytes read.

pkg/jobmanager/cirbuf.go (1)

77-98: Consider: byte-by-byte loop could be optimized with bulk copy.

The current implementation iterates one byte at a time. For a 4KB packet, this executes 4096 loop iterations. While functionally correct, bulk copying (handling wrap-around) would improve throughput.

Since this is supporting a test tool, the current approach is acceptable. If this path becomes latency-sensitive in production, consider refactoring to calculate contiguous segments and use copy() for bulk transfers.

♻️ Sketch of bulk copy optimization
// Calculate how many bytes fit before window limit or wrap-around
available := cb.windowSize - cb.count
if cb.syncMode && available <= 0 {
    // handle blocking case
}
toWrite := min(len(data), available)
// Handle wrap-around with at most two copy() calls
firstChunk := min(toWrite, len(cb.buf)-cb.writePos)
copy(cb.buf[cb.writePos:], data[:firstChunk])
if firstChunk < toWrite {
    copy(cb.buf[0:], data[firstChunk:toWrite])
}
// Update positions and counters

Comment on lines +161 to +182
func (dp *DeliveryPipe) dataDeliveryLoop() {
defer dp.wg.Done()
dp.deliveryLoop(
func() *[]taggedPacket { return &dp.dataPending },
func(pkt taggedPacket) {
if dp.dataTarget != nil {
// Track out-of-order packets
if dp.metrics != nil && dp.lastDataSeqNum != -1 {
if pkt.dataPk.Seq < dp.lastDataSeqNum {
dp.metrics.AddOOOPacket()
}
}
dp.lastDataSeqNum = pkt.dataPk.Seq
dp.dataTarget(pkt.dataPk)

dp.lock.Lock()
dp.currentBytes -= int64(pkt.dataSize)
dp.lock.Unlock()
}
},
)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Data race on lastDataSeqNum access.

The deliver callback accesses and modifies dp.lastDataSeqNum (lines 169, 173) outside of the mutex lock. Since the deliver function runs after dp.lock.Unlock() in deliveryLoop, this creates a data race if multiple goroutines or iterations access these fields concurrently.

The same issue exists in ackDeliveryLoop with lastAckSeqNum (lines 192, 196).

🔒 Proposed fix to protect lastDataSeqNum access
 func (dp *DeliveryPipe) dataDeliveryLoop() {
 	defer dp.wg.Done()
 	dp.deliveryLoop(
 		func() *[]taggedPacket { return &dp.dataPending },
 		func(pkt taggedPacket) {
 			if dp.dataTarget != nil {
+				dp.lock.Lock()
 				// Track out-of-order packets
 				if dp.metrics != nil && dp.lastDataSeqNum != -1 {
 					if pkt.dataPk.Seq < dp.lastDataSeqNum {
 						dp.metrics.AddOOOPacket()
 					}
 				}
 				dp.lastDataSeqNum = pkt.dataPk.Seq
+				dp.currentBytes -= int64(pkt.dataSize)
+				dp.lock.Unlock()
+
 				dp.dataTarget(pkt.dataPk)
-
-				dp.lock.Lock()
-				dp.currentBytes -= int64(pkt.dataSize)
-				dp.lock.Unlock()
 			}
 		},
 	)
 }

Apply the same pattern to ackDeliveryLoop for lastAckSeqNum.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (dp *DeliveryPipe) dataDeliveryLoop() {
defer dp.wg.Done()
dp.deliveryLoop(
func() *[]taggedPacket { return &dp.dataPending },
func(pkt taggedPacket) {
if dp.dataTarget != nil {
// Track out-of-order packets
if dp.metrics != nil && dp.lastDataSeqNum != -1 {
if pkt.dataPk.Seq < dp.lastDataSeqNum {
dp.metrics.AddOOOPacket()
}
}
dp.lastDataSeqNum = pkt.dataPk.Seq
dp.dataTarget(pkt.dataPk)
dp.lock.Lock()
dp.currentBytes -= int64(pkt.dataSize)
dp.lock.Unlock()
}
},
)
}
func (dp *DeliveryPipe) dataDeliveryLoop() {
defer dp.wg.Done()
dp.deliveryLoop(
func() *[]taggedPacket { return &dp.dataPending },
func(pkt taggedPacket) {
if dp.dataTarget != nil {
dp.lock.Lock()
// Track out-of-order packets
if dp.metrics != nil && dp.lastDataSeqNum != -1 {
if pkt.dataPk.Seq < dp.lastDataSeqNum {
dp.metrics.AddOOOPacket()
}
}
dp.lastDataSeqNum = pkt.dataPk.Seq
dp.currentBytes -= int64(pkt.dataSize)
dp.lock.Unlock()
dp.dataTarget(pkt.dataPk)
}
},
)
}
🤖 Prompt for AI Agents
In `@cmd/test-streammanager/deliverypipe.go` around lines 161 - 182, The callback
in DeliveryPipe.dataDeliveryLoop accesses and updates dp.lastDataSeqNum without
holding dp.lock, causing a data race; modify the deliver closure used in
deliveryLoop so that any read/write of dp.lastDataSeqNum (and the corresponding
metrics.AddOOOPacket() decision) is done while holding dp.lock — i.e., acquire
dp.lock before comparing pkt.dataPk.Seq to dp.lastDataSeqNum and before
assigning dp.lastDataSeqNum, then release the lock; apply the same pattern in
ackDeliveryLoop to protect dp.lastAckSeqNum (wrap the check, metrics call, and
update in dp.lock).

Comment on lines 134 to 135
// 15. Wait for completion
err = <-done
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Duplicate step number in comment.

Line 134 uses // 15. but the previous step (line 125) already uses // 15.. This should be // 16. for consistency.

📝 Proposed fix
-	// 15. Wait for completion
+	// 16. Wait for completion
 	err = <-done
🤖 Prompt for AI Agents
In `@cmd/test-streammanager/main-test-streammanager.go` around lines 134 - 135,
The inline step comment before the completion wait is mis-numbered: update the
comment "// 15. Wait for completion" immediately above the statement "err =
<-done" (in the main test stream manager flow) to read "// 16. Wait for
completion" so the step numbering is sequential with the earlier "// 15."
comment; adjust only the comment text.

Comment on lines 87 to 105
func (m *Metrics) Report() string {
m.lock.Lock()
defer m.lock.Unlock()

duration := m.endTime.Sub(m.startTime)
throughput := float64(m.totalBytes) / duration.Seconds() / 1024 / 1024

return fmt.Sprintf(`
StreamManager Integration Test Results
======================================
Duration: %v
Total Bytes: %d
Throughput: %.2f MB/s
Data Packets: %d
Ack Packets: %d
OOO Packets: %d
Pipe High Water: %d bytes (%.2f KB)
`, duration, m.totalBytes, throughput, m.dataPackets, m.ackPackets, m.oooPackets,
m.pipeHighWaterMark, float64(m.pipeHighWaterMark)/1024)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential division by zero in throughput calculation.

If Start() and End() are called in quick succession (or End() is called before Start()), duration.Seconds() could be zero or negative, causing a division by zero or producing +Inf/NaN.

🛡️ Proposed fix to guard against zero duration
 func (m *Metrics) Report() string {
 	m.lock.Lock()
 	defer m.lock.Unlock()

 	duration := m.endTime.Sub(m.startTime)
-	throughput := float64(m.totalBytes) / duration.Seconds() / 1024 / 1024
+	var throughput float64
+	if duration.Seconds() > 0 {
+		throughput = float64(m.totalBytes) / duration.Seconds() / 1024 / 1024
+	}
🤖 Prompt for AI Agents
In `@cmd/test-streammanager/metrics.go` around lines 87 - 105, The throughput
calculation in Metrics.Report can divide by zero or use a negative duration;
guard against this by computing duration := m.endTime.Sub(m.startTime) and if
duration <= 0 (or duration.Seconds() == 0) set throughput to 0.0 instead of
performing the division, then continue using throughput in the fmt.Sprintf;
update the function Metrics.Report to use this guarded throughput calculation
referencing m.startTime, m.endTime, m.totalBytes and the Report method.

Comment on lines 29 to 39
expected := make([]byte, len(p))
v.expectedGen.Read(expected)

for i := 0; i < len(p); i++ {
if p[i] != expected[i] {
v.mismatches++
if v.firstMismatch == -1 {
v.firstMismatch = v.totalReceived + int64(i)
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Return value from expectedGen.Read() is ignored, causing potential false mismatches.

The read from expectedGen may return fewer bytes than len(p) (especially near EOF), but the comparison loop always iterates over all len(p) bytes. This would compare incoming data against zero/uninitialized bytes in expected, producing spurious mismatch reports.

🐛 Proposed fix to handle partial reads
 func (v *Verifier) Write(p []byte) (n int, err error) {
 	v.lock.Lock()
 	defer v.lock.Unlock()

 	expected := make([]byte, len(p))
-	v.expectedGen.Read(expected)
+	expectedN, _ := v.expectedGen.Read(expected)

-	for i := 0; i < len(p); i++ {
+	for i := 0; i < expectedN; i++ {
 		if p[i] != expected[i] {
 			v.mismatches++
 			if v.firstMismatch == -1 {
 				v.firstMismatch = v.totalReceived + int64(i)
 			}
 		}
 	}

 	v.totalReceived += int64(len(p))
 	return len(p), nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
expected := make([]byte, len(p))
v.expectedGen.Read(expected)
for i := 0; i < len(p); i++ {
if p[i] != expected[i] {
v.mismatches++
if v.firstMismatch == -1 {
v.firstMismatch = v.totalReceived + int64(i)
}
}
}
expected := make([]byte, len(p))
expectedN, _ := v.expectedGen.Read(expected)
for i := 0; i < expectedN; i++ {
if p[i] != expected[i] {
v.mismatches++
if v.firstMismatch == -1 {
v.firstMismatch = v.totalReceived + int64(i)
}
}
}
🤖 Prompt for AI Agents
In `@cmd/test-streammanager/verifier.go` around lines 29 - 39, The code currently
ignores the return values from v.expectedGen.Read when filling expected, causing
comparisons against unwritten bytes; change the read to capture (n, err) :=
v.expectedGen.Read(expected), handle err (treat io.EOF as a valid partial read
or propagate unexpected errors), and only compare the first n bytes of expected
to p (and adjust how v.totalReceived / v.firstMismatch are computed to use i
within the n range); ensure you update v.mismatches and v.firstMismatch only for
indices 0..n-1 and handle the case where n < len(p) appropriately.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@cmd/test-streammanager/main-test-streammanager.go`:
- Around line 183-188: In runWriterMode, when
writerBroker.CreateStreamWriter(streamMeta) returns an error, don’t return nil;
instead create and return a channel that immediately sends that error so callers
(like runTest) can wait and handle it; update the error handling in
runWriterMode (where CreateStreamWriter is called) to construct a chan error,
send the err on it, close the channel, and return it so the failure is
propagated to runTest.
- Around line 243-253: The SlowReader.Read method unconditionally calls
log.Printf on every read, ignoring the global verbosity flag; update the logging
to respect config.Verbose (or the chosen verbose logger) by either gating the
log.Printf inside SlowReader.Read with an if config.Verbose check or by
configuring the logger used by SlowReader in runTest so it no-ops when verbose
is false; specifically, modify SlowReader.Read (the Read method on type
SlowReader) to only emit the "SlowReader: read ..." message when config.Verbose
is true or ensure runTest injects a verbose/no-op logger into SlowReader so the
verbose flag controls output.
- Around line 162-181: runStreamManagerMode currently returns nil on setup
errors which hides failures; change it to create a buffered error channel (e.g.,
errCh := make(chan error, 1)), and on any error from
streamManager.ClientConnected or streamManager.AttachReader send the error into
errCh (errCh <- err) and return errCh instead of nil; ensure the success path
returns a nil or a closed channel per the caller's expectation, but at minimum
propagate setup failures by returning the error channel from
runStreamManagerMode so callers can fail fast.

Comment on lines +162 to +181
func runStreamManagerMode(config TestConfig, writerBroker *streamclient.Broker, streamMeta *wshrpc.StreamMeta) chan error {
streamManager := jobmanager.MakeStreamManagerWithSizes(config.WindowSize, 2*1024*1024)
writerBroker.AttachStreamWriter(streamMeta, streamManager)

dataSender := &BrokerDataSender{broker: writerBroker}
startSeq, err := streamManager.ClientConnected(streamMeta.Id, dataSender, config.WindowSize, 0)
if err != nil {
fmt.Printf("failed to connect stream manager: %v\n", err)
return nil
}
fmt.Printf(" Stream connected, startSeq: %d\n", startSeq)

generator := NewTestDataGenerator(config.DataSize)
if err := streamManager.AttachReader(generator); err != nil {
fmt.Printf("failed to attach reader: %v\n", err)
return nil
}

return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Propagate setup failures instead of returning nil.
On errors (Line 167/175), this returns nil, so runTest will continue and may hang or falsely pass. Return a channel containing the error so the caller can fail fast.

🛠️ Suggested fix
 func runStreamManagerMode(config TestConfig, writerBroker *streamclient.Broker, streamMeta *wshrpc.StreamMeta) chan error {
 	streamManager := jobmanager.MakeStreamManagerWithSizes(config.WindowSize, 2*1024*1024)
 	writerBroker.AttachStreamWriter(streamMeta, streamManager)

 	dataSender := &BrokerDataSender{broker: writerBroker}
 	startSeq, err := streamManager.ClientConnected(streamMeta.Id, dataSender, config.WindowSize, 0)
 	if err != nil {
-		fmt.Printf("failed to connect stream manager: %v\n", err)
-		return nil
+		fmt.Printf("failed to connect stream manager: %v\n", err)
+		done := make(chan error, 1)
+		done <- err
+		return done
 	}
 	fmt.Printf("  Stream connected, startSeq: %d\n", startSeq)

 	generator := NewTestDataGenerator(config.DataSize)
 	if err := streamManager.AttachReader(generator); err != nil {
-		fmt.Printf("failed to attach reader: %v\n", err)
-		return nil
+		fmt.Printf("failed to attach reader: %v\n", err)
+		done := make(chan error, 1)
+		done <- err
+		return done
 	}

 	return nil
 }
🤖 Prompt for AI Agents
In `@cmd/test-streammanager/main-test-streammanager.go` around lines 162 - 181,
runStreamManagerMode currently returns nil on setup errors which hides failures;
change it to create a buffered error channel (e.g., errCh := make(chan error,
1)), and on any error from streamManager.ClientConnected or
streamManager.AttachReader send the error into errCh (errCh <- err) and return
errCh instead of nil; ensure the success path returns a nil or a closed channel
per the caller's expectation, but at minimum propagate setup failures by
returning the error channel from runStreamManagerMode so callers can fail fast.

Comment on lines +183 to +188
func runWriterMode(config TestConfig, writerBroker *streamclient.Broker, streamMeta *wshrpc.StreamMeta) chan error {
writer, err := writerBroker.CreateStreamWriter(streamMeta)
if err != nil {
fmt.Printf("failed to create stream writer: %v\n", err)
return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Propagate writer creation failures.
If CreateStreamWriter fails (Line 184–186), returning nil skips waiting and hides the error. Return a channel with the error so runTest can handle it.

🛠️ Suggested fix
 func runWriterMode(config TestConfig, writerBroker *streamclient.Broker, streamMeta *wshrpc.StreamMeta) chan error {
 	writer, err := writerBroker.CreateStreamWriter(streamMeta)
 	if err != nil {
 		fmt.Printf("failed to create stream writer: %v\n", err)
-		return nil
+		done := make(chan error, 1)
+		done <- err
+		return done
 	}
 	fmt.Printf("  Stream writer created\n")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func runWriterMode(config TestConfig, writerBroker *streamclient.Broker, streamMeta *wshrpc.StreamMeta) chan error {
writer, err := writerBroker.CreateStreamWriter(streamMeta)
if err != nil {
fmt.Printf("failed to create stream writer: %v\n", err)
return nil
}
func runWriterMode(config TestConfig, writerBroker *streamclient.Broker, streamMeta *wshrpc.StreamMeta) chan error {
writer, err := writerBroker.CreateStreamWriter(streamMeta)
if err != nil {
fmt.Printf("failed to create stream writer: %v\n", err)
done := make(chan error, 1)
done <- err
return done
}
fmt.Printf(" Stream writer created\n")
🤖 Prompt for AI Agents
In `@cmd/test-streammanager/main-test-streammanager.go` around lines 183 - 188, In
runWriterMode, when writerBroker.CreateStreamWriter(streamMeta) returns an
error, don’t return nil; instead create and return a channel that immediately
sends that error so callers (like runTest) can wait and handle it; update the
error handling in runWriterMode (where CreateStreamWriter is called) to
construct a chan error, send the err on it, close the channel, and return it so
the failure is propagated to runTest.

Comment on lines +243 to +253
func (sr *SlowReader) Read(p []byte) (n int, err error) {
time.Sleep(1 * time.Second)

readSize := sr.bytesPerSec
if readSize > len(p) {
readSize = len(p)
}

n, err = sr.reader.Read(p[:readSize])
log.Printf("SlowReader: read %d bytes, err=%v", n, err)
return n, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Verbose flag isn’t applied to SlowReader logging.
Line 252 logs on every read regardless of config.Verbose, so the flag has no effect on verbosity. Consider gating this log or configuring log output based on the flag.

🛠️ One option (configure in runTest)
 func runTest(config TestConfig) error {
+	if !config.Verbose {
+		log.SetOutput(io.Discard)
+	}
🤖 Prompt for AI Agents
In `@cmd/test-streammanager/main-test-streammanager.go` around lines 243 - 253,
The SlowReader.Read method unconditionally calls log.Printf on every read,
ignoring the global verbosity flag; update the logging to respect config.Verbose
(or the chosen verbose logger) by either gating the log.Printf inside
SlowReader.Read with an if config.Verbose check or by configuring the logger
used by SlowReader in runTest so it no-ops when verbose is false; specifically,
modify SlowReader.Read (the Read method on type SlowReader) to only emit the
"SlowReader: read ..." message when config.Verbose is true or ensure runTest
injects a verbose/no-op logger into SlowReader so the verbose flag controls
output.

@sawka sawka changed the title stream test to stress test the new RPC streaming primitives (+ bug fixes) stress test for the new RPC streaming primitives (+ bug fixes) Feb 5, 2026
@sawka sawka merged commit f36187f into main Feb 5, 2026
7 checks passed
@sawka sawka deleted the sawka/stream-test branch February 5, 2026 22:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant