Skip to content
Open
65 changes: 33 additions & 32 deletions foreign/go/client/tcp/tcp_core.go

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the server returns an empty body (length <= 1), sendLockedInto returns buf unchanged -- still holding data from the previous call. DeserializeFetchMessagesResponse then re-parses that stale data and returns ghost messages. The old code returned []byte{} here. I think you need return buf[:0], nil to preserve the reusable backing array without leaking old contents.

I reproduced this locally: first poll returns 1 message, second poll has an empty server response, but caller still sees 1 stale message from the first call.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, added regression test

Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,6 @@ func releaseRequestBuf(bp *[]byte) {
requestBufPool.Put(bp)
}

func (c *IggyTcpClient) read(expectedSize int) (int, []byte, error) {
buffer := make([]byte, expectedSize)
n, err := c.readInto(buffer)
if err != nil {
return n, buffer[:n], err
}
return n, buffer, nil
}

// readInto reads exactly len(buf) bytes from the connection into buf.
func (c *IggyTcpClient) readInto(buf []byte) (int, error) {
var totalRead int
Expand Down Expand Up @@ -290,17 +281,21 @@ func (c *IggyTcpClient) write(payload []byte) (int, error) {
// do sends the command and returns the response body. Commands implementing
// the appender interface encode directly into a pooled buffer.
func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command) ([]byte, error) {
return c.doInto(ctx, cmd, nil)
}

func (c *IggyTcpClient) doInto(ctx context.Context, cmd command.Command, buf []byte) ([]byte, error) {
bp := acquireRequestBuf()
buf, err := encodeWireRequest(*bp, cmd)
wire, err := encodeWireRequest(*bp, cmd)
if err != nil {
releaseRequestBuf(bp)
return nil, err
return buf, err
}
*bp = buf
*bp = wire

resp, err := c.sendWireAndFetchResponse(ctx, buf)
buf, err = c.sendWireAndFetchResponseInto(ctx, wire, buf)
releaseRequestBuf(bp)
return resp, err
return buf, err
}

// encodeWireRequest writes the wire-format request (4-byte length, 4-byte
Expand Down Expand Up @@ -331,23 +326,26 @@ func encodeWireRequest(buf []byte, cmd command.Command) ([]byte, error) {
// sendWireAndFetchResponse sends a pre-built wire payload (length header,
// command code, body) and returns the response body.
func (c *IggyTcpClient) sendWireAndFetchResponse(ctx context.Context, wirePayload []byte) ([]byte, error) {
return c.sendWireAndFetchResponseInto(ctx, wirePayload, nil)
}

func (c *IggyTcpClient) sendWireAndFetchResponseInto(ctx context.Context, wirePayload, buf []byte) ([]byte, error) {
if ctx == nil {
return nil, ierror.ErrNilContext
return buf, ierror.ErrNilContext
}
if err := ctx.Err(); err != nil {
return nil, err
return buf, err
}

c.mtx.Lock()
defer c.mtx.Unlock()

if err := ctx.Err(); err != nil {
return nil, err
return buf, err
}

// fast path for non-cancellable ctx.
if ctx.Done() == nil {
return c.sendLocked(wirePayload)
return c.sendLockedInto(wirePayload, buf)
}

conn := c.conn
Expand All @@ -366,50 +364,53 @@ func (c *IggyTcpClient) sendWireAndFetchResponse(ctx context.Context, wirePayloa
})
defer stop()

result, err := c.sendLocked(wirePayload)
result, err := c.sendLockedInto(wirePayload, buf)

// clear the deadline of connection.
deadlineMu.Lock()
cleared = true
_ = conn.SetDeadline(time.Time{})
deadlineMu.Unlock()

if err != nil {
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
return result, ctxErr
}
return nil, err
return result, err
}
return result, nil
}

func (c *IggyTcpClient) sendLocked(wirePayload []byte) ([]byte, error) {
func (c *IggyTcpClient) sendLockedInto(wirePayload, buf []byte) ([]byte, error) {
if _, err := c.write(wirePayload); err != nil {
c.invalidateConnLocked()
return nil, err
return buf, err
}

if _, err := c.readInto(c.respHeader[:]); err != nil {
c.invalidateConnLocked()
return nil, err
return buf, err
}

if status := ierror.Code(binary.LittleEndian.Uint32(c.respHeader[0:4])); status != 0 {
return nil, ierror.FromCode(status)
return buf, ierror.FromCode(status)
}

length := int(binary.LittleEndian.Uint32(c.respHeader[4:]))
if length <= 1 {
return []byte{}, nil
return buf[:0], nil
}

_, buffer, err := c.read(length)
if err != nil {
if cap(buf) < length {
buf = make([]byte, length)
}
buf = buf[:length]

if _, err := c.readInto(buf); err != nil {
c.invalidateConnLocked()
return nil, err
return buf, err
}

return buffer, nil
return buf, nil
}

// invalidateConnLocked closes the connection and marks it as disconnected
Expand Down
30 changes: 30 additions & 0 deletions foreign/go/client/tcp/tcp_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,36 @@ func TestSendAndFetchResponse_SuccessWithBody(t *testing.T) {
}
}

// TestSendAndFetchResponse_EmptyBodyAfterNonEmpty guards against a regression
// where sendLockedInto returned the buffer unchanged (still holding prior data)
// when the server replied with length <= 1, causing callers to deserialize stale
// content as ghost messages.
func TestSendAndFetchResponse_EmptyBodyAfterNonEmpty(t *testing.T) {
c, serverConn := newTestClient(t)

body := []byte("stale data that must not leak")

// First call: server returns a non-empty body.
go serverRespond(t, serverConn, 0, body)
first, err := c.sendWireAndFetchResponse(context.Background(), emptyWireReq)
if err != nil {
t.Fatalf("first call: unexpected error: %v", err)
}
if !bytes.Equal(first, body) {
t.Fatalf("first call: got %q, want %q", first, body)
}

// Second call: server returns an empty body (length == 0).
go serverRespond(t, serverConn, 0, nil)
second, err := c.sendWireAndFetchResponse(context.Background(), emptyWireReq)
if err != nil {
t.Fatalf("second call: unexpected error: %v", err)
}
if len(second) != 0 {
t.Errorf("second call: got %d bytes (%q), want empty — stale buffer leaked", len(second), second)
}
}

func TestNewIggyTcpClient_StoresProvidedLogger(t *testing.T) {
var buf bytes.Buffer
logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{
Expand Down
26 changes: 22 additions & 4 deletions foreign/go/client/tcp/tcp_messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,36 @@ func (c *IggyTcpClient) PollMessages(
autoCommit bool,
partitionId *uint32,
) (*iggcon.PolledMessage, error) {
buffer, err := c.do(ctx, &command.PollMessages{
polled, _, err := c.PollMessagesInto(ctx, streamId, topicId, consumer, strategy, count, autoCommit, partitionId, nil)
return polled, err
}

// Payload and UserHeaders in the returned messages alias buf; copy out bytes
// you need before the next call.
func (c *IggyTcpClient) PollMessagesInto(
ctx context.Context,
streamId iggcon.Identifier,
topicId iggcon.Identifier,
consumer iggcon.Consumer,
strategy iggcon.PollingStrategy,
count uint32,
autoCommit bool,
partitionId *uint32,
buf []byte,
) (*iggcon.PolledMessage, []byte, error) {
buf, err := c.doInto(ctx, &command.PollMessages{
StreamId: streamId,
TopicId: topicId,
Consumer: consumer,
AutoCommit: autoCommit,
Strategy: strategy,
Count: count,
PartitionId: partitionId,
})
}, buf)
if err != nil {
return nil, err
return nil, buf, err
}

return binaryserialization.DeserializeFetchMessagesResponse(buffer, c.MessageCompression)
polled, err := binaryserialization.DeserializeFetchMessagesResponse(buf, c.MessageCompression)
return polled, buf, err
}
98 changes: 98 additions & 0 deletions foreign/go/client/tcp/tcp_messaging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package tcp

import (
"context"
"encoding/binary"
"testing"
"unsafe"

iggcon "github.com/apache/iggy/foreign/go/contracts"
)

// buildPolledMessageResponse returns a valid PollMessages response body
// containing a single message with the given payload.
func buildPolledMessageResponse(payload []byte) []byte {
hdr := iggcon.MessageHeader{PayloadLength: uint32(len(payload))}
hdrBytes := hdr.ToBytes()

// [partitionId:4][currentOffset:8][messageCount:4][header:64][payload:N]
buf := make([]byte, 4+8+4+len(hdrBytes)+len(payload))
binary.LittleEndian.PutUint32(buf[0:4], 1)
binary.LittleEndian.PutUint64(buf[4:12], 0)
binary.LittleEndian.PutUint32(buf[12:16], 1)
copy(buf[16:], hdrBytes)
copy(buf[16+len(hdrBytes):], payload)
return buf
}

func basePtr(b []byte) uintptr {
if len(b) == 0 {
return 0
}
return uintptr(unsafe.Pointer(&b[0]))
}

func TestPollMessagesInto_given_repeated_polls_when_payload_size_is_constant_should_reuse_buffer(t *testing.T) {
c, serverConn := newTestClient(t)

payload := []byte("hello iggy")
responseBody := buildPolledMessageResponse(payload)

streamId, _ := iggcon.NewIdentifier(uint32(1))
topicId, _ := iggcon.NewIdentifier(uint32(1))
consumerId, _ := iggcon.NewIdentifier(uint32(1))

var buf []byte
var firstPtr uintptr

for i := range 3 {
go serverRespond(t, serverConn, 0, responseBody)

var polled *iggcon.PolledMessage
var err error
polled, buf, err = c.PollMessagesInto(
context.Background(),
streamId,
topicId,
iggcon.NewSingleConsumer(consumerId),
iggcon.NextPollingStrategy(),
1,
false,
nil,
buf,
)
if err != nil {
t.Fatalf("poll %d: unexpected error: %v", i, err)
}
if len(polled.Messages) != 1 {
t.Fatalf("poll %d: expected 1 message, got %d", i, len(polled.Messages))
}
if string(polled.Messages[0].Payload) != string(payload) {
t.Errorf("poll %d: got payload %q, want %q", i, polled.Messages[0].Payload, payload)
}

ptr := basePtr(buf)
if i == 0 {
firstPtr = ptr
} else if ptr != firstPtr {
t.Errorf("poll %d: buffer reallocated (addr changed from %x to %x)", i, firstPtr, ptr)
}
}
}
14 changes: 14 additions & 0 deletions foreign/go/contracts/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,20 @@ type Client interface {
partitionId *uint32,
) (*PolledMessage, error)

// Payload and UserHeaders in the returned messages alias buf; copy out bytes
// you need before calling PollMessagesInto again.
PollMessagesInto(
ctx context.Context,
streamId Identifier,
topicId Identifier,
consumer Consumer,
strategy PollingStrategy,
count uint32,
autoCommit bool,
partitionId *uint32,
buf []byte,
) (*PolledMessage, []byte, error)
Comment on lines +119 to +129

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding PollMessagesInto to the Client interface is a compile-breaking change for anyone implementing or mocking it (test doubles, alternative transports). Would it make sense to keep this as a concrete method on IggyTcpClient only, or put it on a separate optional interface like BufferedPoller that callers can type-assert to?


// StoreConsumerOffset store the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to poll the messages.
StoreConsumerOffset(
Expand Down
Loading