diff --git a/foreign/go/client/tcp/tcp_core.go b/foreign/go/client/tcp/tcp_core.go index 47f8f4c592..d22c3e04e0 100644 --- a/foreign/go/client/tcp/tcp_core.go +++ b/foreign/go/client/tcp/tcp_core.go @@ -251,15 +251,6 @@ func releaseRequestBuf(bp *[]byte) { requestBufPool.Put(bp) } -func (c *IggyTcpClient) read(expectedSize int) (int, []byte, error) { - buffer := make([]byte, expectedSize) - n, err := c.readInto(buffer) - if err != nil { - return n, buffer[:n], err - } - return n, buffer, nil -} - // readInto reads exactly len(buf) bytes from the connection into buf. func (c *IggyTcpClient) readInto(buf []byte) (int, error) { var totalRead int @@ -290,17 +281,21 @@ func (c *IggyTcpClient) write(payload []byte) (int, error) { // do sends the command and returns the response body. Commands implementing // the appender interface encode directly into a pooled buffer. func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command) ([]byte, error) { + return c.doInto(ctx, cmd, nil) +} + +func (c *IggyTcpClient) doInto(ctx context.Context, cmd command.Command, buf []byte) ([]byte, error) { bp := acquireRequestBuf() - buf, err := encodeWireRequest(*bp, cmd) + wire, err := encodeWireRequest(*bp, cmd) if err != nil { releaseRequestBuf(bp) - return nil, err + return buf, err } - *bp = buf + *bp = wire - resp, err := c.sendWireAndFetchResponse(ctx, buf) + buf, err = c.sendWireAndFetchResponseInto(ctx, wire, buf) releaseRequestBuf(bp) - return resp, err + return buf, err } // encodeWireRequest writes the wire-format request (4-byte length, 4-byte @@ -331,23 +326,26 @@ func encodeWireRequest(buf []byte, cmd command.Command) ([]byte, error) { // sendWireAndFetchResponse sends a pre-built wire payload (length header, // command code, body) and returns the response body. func (c *IggyTcpClient) sendWireAndFetchResponse(ctx context.Context, wirePayload []byte) ([]byte, error) { + return c.sendWireAndFetchResponseInto(ctx, wirePayload, nil) +} + +func (c *IggyTcpClient) sendWireAndFetchResponseInto(ctx context.Context, wirePayload, buf []byte) ([]byte, error) { if ctx == nil { - return nil, ierror.ErrNilContext + return buf, ierror.ErrNilContext } if err := ctx.Err(); err != nil { - return nil, err + return buf, err } c.mtx.Lock() defer c.mtx.Unlock() if err := ctx.Err(); err != nil { - return nil, err + return buf, err } - // fast path for non-cancellable ctx. if ctx.Done() == nil { - return c.sendLocked(wirePayload) + return c.sendLockedInto(wirePayload, buf) } conn := c.conn @@ -366,9 +364,8 @@ func (c *IggyTcpClient) sendWireAndFetchResponse(ctx context.Context, wirePayloa }) defer stop() - result, err := c.sendLocked(wirePayload) + result, err := c.sendLockedInto(wirePayload, buf) - // clear the deadline of connection. deadlineMu.Lock() cleared = true _ = conn.SetDeadline(time.Time{}) @@ -376,40 +373,44 @@ func (c *IggyTcpClient) sendWireAndFetchResponse(ctx context.Context, wirePayloa if err != nil { if ctxErr := ctx.Err(); ctxErr != nil { - return nil, ctxErr + return result, ctxErr } - return nil, err + return result, err } return result, nil } -func (c *IggyTcpClient) sendLocked(wirePayload []byte) ([]byte, error) { +func (c *IggyTcpClient) sendLockedInto(wirePayload, buf []byte) ([]byte, error) { if _, err := c.write(wirePayload); err != nil { c.invalidateConnLocked() - return nil, err + return buf, err } if _, err := c.readInto(c.respHeader[:]); err != nil { c.invalidateConnLocked() - return nil, err + return buf, err } if status := ierror.Code(binary.LittleEndian.Uint32(c.respHeader[0:4])); status != 0 { - return nil, ierror.FromCode(status) + return buf, ierror.FromCode(status) } length := int(binary.LittleEndian.Uint32(c.respHeader[4:])) if length <= 1 { - return []byte{}, nil + return buf[:0], nil } - _, buffer, err := c.read(length) - if err != nil { + if cap(buf) < length { + buf = make([]byte, length) + } + buf = buf[:length] + + if _, err := c.readInto(buf); err != nil { c.invalidateConnLocked() - return nil, err + return buf, err } - return buffer, nil + return buf, nil } // invalidateConnLocked closes the connection and marks it as disconnected diff --git a/foreign/go/client/tcp/tcp_core_test.go b/foreign/go/client/tcp/tcp_core_test.go index d992010bf9..2a8014274b 100644 --- a/foreign/go/client/tcp/tcp_core_test.go +++ b/foreign/go/client/tcp/tcp_core_test.go @@ -254,6 +254,36 @@ func TestSendAndFetchResponse_SuccessWithBody(t *testing.T) { } } +// TestSendAndFetchResponse_EmptyBodyAfterNonEmpty guards against a regression +// where sendLockedInto returned the buffer unchanged (still holding prior data) +// when the server replied with length <= 1, causing callers to deserialize stale +// content as ghost messages. +func TestSendAndFetchResponse_EmptyBodyAfterNonEmpty(t *testing.T) { + c, serverConn := newTestClient(t) + + body := []byte("stale data that must not leak") + + // First call: server returns a non-empty body. + go serverRespond(t, serverConn, 0, body) + first, err := c.sendWireAndFetchResponse(context.Background(), emptyWireReq) + if err != nil { + t.Fatalf("first call: unexpected error: %v", err) + } + if !bytes.Equal(first, body) { + t.Fatalf("first call: got %q, want %q", first, body) + } + + // Second call: server returns an empty body (length == 0). + go serverRespond(t, serverConn, 0, nil) + second, err := c.sendWireAndFetchResponse(context.Background(), emptyWireReq) + if err != nil { + t.Fatalf("second call: unexpected error: %v", err) + } + if len(second) != 0 { + t.Errorf("second call: got %d bytes (%q), want empty — stale buffer leaked", len(second), second) + } +} + func TestNewIggyTcpClient_StoresProvidedLogger(t *testing.T) { var buf bytes.Buffer logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{ diff --git a/foreign/go/client/tcp/tcp_messaging.go b/foreign/go/client/tcp/tcp_messaging.go index 710cb5b0b4..90a3885953 100644 --- a/foreign/go/client/tcp/tcp_messaging.go +++ b/foreign/go/client/tcp/tcp_messaging.go @@ -60,7 +60,24 @@ func (c *IggyTcpClient) PollMessages( autoCommit bool, partitionId *uint32, ) (*iggcon.PolledMessage, error) { - buffer, err := c.do(ctx, &command.PollMessages{ + polled, _, err := c.PollMessagesInto(ctx, streamId, topicId, consumer, strategy, count, autoCommit, partitionId, nil) + return polled, err +} + +// Payload and UserHeaders in the returned messages alias buf; copy out bytes +// you need before the next call. +func (c *IggyTcpClient) PollMessagesInto( + ctx context.Context, + streamId iggcon.Identifier, + topicId iggcon.Identifier, + consumer iggcon.Consumer, + strategy iggcon.PollingStrategy, + count uint32, + autoCommit bool, + partitionId *uint32, + buf []byte, +) (*iggcon.PolledMessage, []byte, error) { + buf, err := c.doInto(ctx, &command.PollMessages{ StreamId: streamId, TopicId: topicId, Consumer: consumer, @@ -68,10 +85,11 @@ func (c *IggyTcpClient) PollMessages( Strategy: strategy, Count: count, PartitionId: partitionId, - }) + }, buf) if err != nil { - return nil, err + return nil, buf, err } - return binaryserialization.DeserializeFetchMessagesResponse(buffer, c.MessageCompression) + polled, err := binaryserialization.DeserializeFetchMessagesResponse(buf, c.MessageCompression) + return polled, buf, err } diff --git a/foreign/go/client/tcp/tcp_messaging_test.go b/foreign/go/client/tcp/tcp_messaging_test.go new file mode 100644 index 0000000000..64c616be63 --- /dev/null +++ b/foreign/go/client/tcp/tcp_messaging_test.go @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tcp + +import ( + "context" + "encoding/binary" + "testing" + "unsafe" + + iggcon "github.com/apache/iggy/foreign/go/contracts" +) + +// buildPolledMessageResponse returns a valid PollMessages response body +// containing a single message with the given payload. +func buildPolledMessageResponse(payload []byte) []byte { + hdr := iggcon.MessageHeader{PayloadLength: uint32(len(payload))} + hdrBytes := hdr.ToBytes() + + // [partitionId:4][currentOffset:8][messageCount:4][header:64][payload:N] + buf := make([]byte, 4+8+4+len(hdrBytes)+len(payload)) + binary.LittleEndian.PutUint32(buf[0:4], 1) + binary.LittleEndian.PutUint64(buf[4:12], 0) + binary.LittleEndian.PutUint32(buf[12:16], 1) + copy(buf[16:], hdrBytes) + copy(buf[16+len(hdrBytes):], payload) + return buf +} + +func basePtr(b []byte) uintptr { + if len(b) == 0 { + return 0 + } + return uintptr(unsafe.Pointer(&b[0])) +} + +func TestPollMessagesInto_given_repeated_polls_when_payload_size_is_constant_should_reuse_buffer(t *testing.T) { + c, serverConn := newTestClient(t) + + payload := []byte("hello iggy") + responseBody := buildPolledMessageResponse(payload) + + streamId, _ := iggcon.NewIdentifier(uint32(1)) + topicId, _ := iggcon.NewIdentifier(uint32(1)) + consumerId, _ := iggcon.NewIdentifier(uint32(1)) + + var buf []byte + var firstPtr uintptr + + for i := range 3 { + go serverRespond(t, serverConn, 0, responseBody) + + var polled *iggcon.PolledMessage + var err error + polled, buf, err = c.PollMessagesInto( + context.Background(), + streamId, + topicId, + iggcon.NewSingleConsumer(consumerId), + iggcon.NextPollingStrategy(), + 1, + false, + nil, + buf, + ) + if err != nil { + t.Fatalf("poll %d: unexpected error: %v", i, err) + } + if len(polled.Messages) != 1 { + t.Fatalf("poll %d: expected 1 message, got %d", i, len(polled.Messages)) + } + if string(polled.Messages[0].Payload) != string(payload) { + t.Errorf("poll %d: got payload %q, want %q", i, polled.Messages[0].Payload, payload) + } + + ptr := basePtr(buf) + if i == 0 { + firstPtr = ptr + } else if ptr != firstPtr { + t.Errorf("poll %d: buffer reallocated (addr changed from %x to %x)", i, firstPtr, ptr) + } + } +} diff --git a/foreign/go/contracts/client.go b/foreign/go/contracts/client.go index 144cc17e3e..198a88c1fb 100644 --- a/foreign/go/contracts/client.go +++ b/foreign/go/contracts/client.go @@ -114,6 +114,20 @@ type Client interface { partitionId *uint32, ) (*PolledMessage, error) + // Payload and UserHeaders in the returned messages alias buf; copy out bytes + // you need before calling PollMessagesInto again. + PollMessagesInto( + ctx context.Context, + streamId Identifier, + topicId Identifier, + consumer Consumer, + strategy PollingStrategy, + count uint32, + autoCommit bool, + partitionId *uint32, + buf []byte, + ) (*PolledMessage, []byte, error) + // StoreConsumerOffset store the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names. // Authentication is required, and the permission to poll the messages. StoreConsumerOffset(