From 5ecd7eed667543a37f56252e991b1a9f191dccc8 Mon Sep 17 00:00:00 2001 From: xia Date: Tue, 19 May 2026 18:02:17 +0800 Subject: [PATCH] fix: add transaction id to message metadata --- pulsar/internal/batch_builder.go | 6 ++ pulsar/internal/batch_builder_test.go | 93 +++++++++++++++++++++++++++ pulsar/producer_partition.go | 5 ++ pulsar/producer_test.go | 28 ++++++++ 4 files changed, 132 insertions(+) create mode 100644 pulsar/internal/batch_builder_test.go diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index ee5c1299dd..ee315ce236 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -233,6 +233,8 @@ func (bc *batchContainer) Add( if useTxn { bc.cmdSend.Send.TxnidMostBits = proto.Uint64(mostSigBits) bc.cmdSend.Send.TxnidLeastBits = proto.Uint64(leastSigBits) + bc.msgMetadata.TxnidMostBits = proto.Uint64(mostSigBits) + bc.msgMetadata.TxnidLeastBits = proto.Uint64(leastSigBits) } } addSingleMessageToBatch(bc.buffer, metadata, payload) @@ -250,6 +252,10 @@ func (bc *batchContainer) reset() { bc.msgMetadata.DeliverAtTime = nil bc.msgMetadata.SchemaVersion = nil bc.msgMetadata.Properties = nil + bc.msgMetadata.TxnidMostBits = nil + bc.msgMetadata.TxnidLeastBits = nil + bc.cmdSend.Send.TxnidMostBits = nil + bc.cmdSend.Send.TxnidLeastBits = nil } // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. diff --git a/pulsar/internal/batch_builder_test.go b/pulsar/internal/batch_builder_test.go new file mode 100644 index 0000000000..ae7bea3bc5 --- /dev/null +++ b/pulsar/internal/batch_builder_test.go @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar/internal/compression" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" +) + +func TestBatchBuilderAddsTxnIDToMessageMetadata(t *testing.T) { + batcher, err := NewBatchBuilder( + 1000, + 1000, + 1000, + "test", + 1, + pb.CompressionType_NONE, + compression.Level(0), + &bufferPoolImpl{}, + NewMetricsProvider(2, map[string]string{}, prometheus.DefaultRegisterer), + log.NewLoggerWithLogrus(logrus.StandardLogger()), + &mockEncryptor{}, + ) + require.NoError(t, err) + + sequenceID := uint64(0) + metadata := &pb.SingleMessageMetadata{ + PayloadSize: proto.Int32(0), + } + + require.True(t, batcher.Add(metadata, &sequenceID, []byte("test"), nil, nil, time.Time{}, + nil, false, true, 12, 34)) + + bc := batcher.(*batchContainer) + assert.Equal(t, uint64(12), bc.msgMetadata.GetTxnidMostBits()) + assert.Equal(t, uint64(34), bc.msgMetadata.GetTxnidLeastBits()) +} + +func TestBatchBuilderClearsTxnIDAfterFlush(t *testing.T) { + batcher, err := NewBatchBuilder( + 1000, + 1000, + 1000, + "test", + 1, + pb.CompressionType_NONE, + compression.Level(0), + &bufferPoolImpl{}, + NewMetricsProvider(2, map[string]string{}, prometheus.DefaultRegisterer), + log.NewLoggerWithLogrus(logrus.StandardLogger()), + &mockEncryptor{}, + ) + require.NoError(t, err) + + sequenceID := uint64(0) + metadata := &pb.SingleMessageMetadata{ + PayloadSize: proto.Int32(0), + } + + require.True(t, batcher.Add(metadata, &sequenceID, []byte("test"), nil, nil, time.Time{}, + nil, false, true, 12, 34)) + require.NotNil(t, batcher.Flush()) + + bc := batcher.(*batchContainer) + assert.Nil(t, bc.msgMetadata.TxnidMostBits) + assert.Nil(t, bc.msgMetadata.TxnidLeastBits) + assert.Nil(t, bc.cmdSend.Send.TxnidMostBits) + assert.Nil(t, bc.cmdSend.Send.TxnidLeastBits) +} diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 833e9f4b38..be7adbfdfb 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1252,6 +1252,11 @@ func (p *partitionProducer) updateMetaData(sr *sendRequest) { } sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt) + if sr.transaction != nil { + txnID := sr.transaction.GetTxnID() + sr.mm.TxnidMostBits = proto.Uint64(txnID.MostSigBits) + sr.mm.TxnidLeastBits = proto.Uint64(txnID.LeastSigBits) + } sr.sendAsBatch = !p.options.DisableBatching && sr.msg.ReplicationClusters == nil && diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 357aa690f3..f49d71c588 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -81,6 +81,34 @@ func TestProducerConnectError(t *testing.T) { assert.ErrorContains(t, err, "connection error") } +func TestUpdateMetaDataAddsTxnID(t *testing.T) { + sequenceID := uint64(0) + pp := &partitionProducer{ + producerName: "test-producer", + options: &ProducerOptions{DisableBatching: true}, + log: plog.DefaultNopLogger(), + sequenceIDGenerator: &sequenceID, + } + txn := &transaction{ + txnID: TxnID{ + MostSigBits: 12, + LeastSigBits: 34, + }, + } + sr := &sendRequest{ + msg: &ProducerMessage{ + Payload: []byte("test"), + }, + transaction: txn, + } + + pp.updateMetaData(sr) + + require.NotNil(t, sr.mm) + assert.Equal(t, uint64(12), sr.mm.GetTxnidMostBits()) + assert.Equal(t, uint64(34), sr.mm.GetTxnidLeastBits()) +} + func TestProducerNoTopic(t *testing.T) { client, err := NewClient(ClientOptions{ URL: "pulsar://localhost:6650",