From 0d6646fe3fe810ee396f3ed4d3f77979d25f813e Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 20 May 2026 15:31:39 +0800 Subject: [PATCH 1/2] fix: prevent sendRequest double-done panic Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pulsar/message_chunking_test.go | 42 ++++++------ pulsar/producer_partition.go | 107 +++++++++++++++++++------------ pulsar/send_request_pool_test.go | 45 +++++++++++++ 3 files changed, 130 insertions(+), 64 deletions(-) create mode 100644 pulsar/send_request_pool_test.go diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index 12f0517c44..3175d5ae93 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -26,7 +26,6 @@ import ( "net/http" "os" "strings" - "sync" "testing" "time" @@ -554,30 +553,29 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload))) mm.ChunkId = proto.Int32(int32(chunkID)) producerImpl.updateMetadataSeqID(mm, msg) + sr := newSendRequest( + producerImpl, + context.Background(), + msg, + func(MessageID, *ProducerMessage, error) {}, + true, + ) + sr.totalChunks = totalChunks + sr.chunkID = chunkID + sr.uuid = uuid + sr.chunkRecorder = newChunkRecorder() + sr.uncompressedPayload = wholePayload + sr.uncompressedSize = int64(len(wholePayload)) + sr.compressedPayload = wholePayload + sr.compressedSize = len(wholePayload) + sr.payloadChunkSize = internal.MaxMessageSize - proto.Size(mm) + sr.mm = mm + sr.deliverAt = time.Now() + sr.maxMessageSize = internal.MaxMessageSize producerImpl.internalSingleSend( mm, msg.Payload, - &sendRequest{ - callback: func(id MessageID, producerMessage *ProducerMessage, err error) { - }, - callbackOnce: &sync.Once{}, - ctx: context.Background(), - msg: msg, - producer: producerImpl, - flushImmediately: true, - totalChunks: totalChunks, - chunkID: chunkID, - uuid: uuid, - chunkRecorder: newChunkRecorder(), - uncompressedPayload: wholePayload, - uncompressedSize: int64(len(wholePayload)), - compressedPayload: wholePayload, - compressedSize: len(wholePayload), - payloadChunkSize: internal.MaxMessageSize - proto.Size(mm), - mm: mm, - deliverAt: time.Now(), - maxMessageSize: internal.MaxMessageSize, - }, + sr, uint32(internal.MaxMessageSize), ) } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 833e9f4b38..26c81a142e 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -660,36 +660,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { } // update chunk id sr.mm.ChunkId = proto.Int32(int32(chunkID)) - nsr := sendRequestPool.Get().(*sendRequest) - *nsr = sendRequest{ - pool: sendRequestPool, - ctx: sr.ctx, - msg: sr.msg, - producer: sr.producer, - callback: sr.callback, - callbackOnce: sr.callbackOnce, - publishTime: sr.publishTime, - flushImmediately: sr.flushImmediately, - totalChunks: sr.totalChunks, - chunkID: chunkID, - uuid: uuid, - chunkRecorder: cr, - transaction: sr.transaction, - memLimit: sr.memLimit, - semaphore: sr.semaphore, - reservedMem: int64(rhs - lhs), - sendAsBatch: sr.sendAsBatch, - schema: sr.schema, - schemaVersion: sr.schemaVersion, - uncompressedPayload: sr.uncompressedPayload, - uncompressedSize: sr.uncompressedSize, - compressedPayload: sr.compressedPayload, - compressedSize: sr.compressedSize, - payloadChunkSize: sr.payloadChunkSize, - mm: sr.mm, - deliverAt: sr.deliverAt, - maxMessageSize: sr.maxMessageSize, - } + nsr := newChunkSendRequest(sr, chunkID, uuid, cr, int64(rhs-lhs)) p.internalSingleSend(nsr.mm, nsr.compressedPayload[lhs:rhs], nsr, uint32(nsr.maxMessageSize)) } @@ -1326,18 +1297,7 @@ func (p *partitionProducer) internalSendAsync( return } - sr := sendRequestPool.Get().(*sendRequest) - *sr = sendRequest{ - pool: sendRequestPool, - ctx: ctx, - msg: msg, - producer: p, - callback: callback, - callbackOnce: &sync.Once{}, - flushImmediately: flushImmediately, - publishTime: time.Now(), - chunkID: -1, - } + sr := newSendRequest(p, ctx, msg, callback, flushImmediately) if err := p.prepareTransaction(sr); err != nil { sr.done(nil, err) @@ -1612,6 +1572,7 @@ func (p *partitionProducer) Close() { } type sendRequest struct { + doneFlag atomic.Bool pool *sync.Pool ctx context.Context msg *ProducerMessage @@ -1648,7 +1609,67 @@ type sendRequest struct { maxMessageSize int32 } +func newSendRequest( + p *partitionProducer, + ctx context.Context, + msg *ProducerMessage, + callback func(MessageID, *ProducerMessage, error), + flushImmediately bool, +) *sendRequest { + sr := sendRequestPool.Get().(*sendRequest) + *sr = sendRequest{ + pool: sendRequestPool, + ctx: ctx, + msg: msg, + producer: p, + callback: callback, + callbackOnce: &sync.Once{}, + flushImmediately: flushImmediately, + publishTime: time.Now(), + chunkID: -1, + } + return sr +} + +func newChunkSendRequest(parent *sendRequest, chunkID int, uuid string, cr *chunkRecorder, reservedMem int64) *sendRequest { + sr := sendRequestPool.Get().(*sendRequest) + *sr = sendRequest{ + pool: sendRequestPool, + ctx: parent.ctx, + msg: parent.msg, + producer: parent.producer, + callback: parent.callback, + callbackOnce: parent.callbackOnce, + publishTime: parent.publishTime, + flushImmediately: parent.flushImmediately, + totalChunks: parent.totalChunks, + chunkID: chunkID, + uuid: uuid, + chunkRecorder: cr, + transaction: parent.transaction, + memLimit: parent.memLimit, + semaphore: parent.semaphore, + reservedMem: reservedMem, + sendAsBatch: parent.sendAsBatch, + schema: parent.schema, + schemaVersion: parent.schemaVersion, + uncompressedPayload: parent.uncompressedPayload, + uncompressedSize: parent.uncompressedSize, + compressedPayload: parent.compressedPayload, + compressedSize: parent.compressedSize, + payloadChunkSize: parent.payloadChunkSize, + mm: parent.mm, + deliverAt: parent.deliverAt, + maxMessageSize: parent.maxMessageSize, + } + return sr +} + func (sr *sendRequest) done(msgID MessageID, err error) { + if !sr.doneFlag.CompareAndSwap(false, true) { + return + } + if err == nil { sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9) sr.producer.metrics.MessagesPublished.Inc() @@ -1700,6 +1721,8 @@ func (sr *sendRequest) done(msgID MessageID, err error) { if pool != nil { // reset all the fields *sr = sendRequest{} + // Keep the guard raised until the object is reinitialized from the pool. + sr.doneFlag.Store(true) pool.Put(sr) } } diff --git a/pulsar/send_request_pool_test.go b/pulsar/send_request_pool_test.go new file mode 100644 index 0000000000..f19f45d312 --- /dev/null +++ b/pulsar/send_request_pool_test.go @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "errors" + "testing" + + plog "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/stretchr/testify/require" +) + +func TestSendRequestDoneIsIdempotentAfterPutToPool(t *testing.T) { + sr := newSendRequest( + &partitionProducer{log: plog.DefaultNopLogger()}, + context.Background(), + &ProducerMessage{Properties: map[string]string{"k": "v"}}, + func(MessageID, *ProducerMessage, error) {}, + false, + ) + + // First done() call returns sr to the pool and resets it. + sr.done(nil, errors.New("first error")) + + // A second done() call on the same pointer should be ignored safely. + require.NotPanics(t, func() { + sr.done(nil, errors.New("second error")) + }) +} From f627549b12e73f046f048a8cf05ead4042d5d6dc Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 20 May 2026 15:53:24 +0800 Subject: [PATCH 2/2] Fix lint --- pulsar/message_chunking_test.go | 2 +- pulsar/producer_partition.go | 50 ++++++++++++++++---------------- pulsar/send_request_pool_test.go | 2 +- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index 3175d5ae93..b7918756b7 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -554,8 +554,8 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { mm.ChunkId = proto.Int32(int32(chunkID)) producerImpl.updateMetadataSeqID(mm, msg) sr := newSendRequest( - producerImpl, context.Background(), + producerImpl, msg, func(MessageID, *ProducerMessage, error) {}, true, diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 26c81a142e..c6a0364e07 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1297,7 +1297,7 @@ func (p *partitionProducer) internalSendAsync( return } - sr := newSendRequest(p, ctx, msg, callback, flushImmediately) + sr := newSendRequest(ctx, p, msg, callback, flushImmediately) if err := p.prepareTransaction(sr); err != nil { sr.done(nil, err) @@ -1610,8 +1610,8 @@ type sendRequest struct { } func newSendRequest( - p *partitionProducer, ctx context.Context, + p *partitionProducer, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool, @@ -1631,36 +1631,36 @@ func newSendRequest( return sr } -func newChunkSendRequest(parent *sendRequest, chunkID int, uuid string, cr *chunkRecorder, reservedMem int64) *sendRequest { +func newChunkSendRequest(p *sendRequest, chunkID int, uuid string, cr *chunkRecorder, reservedMem int64) *sendRequest { sr := sendRequestPool.Get().(*sendRequest) *sr = sendRequest{ pool: sendRequestPool, - ctx: parent.ctx, - msg: parent.msg, - producer: parent.producer, - callback: parent.callback, - callbackOnce: parent.callbackOnce, - publishTime: parent.publishTime, - flushImmediately: parent.flushImmediately, - totalChunks: parent.totalChunks, + ctx: p.ctx, + msg: p.msg, + producer: p.producer, + callback: p.callback, + callbackOnce: p.callbackOnce, + publishTime: p.publishTime, + flushImmediately: p.flushImmediately, + totalChunks: p.totalChunks, chunkID: chunkID, uuid: uuid, chunkRecorder: cr, - transaction: parent.transaction, - memLimit: parent.memLimit, - semaphore: parent.semaphore, + transaction: p.transaction, + memLimit: p.memLimit, + semaphore: p.semaphore, reservedMem: reservedMem, - sendAsBatch: parent.sendAsBatch, - schema: parent.schema, - schemaVersion: parent.schemaVersion, - uncompressedPayload: parent.uncompressedPayload, - uncompressedSize: parent.uncompressedSize, - compressedPayload: parent.compressedPayload, - compressedSize: parent.compressedSize, - payloadChunkSize: parent.payloadChunkSize, - mm: parent.mm, - deliverAt: parent.deliverAt, - maxMessageSize: parent.maxMessageSize, + sendAsBatch: p.sendAsBatch, + schema: p.schema, + schemaVersion: p.schemaVersion, + uncompressedPayload: p.uncompressedPayload, + uncompressedSize: p.uncompressedSize, + compressedPayload: p.compressedPayload, + compressedSize: p.compressedSize, + payloadChunkSize: p.payloadChunkSize, + mm: p.mm, + deliverAt: p.deliverAt, + maxMessageSize: p.maxMessageSize, } return sr } diff --git a/pulsar/send_request_pool_test.go b/pulsar/send_request_pool_test.go index f19f45d312..40c642530f 100644 --- a/pulsar/send_request_pool_test.go +++ b/pulsar/send_request_pool_test.go @@ -28,8 +28,8 @@ import ( func TestSendRequestDoneIsIdempotentAfterPutToPool(t *testing.T) { sr := newSendRequest( - &partitionProducer{log: plog.DefaultNopLogger()}, context.Background(), + &partitionProducer{log: plog.DefaultNopLogger()}, &ProducerMessage{Properties: map[string]string{"k": "v"}}, func(MessageID, *ProducerMessage, error) {}, false,