From b6a8be87b6f66bf5380bc41aa60dba095a8473a1 Mon Sep 17 00:00:00 2001 From: matanper Date: Tue, 16 Jun 2026 09:53:36 +0300 Subject: [PATCH 1/7] Add a new method PollMessagesInto that reads the response body into a caller-provided buffer --- foreign/go/client/tcp/tcp_core.go | 97 +++++++++++++++----------- foreign/go/client/tcp/tcp_messaging.go | 38 +++++++++- foreign/go/contracts/client.go | 17 +++++ 3 files changed, 108 insertions(+), 44 deletions(-) diff --git a/foreign/go/client/tcp/tcp_core.go b/foreign/go/client/tcp/tcp_core.go index 47f8f4c592..c05d72093d 100644 --- a/foreign/go/client/tcp/tcp_core.go +++ b/foreign/go/client/tcp/tcp_core.go @@ -331,23 +331,71 @@ func encodeWireRequest(buf []byte, cmd command.Command) ([]byte, error) { // sendWireAndFetchResponse sends a pre-built wire payload (length header, // command code, body) and returns the response body. func (c *IggyTcpClient) sendWireAndFetchResponse(ctx context.Context, wirePayload []byte) ([]byte, error) { + return c.sendWireAndFetchResponseInto(ctx, wirePayload, nil) +} + +func (c *IggyTcpClient) sendLocked(wirePayload []byte) ([]byte, error) { + return c.sendLockedInto(wirePayload, nil) +} + +// sendLockedInto is like sendLocked but reads the response body into buf, +// growing it when its capacity is insufficient. The returned slice is buf +// resliced to the response length; Payload/UserHeaders in the parsed message +// will alias this slice, so the caller must not recycle it until done. +func (c *IggyTcpClient) sendLockedInto(wirePayload, buf []byte) ([]byte, error) { + if _, err := c.write(wirePayload); err != nil { + c.invalidateConnLocked() + return buf, err + } + + if _, err := c.readInto(c.respHeader[:]); err != nil { + c.invalidateConnLocked() + return buf, err + } + + if status := ierror.Code(binary.LittleEndian.Uint32(c.respHeader[0:4])); status != 0 { + return buf, ierror.FromCode(status) + } + + length := int(binary.LittleEndian.Uint32(c.respHeader[4:])) + if length <= 1 { + return buf[:0], nil + } + + if cap(buf) < length { + buf = make([]byte, length) + } else { + buf = buf[:length] + } + + if _, err := c.readInto(buf); err != nil { + c.invalidateConnLocked() + return buf, err + } + + return buf, nil +} + +// sendWireAndFetchResponseInto is like sendWireAndFetchResponse but reads the +// response body into buf, growing it when capacity is insufficient. The +// returned slice aliases buf; see sendLockedInto for aliasing semantics. +func (c *IggyTcpClient) sendWireAndFetchResponseInto(ctx context.Context, wirePayload, buf []byte) ([]byte, error) { if ctx == nil { - return nil, ierror.ErrNilContext + return buf, ierror.ErrNilContext } if err := ctx.Err(); err != nil { - return nil, err + return buf, err } c.mtx.Lock() defer c.mtx.Unlock() if err := ctx.Err(); err != nil { - return nil, err + return buf, err } - // fast path for non-cancellable ctx. if ctx.Done() == nil { - return c.sendLocked(wirePayload) + return c.sendLockedInto(wirePayload, buf) } conn := c.conn @@ -358,17 +406,13 @@ func (c *IggyTcpClient) sendWireAndFetchResponse(ctx context.Context, wirePayloa deadlineMu.Lock() defer deadlineMu.Unlock() if !cleared { - // Set a deadline in the past to unblock any ongoing read/write operations on the connection. - // This must use the snapshotted conn, not c.conn, to avoid setting a deadline on a - // new connection if Connect() reestablishes the connection after the context is cancelled. _ = conn.SetDeadline(time.Now()) } }) defer stop() - result, err := c.sendLocked(wirePayload) + result, err := c.sendLockedInto(wirePayload, buf) - // clear the deadline of connection. deadlineMu.Lock() cleared = true _ = conn.SetDeadline(time.Time{}) @@ -376,42 +420,13 @@ func (c *IggyTcpClient) sendWireAndFetchResponse(ctx context.Context, wirePayloa if err != nil { if ctxErr := ctx.Err(); ctxErr != nil { - return nil, ctxErr + return result, ctxErr } - return nil, err + return result, err } return result, nil } -func (c *IggyTcpClient) sendLocked(wirePayload []byte) ([]byte, error) { - if _, err := c.write(wirePayload); err != nil { - c.invalidateConnLocked() - return nil, err - } - - if _, err := c.readInto(c.respHeader[:]); err != nil { - c.invalidateConnLocked() - return nil, err - } - - if status := ierror.Code(binary.LittleEndian.Uint32(c.respHeader[0:4])); status != 0 { - return nil, ierror.FromCode(status) - } - - length := int(binary.LittleEndian.Uint32(c.respHeader[4:])) - if length <= 1 { - return []byte{}, nil - } - - _, buffer, err := c.read(length) - if err != nil { - c.invalidateConnLocked() - return nil, err - } - - return buffer, nil -} - // invalidateConnLocked closes the connection and marks it as disconnected func (c *IggyTcpClient) invalidateConnLocked() { if c.conn != nil { diff --git a/foreign/go/client/tcp/tcp_messaging.go b/foreign/go/client/tcp/tcp_messaging.go index 710cb5b0b4..dc24a96a3c 100644 --- a/foreign/go/client/tcp/tcp_messaging.go +++ b/foreign/go/client/tcp/tcp_messaging.go @@ -60,7 +60,30 @@ func (c *IggyTcpClient) PollMessages( autoCommit bool, partitionId *uint32, ) (*iggcon.PolledMessage, error) { - buffer, err := c.do(ctx, &command.PollMessages{ + polled, _, err := c.PollMessagesInto(ctx, streamId, topicId, consumer, strategy, count, autoCommit, partitionId, nil) + return polled, err +} + +// PollMessagesInto polls messages like PollMessages but reads the response +// body into the caller-supplied buf, growing it as needed. The returned buf +// (possibly reallocated) should be passed back on the next call to avoid +// per-RPC allocations once the buffer is large enough. +// +// The Payload and UserHeaders fields of every returned message alias buf; +// copy out any bytes you need before the next PollMessagesInto call. +func (c *IggyTcpClient) PollMessagesInto( + ctx context.Context, + streamId iggcon.Identifier, + topicId iggcon.Identifier, + consumer iggcon.Consumer, + strategy iggcon.PollingStrategy, + count uint32, + autoCommit bool, + partitionId *uint32, + buf []byte, +) (*iggcon.PolledMessage, []byte, error) { + bp := acquireRequestBuf() + wire, err := encodeWireRequest(*bp, &command.PollMessages{ StreamId: streamId, TopicId: topicId, Consumer: consumer, @@ -70,8 +93,17 @@ func (c *IggyTcpClient) PollMessages( PartitionId: partitionId, }) if err != nil { - return nil, err + releaseRequestBuf(bp) + return nil, buf, err + } + *bp = wire + + buf, err = c.sendWireAndFetchResponseInto(ctx, wire, buf) + releaseRequestBuf(bp) + if err != nil { + return nil, buf, err } - return binaryserialization.DeserializeFetchMessagesResponse(buffer, c.MessageCompression) + polled, err := binaryserialization.DeserializeFetchMessagesResponse(buf, c.MessageCompression) + return polled, buf, err } diff --git a/foreign/go/contracts/client.go b/foreign/go/contracts/client.go index 144cc17e3e..1bbb6cb3ba 100644 --- a/foreign/go/contracts/client.go +++ b/foreign/go/contracts/client.go @@ -114,6 +114,23 @@ type Client interface { partitionId *uint32, ) (*PolledMessage, error) + // PollMessagesInto is like PollMessages but reads the response body into + // the caller-supplied buf, growing it as needed. The returned buf (possibly + // reallocated) should be passed back on the next call to amortize + // allocations. Payload and UserHeaders in the returned messages alias buf; + // copy out bytes you need before calling PollMessagesInto again. + PollMessagesInto( + ctx context.Context, + streamId Identifier, + topicId Identifier, + consumer Consumer, + strategy PollingStrategy, + count uint32, + autoCommit bool, + partitionId *uint32, + buf []byte, + ) (*PolledMessage, []byte, error) + // StoreConsumerOffset store the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names. // Authentication is required, and the permission to poll the messages. StoreConsumerOffset( From c5532ea6cf8004b877a28a82093cd157191aa0c4 Mon Sep 17 00:00:00 2001 From: matanper Date: Tue, 16 Jun 2026 09:57:23 +0300 Subject: [PATCH 2/7] improve readability --- foreign/go/client/tcp/tcp_core.go | 86 ++++++++++++-------------- foreign/go/client/tcp/tcp_messaging.go | 9 +-- foreign/go/contracts/client.go | 7 +-- 3 files changed, 45 insertions(+), 57 deletions(-) diff --git a/foreign/go/client/tcp/tcp_core.go b/foreign/go/client/tcp/tcp_core.go index c05d72093d..dd2be61656 100644 --- a/foreign/go/client/tcp/tcp_core.go +++ b/foreign/go/client/tcp/tcp_core.go @@ -334,51 +334,6 @@ func (c *IggyTcpClient) sendWireAndFetchResponse(ctx context.Context, wirePayloa return c.sendWireAndFetchResponseInto(ctx, wirePayload, nil) } -func (c *IggyTcpClient) sendLocked(wirePayload []byte) ([]byte, error) { - return c.sendLockedInto(wirePayload, nil) -} - -// sendLockedInto is like sendLocked but reads the response body into buf, -// growing it when its capacity is insufficient. The returned slice is buf -// resliced to the response length; Payload/UserHeaders in the parsed message -// will alias this slice, so the caller must not recycle it until done. -func (c *IggyTcpClient) sendLockedInto(wirePayload, buf []byte) ([]byte, error) { - if _, err := c.write(wirePayload); err != nil { - c.invalidateConnLocked() - return buf, err - } - - if _, err := c.readInto(c.respHeader[:]); err != nil { - c.invalidateConnLocked() - return buf, err - } - - if status := ierror.Code(binary.LittleEndian.Uint32(c.respHeader[0:4])); status != 0 { - return buf, ierror.FromCode(status) - } - - length := int(binary.LittleEndian.Uint32(c.respHeader[4:])) - if length <= 1 { - return buf[:0], nil - } - - if cap(buf) < length { - buf = make([]byte, length) - } else { - buf = buf[:length] - } - - if _, err := c.readInto(buf); err != nil { - c.invalidateConnLocked() - return buf, err - } - - return buf, nil -} - -// sendWireAndFetchResponseInto is like sendWireAndFetchResponse but reads the -// response body into buf, growing it when capacity is insufficient. The -// returned slice aliases buf; see sendLockedInto for aliasing semantics. func (c *IggyTcpClient) sendWireAndFetchResponseInto(ctx context.Context, wirePayload, buf []byte) ([]byte, error) { if ctx == nil { return buf, ierror.ErrNilContext @@ -406,6 +361,9 @@ func (c *IggyTcpClient) sendWireAndFetchResponseInto(ctx context.Context, wirePa deadlineMu.Lock() defer deadlineMu.Unlock() if !cleared { + // Set a deadline in the past to unblock any ongoing read/write operations on the connection. + // This must use the snapshotted conn, not c.conn, to avoid setting a deadline on a + // new connection if Connect() reestablishes the connection after the context is cancelled. _ = conn.SetDeadline(time.Now()) } }) @@ -427,6 +385,44 @@ func (c *IggyTcpClient) sendWireAndFetchResponseInto(ctx context.Context, wirePa return result, nil } +func (c *IggyTcpClient) sendLocked(wirePayload []byte) ([]byte, error) { + return c.sendLockedInto(wirePayload, nil) +} + +func (c *IggyTcpClient) sendLockedInto(wirePayload, buf []byte) ([]byte, error) { + if _, err := c.write(wirePayload); err != nil { + c.invalidateConnLocked() + return buf, err + } + + if _, err := c.readInto(c.respHeader[:]); err != nil { + c.invalidateConnLocked() + return buf, err + } + + if status := ierror.Code(binary.LittleEndian.Uint32(c.respHeader[0:4])); status != 0 { + return buf, ierror.FromCode(status) + } + + length := int(binary.LittleEndian.Uint32(c.respHeader[4:])) + if length <= 1 { + return buf[:0], nil + } + + if cap(buf) < length { + buf = make([]byte, length) + } else { + buf = buf[:length] + } + + if _, err := c.readInto(buf); err != nil { + c.invalidateConnLocked() + return buf, err + } + + return buf, nil +} + // invalidateConnLocked closes the connection and marks it as disconnected func (c *IggyTcpClient) invalidateConnLocked() { if c.conn != nil { diff --git a/foreign/go/client/tcp/tcp_messaging.go b/foreign/go/client/tcp/tcp_messaging.go index dc24a96a3c..876292c9e0 100644 --- a/foreign/go/client/tcp/tcp_messaging.go +++ b/foreign/go/client/tcp/tcp_messaging.go @@ -64,13 +64,8 @@ func (c *IggyTcpClient) PollMessages( return polled, err } -// PollMessagesInto polls messages like PollMessages but reads the response -// body into the caller-supplied buf, growing it as needed. The returned buf -// (possibly reallocated) should be passed back on the next call to avoid -// per-RPC allocations once the buffer is large enough. -// -// The Payload and UserHeaders fields of every returned message alias buf; -// copy out any bytes you need before the next PollMessagesInto call. +// Payload and UserHeaders in the returned messages alias buf; copy out bytes +// you need before the next call. func (c *IggyTcpClient) PollMessagesInto( ctx context.Context, streamId iggcon.Identifier, diff --git a/foreign/go/contracts/client.go b/foreign/go/contracts/client.go index 1bbb6cb3ba..198a88c1fb 100644 --- a/foreign/go/contracts/client.go +++ b/foreign/go/contracts/client.go @@ -114,11 +114,8 @@ type Client interface { partitionId *uint32, ) (*PolledMessage, error) - // PollMessagesInto is like PollMessages but reads the response body into - // the caller-supplied buf, growing it as needed. The returned buf (possibly - // reallocated) should be passed back on the next call to amortize - // allocations. Payload and UserHeaders in the returned messages alias buf; - // copy out bytes you need before calling PollMessagesInto again. + // Payload and UserHeaders in the returned messages alias buf; copy out bytes + // you need before calling PollMessagesInto again. PollMessagesInto( ctx context.Context, streamId Identifier, From e743d85b2e3c981388fd3a54959324bc30741738 Mon Sep 17 00:00:00 2001 From: matanper Date: Tue, 16 Jun 2026 10:03:51 +0300 Subject: [PATCH 3/7] Add doInto func --- foreign/go/client/tcp/tcp_core.go | 14 +++++++++----- foreign/go/client/tcp/tcp_messaging.go | 13 ++----------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/foreign/go/client/tcp/tcp_core.go b/foreign/go/client/tcp/tcp_core.go index dd2be61656..537500d9e3 100644 --- a/foreign/go/client/tcp/tcp_core.go +++ b/foreign/go/client/tcp/tcp_core.go @@ -290,17 +290,21 @@ func (c *IggyTcpClient) write(payload []byte) (int, error) { // do sends the command and returns the response body. Commands implementing // the appender interface encode directly into a pooled buffer. func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command) ([]byte, error) { + return c.doInto(ctx, cmd, nil) +} + +func (c *IggyTcpClient) doInto(ctx context.Context, cmd command.Command, buf []byte) ([]byte, error) { bp := acquireRequestBuf() - buf, err := encodeWireRequest(*bp, cmd) + wire, err := encodeWireRequest(*bp, cmd) if err != nil { releaseRequestBuf(bp) - return nil, err + return buf, err } - *bp = buf + *bp = wire - resp, err := c.sendWireAndFetchResponse(ctx, buf) + buf, err = c.sendWireAndFetchResponseInto(ctx, wire, buf) releaseRequestBuf(bp) - return resp, err + return buf, err } // encodeWireRequest writes the wire-format request (4-byte length, 4-byte diff --git a/foreign/go/client/tcp/tcp_messaging.go b/foreign/go/client/tcp/tcp_messaging.go index 876292c9e0..90a3885953 100644 --- a/foreign/go/client/tcp/tcp_messaging.go +++ b/foreign/go/client/tcp/tcp_messaging.go @@ -77,8 +77,7 @@ func (c *IggyTcpClient) PollMessagesInto( partitionId *uint32, buf []byte, ) (*iggcon.PolledMessage, []byte, error) { - bp := acquireRequestBuf() - wire, err := encodeWireRequest(*bp, &command.PollMessages{ + buf, err := c.doInto(ctx, &command.PollMessages{ StreamId: streamId, TopicId: topicId, Consumer: consumer, @@ -86,15 +85,7 @@ func (c *IggyTcpClient) PollMessagesInto( Strategy: strategy, Count: count, PartitionId: partitionId, - }) - if err != nil { - releaseRequestBuf(bp) - return nil, buf, err - } - *bp = wire - - buf, err = c.sendWireAndFetchResponseInto(ctx, wire, buf) - releaseRequestBuf(bp) + }, buf) if err != nil { return nil, buf, err } From f6d571756fda2a9e36e6c7e3599a6fd48fd2662d Mon Sep 17 00:00:00 2001 From: matanper Date: Tue, 16 Jun 2026 10:07:05 +0300 Subject: [PATCH 4/7] simplify buffers slices --- foreign/go/client/tcp/tcp_core.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/foreign/go/client/tcp/tcp_core.go b/foreign/go/client/tcp/tcp_core.go index 537500d9e3..42d23120a8 100644 --- a/foreign/go/client/tcp/tcp_core.go +++ b/foreign/go/client/tcp/tcp_core.go @@ -410,14 +410,13 @@ func (c *IggyTcpClient) sendLockedInto(wirePayload, buf []byte) ([]byte, error) length := int(binary.LittleEndian.Uint32(c.respHeader[4:])) if length <= 1 { - return buf[:0], nil + return buf, nil } if cap(buf) < length { buf = make([]byte, length) - } else { - buf = buf[:length] } + buf = buf[:length] if _, err := c.readInto(buf); err != nil { c.invalidateConnLocked() From bea9495037f2fbc05f6b397eb9ec9151dd89c104 Mon Sep 17 00:00:00 2001 From: matanper Date: Tue, 16 Jun 2026 12:45:07 +0300 Subject: [PATCH 5/7] added test --- foreign/go/client/tcp/tcp_messaging_test.go | 98 +++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 foreign/go/client/tcp/tcp_messaging_test.go diff --git a/foreign/go/client/tcp/tcp_messaging_test.go b/foreign/go/client/tcp/tcp_messaging_test.go new file mode 100644 index 0000000000..64c616be63 --- /dev/null +++ b/foreign/go/client/tcp/tcp_messaging_test.go @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tcp + +import ( + "context" + "encoding/binary" + "testing" + "unsafe" + + iggcon "github.com/apache/iggy/foreign/go/contracts" +) + +// buildPolledMessageResponse returns a valid PollMessages response body +// containing a single message with the given payload. +func buildPolledMessageResponse(payload []byte) []byte { + hdr := iggcon.MessageHeader{PayloadLength: uint32(len(payload))} + hdrBytes := hdr.ToBytes() + + // [partitionId:4][currentOffset:8][messageCount:4][header:64][payload:N] + buf := make([]byte, 4+8+4+len(hdrBytes)+len(payload)) + binary.LittleEndian.PutUint32(buf[0:4], 1) + binary.LittleEndian.PutUint64(buf[4:12], 0) + binary.LittleEndian.PutUint32(buf[12:16], 1) + copy(buf[16:], hdrBytes) + copy(buf[16+len(hdrBytes):], payload) + return buf +} + +func basePtr(b []byte) uintptr { + if len(b) == 0 { + return 0 + } + return uintptr(unsafe.Pointer(&b[0])) +} + +func TestPollMessagesInto_given_repeated_polls_when_payload_size_is_constant_should_reuse_buffer(t *testing.T) { + c, serverConn := newTestClient(t) + + payload := []byte("hello iggy") + responseBody := buildPolledMessageResponse(payload) + + streamId, _ := iggcon.NewIdentifier(uint32(1)) + topicId, _ := iggcon.NewIdentifier(uint32(1)) + consumerId, _ := iggcon.NewIdentifier(uint32(1)) + + var buf []byte + var firstPtr uintptr + + for i := range 3 { + go serverRespond(t, serverConn, 0, responseBody) + + var polled *iggcon.PolledMessage + var err error + polled, buf, err = c.PollMessagesInto( + context.Background(), + streamId, + topicId, + iggcon.NewSingleConsumer(consumerId), + iggcon.NextPollingStrategy(), + 1, + false, + nil, + buf, + ) + if err != nil { + t.Fatalf("poll %d: unexpected error: %v", i, err) + } + if len(polled.Messages) != 1 { + t.Fatalf("poll %d: expected 1 message, got %d", i, len(polled.Messages)) + } + if string(polled.Messages[0].Payload) != string(payload) { + t.Errorf("poll %d: got payload %q, want %q", i, polled.Messages[0].Payload, payload) + } + + ptr := basePtr(buf) + if i == 0 { + firstPtr = ptr + } else if ptr != firstPtr { + t.Errorf("poll %d: buffer reallocated (addr changed from %x to %x)", i, firstPtr, ptr) + } + } +} From b63e96937c36153db087b6ebd885be564a0cbd37 Mon Sep 17 00:00:00 2001 From: matanper Date: Tue, 16 Jun 2026 12:47:06 +0300 Subject: [PATCH 6/7] remove unused functions --- foreign/go/client/tcp/tcp_core.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/foreign/go/client/tcp/tcp_core.go b/foreign/go/client/tcp/tcp_core.go index 42d23120a8..710689af44 100644 --- a/foreign/go/client/tcp/tcp_core.go +++ b/foreign/go/client/tcp/tcp_core.go @@ -251,15 +251,6 @@ func releaseRequestBuf(bp *[]byte) { requestBufPool.Put(bp) } -func (c *IggyTcpClient) read(expectedSize int) (int, []byte, error) { - buffer := make([]byte, expectedSize) - n, err := c.readInto(buffer) - if err != nil { - return n, buffer[:n], err - } - return n, buffer, nil -} - // readInto reads exactly len(buf) bytes from the connection into buf. func (c *IggyTcpClient) readInto(buf []byte) (int, error) { var totalRead int @@ -389,10 +380,6 @@ func (c *IggyTcpClient) sendWireAndFetchResponseInto(ctx context.Context, wirePa return result, nil } -func (c *IggyTcpClient) sendLocked(wirePayload []byte) ([]byte, error) { - return c.sendLockedInto(wirePayload, nil) -} - func (c *IggyTcpClient) sendLockedInto(wirePayload, buf []byte) ([]byte, error) { if _, err := c.write(wirePayload); err != nil { c.invalidateConnLocked() From 9c666ce2d31a27064020a88fff821081fc517824 Mon Sep 17 00:00:00 2001 From: matanper Date: Fri, 19 Jun 2026 01:32:28 +0300 Subject: [PATCH 7/7] fix buffer reuse bug on empty body --- foreign/go/client/tcp/tcp_core.go | 2 +- foreign/go/client/tcp/tcp_core_test.go | 30 ++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/foreign/go/client/tcp/tcp_core.go b/foreign/go/client/tcp/tcp_core.go index 710689af44..d22c3e04e0 100644 --- a/foreign/go/client/tcp/tcp_core.go +++ b/foreign/go/client/tcp/tcp_core.go @@ -397,7 +397,7 @@ func (c *IggyTcpClient) sendLockedInto(wirePayload, buf []byte) ([]byte, error) length := int(binary.LittleEndian.Uint32(c.respHeader[4:])) if length <= 1 { - return buf, nil + return buf[:0], nil } if cap(buf) < length { diff --git a/foreign/go/client/tcp/tcp_core_test.go b/foreign/go/client/tcp/tcp_core_test.go index d992010bf9..2a8014274b 100644 --- a/foreign/go/client/tcp/tcp_core_test.go +++ b/foreign/go/client/tcp/tcp_core_test.go @@ -254,6 +254,36 @@ func TestSendAndFetchResponse_SuccessWithBody(t *testing.T) { } } +// TestSendAndFetchResponse_EmptyBodyAfterNonEmpty guards against a regression +// where sendLockedInto returned the buffer unchanged (still holding prior data) +// when the server replied with length <= 1, causing callers to deserialize stale +// content as ghost messages. +func TestSendAndFetchResponse_EmptyBodyAfterNonEmpty(t *testing.T) { + c, serverConn := newTestClient(t) + + body := []byte("stale data that must not leak") + + // First call: server returns a non-empty body. + go serverRespond(t, serverConn, 0, body) + first, err := c.sendWireAndFetchResponse(context.Background(), emptyWireReq) + if err != nil { + t.Fatalf("first call: unexpected error: %v", err) + } + if !bytes.Equal(first, body) { + t.Fatalf("first call: got %q, want %q", first, body) + } + + // Second call: server returns an empty body (length == 0). + go serverRespond(t, serverConn, 0, nil) + second, err := c.sendWireAndFetchResponse(context.Background(), emptyWireReq) + if err != nil { + t.Fatalf("second call: unexpected error: %v", err) + } + if len(second) != 0 { + t.Errorf("second call: got %d bytes (%q), want empty — stale buffer leaked", len(second), second) + } +} + func TestNewIggyTcpClient_StoresProvidedLogger(t *testing.T) { var buf bytes.Buffer logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{