From e9c219c65b2b7f6526ff046690d7afa465e71e8d Mon Sep 17 00:00:00 2001 From: nicobao Date: Thu, 14 Oct 2021 20:32:07 +0800 Subject: [PATCH 01/10] fix 640,change create way is same as java client --- pulsar/consumer_partition.go | 10 ++++++---- pulsar/producer_partition.go | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 30639bd741..01dc6b1ca3 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1016,8 +1016,9 @@ func (pc *partitionConsumer) reconnectToBroker() { func (pc *partitionConsumer) grabConn() error { lr, err := pc.client.lookupService.Lookup(pc.topic) if err != nil { - pc.log.WithError(err).Warn("Failed to lookup topic") - return err + pc.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!") + pc.connectClosedCh <- connectionClosed{} + return nil } pc.log.Debugf("Lookup result: %+v", lr) @@ -1079,8 +1080,9 @@ func (pc *partitionConsumer) grabConn() error { pb.BaseCommand_SUBSCRIBE, cmdSubscribe) if err != nil { - pc.log.WithError(err).Error("Failed to create consumer") - return err + pc.log.WithError(err).Error("Failed to create consumer, it will be retried later!") + pc.connectClosedCh <- connectionClosed{} + return nil } if res.Response.ConsumerStatsResponse != nil { diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b2b92735c2..231164bf08 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -184,8 +184,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions func (p *partitionProducer) grabCnx() error { lr, err := p.client.lookupService.Lookup(p.topic) if err != nil { - p.log.WithError(err).Warn("Failed to lookup topic") - return err + p.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!") + p.connectClosedCh <- connectionClosed{} + return nil } p.log.Debug("Lookup result: ", lr) @@ -227,8 +228,9 @@ func (p *partitionProducer) grabCnx() error { } res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { - p.log.WithError(err).Error("Failed to create producer") - return err + p.log.WithError(err).Error("Failed to create producer, it will be retried later!") + p.connectClosedCh <- connectionClosed{} + return nil } p.producerName = res.Response.ProducerSuccess.GetProducerName() From c7a346702e318b7d67fb9b6bc6a06d0ecb6d6b73 Mon Sep 17 00:00:00 2001 From: nicobao Date: Thu, 14 Oct 2021 23:51:03 +0800 Subject: [PATCH 02/10] fix 640,handle test error --- pulsar/consumer_partition.go | 39 +++++++++++++++++------------------- pulsar/producer_partition.go | 27 +++++++++++-------------- 2 files changed, 30 insertions(+), 36 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 01dc6b1ca3..73a7a56c73 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -207,31 +207,29 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon if err != nil { pc.log.WithError(err).Error("Failed to create consumer") pc.nackTracker.Close() - return nil, err - } - pc.log.Info("Created consumer") - pc.setConsumerState(consumerReady) - - if pc.options.startMessageIDInclusive && pc.startMessageID.equal(lastestMessageID.(messageID)) { - msgID, err := pc.requestGetLastMessageID() - if err != nil { - pc.nackTracker.Close() - return nil, err - } - if msgID.entryID != noMessageEntry { - pc.startMessageID = msgID + } else { + pc.log.Info("Created consumer") + pc.setConsumerState(consumerReady) - // use the WithoutClear version because the dispatcher is not started yet - err = pc.requestSeekWithoutClear(msgID.messageID) + if pc.options.startMessageIDInclusive && pc.startMessageID.equal(lastestMessageID.(messageID)) { + msgID, err := pc.requestGetLastMessageID() if err != nil { pc.nackTracker.Close() return nil, err } + if msgID.entryID != noMessageEntry { + pc.startMessageID = msgID + + // use the WithoutClear version because the dispatcher is not started yet + err = pc.requestSeekWithoutClear(msgID.messageID) + if err != nil { + pc.nackTracker.Close() + return nil, err + } + } } + go pc.dispatcher() } - - go pc.dispatcher() - go pc.runEventsLoop() return pc, nil @@ -1004,7 +1002,6 @@ func (pc *partitionConsumer) reconnectToBroker() { if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. pc.log.Warn("Topic Not Found.") - break } if maxRetry > 0 { @@ -1018,7 +1015,7 @@ func (pc *partitionConsumer) grabConn() error { if err != nil { pc.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!") pc.connectClosedCh <- connectionClosed{} - return nil + return err } pc.log.Debugf("Lookup result: %+v", lr) @@ -1082,7 +1079,7 @@ func (pc *partitionConsumer) grabConn() error { if err != nil { pc.log.WithError(err).Error("Failed to create consumer, it will be retried later!") pc.connectClosedCh <- connectionClosed{} - return nil + return err } if res.Response.ConsumerStatsResponse != nil { diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 231164bf08..9fd8159713 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -161,21 +161,19 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions err := p.grabCnx() if err != nil { logger.WithError(err).Error("Failed to create producer") - return nil, err - } - - p.log = p.log.SubLogger(log.Fields{ - "producer_name": p.producerName, - "producerID": p.producerID, - }) + } else { + p.log = p.log.SubLogger(log.Fields{ + "producer_name": p.producerName, + "producerID": p.producerID, + }) - p.log.WithField("cnx", p.cnx.ID()).Info("Created producer") - p.setProducerState(producerReady) + p.log.WithField("cnx", p.cnx.ID()).Info("Created producer") + p.setProducerState(producerReady) - if p.options.SendTimeout > 0 { - go p.failTimeoutMessages() + if p.options.SendTimeout > 0 { + go p.failTimeoutMessages() + } } - go p.runEventsLoop() return p, nil @@ -186,7 +184,7 @@ func (p *partitionProducer) grabCnx() error { if err != nil { p.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!") p.connectClosedCh <- connectionClosed{} - return nil + return err } p.log.Debug("Lookup result: ", lr) @@ -230,7 +228,7 @@ func (p *partitionProducer) grabCnx() error { if err != nil { p.log.WithError(err).Error("Failed to create producer, it will be retried later!") p.connectClosedCh <- connectionClosed{} - return nil + return err } p.producerName = res.Response.ProducerSuccess.GetProducerName() @@ -361,7 +359,6 @@ func (p *partitionProducer) reconnectToBroker() { if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. p.log.Warn("Topic Not Found.") - break } if maxRetry > 0 { From 84de13cd009bde9ed417d4197dad6193325846c1 Mon Sep 17 00:00:00 2001 From: nicobao Date: Fri, 15 Oct 2021 09:35:56 +0800 Subject: [PATCH 03/10] add handler different err --- pulsar/consumer_partition.go | 10 ++++++++-- pulsar/producer_partition.go | 17 ++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 73a7a56c73..2ccad2392c 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -205,7 +205,13 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon err := pc.grabConn() if err != nil { - pc.log.WithError(err).Error("Failed to create consumer") + errMsg := err.Error() + if !strings.Contains(errMsg, errConnectError) && !strings.Contains(errMsg, errLookupError) { + // when topic is deleted, we should give up reconnection. + pc.log.WithError(err).Error("Failed to create consumer") + return nil, err + } + pc.log.WithError(err).Error("Failed to create consumer, it will be retried later!") pc.nackTracker.Close() } else { pc.log.Info("Created consumer") @@ -1077,7 +1083,7 @@ func (pc *partitionConsumer) grabConn() error { pb.BaseCommand_SUBSCRIBE, cmdSubscribe) if err != nil { - pc.log.WithError(err).Error("Failed to create consumer, it will be retried later!") + pc.log.WithError(err).Error("Failed to create consumer, it may be retried later when connection error!") pc.connectClosedCh <- connectionClosed{} return err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9fd8159713..080bbd0c87 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "errors" "fmt" "strings" "sync" @@ -61,6 +62,10 @@ var ( var errTopicNotFount = "TopicNotFound" +var errConnectError = "connection error" + +var errLookupError = "lookup error" + type partitionProducer struct { state ua.Int32 client *client @@ -160,7 +165,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions err := p.grabCnx() if err != nil { - logger.WithError(err).Error("Failed to create producer") + errMsg := err.Error() + if !strings.Contains(errMsg, errConnectError) && !strings.Contains(errMsg, errLookupError) { + // when topic is deleted, we should give up reconnection. + logger.WithError(err).Error("Failed to create producer") + return nil, err + } + logger.WithError(err).Error("Failed to create producer, it will be retried later!") } else { p.log = p.log.SubLogger(log.Fields{ "producer_name": p.producerName, @@ -184,7 +195,7 @@ func (p *partitionProducer) grabCnx() error { if err != nil { p.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!") p.connectClosedCh <- connectionClosed{} - return err + return errors.New(errLookupError) } p.log.Debug("Lookup result: ", lr) @@ -226,7 +237,7 @@ func (p *partitionProducer) grabCnx() error { } res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { - p.log.WithError(err).Error("Failed to create producer, it will be retried later!") + p.log.WithError(err).Error("Failed to create producer, it may be retried later when connection error!") p.connectClosedCh <- connectionClosed{} return err } From 5085a1736f3a78ac9ef452013900ad0cd2c13ef4 Mon Sep 17 00:00:00 2001 From: nicobao Date: Fri, 15 Oct 2021 09:57:05 +0800 Subject: [PATCH 04/10] modify test error --- pulsar/reader_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index bdafea086f..8f65f9d995 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -107,10 +107,10 @@ func TestReaderConnectError(t *testing.T) { }) // Expect error in creating consumer - assert.Nil(t, reader) - assert.NotNil(t, err) + assert.NotNil(t, reader) + assert.Nil(t, err) - assert.Equal(t, err.Error(), "connection error") + //assert.Equal(t, err.Error(), "connection error") } func TestReaderOnSpecificMessage(t *testing.T) { From d5cf4b6e1834cc2130c069105e45eda4974950b4 Mon Sep 17 00:00:00 2001 From: nicobao Date: Fri, 15 Oct 2021 14:33:54 +0800 Subject: [PATCH 05/10] recover code whit some advice --- pulsar/consumer_partition.go | 3 +-- pulsar/producer_partition.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 2ccad2392c..8a61a85d85 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1008,6 +1008,7 @@ func (pc *partitionConsumer) reconnectToBroker() { if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. pc.log.Warn("Topic Not Found.") + break } if maxRetry > 0 { @@ -1020,7 +1021,6 @@ func (pc *partitionConsumer) grabConn() error { lr, err := pc.client.lookupService.Lookup(pc.topic) if err != nil { pc.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!") - pc.connectClosedCh <- connectionClosed{} return err } pc.log.Debugf("Lookup result: %+v", lr) @@ -1084,7 +1084,6 @@ func (pc *partitionConsumer) grabConn() error { if err != nil { pc.log.WithError(err).Error("Failed to create consumer, it may be retried later when connection error!") - pc.connectClosedCh <- connectionClosed{} return err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 080bbd0c87..36bb85cd7b 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -194,7 +194,6 @@ func (p *partitionProducer) grabCnx() error { lr, err := p.client.lookupService.Lookup(p.topic) if err != nil { p.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!") - p.connectClosedCh <- connectionClosed{} return errors.New(errLookupError) } @@ -238,7 +237,6 @@ func (p *partitionProducer) grabCnx() error { res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { p.log.WithError(err).Error("Failed to create producer, it may be retried later when connection error!") - p.connectClosedCh <- connectionClosed{} return err } @@ -370,6 +368,7 @@ func (p *partitionProducer) reconnectToBroker() { if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. p.log.Warn("Topic Not Found.") + break } if maxRetry > 0 { From 8df4e6d24293f54bccfb562e516a5e885c47c419 Mon Sep 17 00:00:00 2001 From: nicobao Date: Fri, 15 Oct 2021 17:29:13 +0800 Subject: [PATCH 06/10] only handle server not already and too many request lookup error --- pulsar/consumer_partition.go | 5 +++-- pulsar/producer_partition.go | 11 +++-------- pulsar/reader_test.go | 6 +++--- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 8a61a85d85..441276ce77 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -206,9 +206,10 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon err := pc.grabConn() if err != nil { errMsg := err.Error() - if !strings.Contains(errMsg, errConnectError) && !strings.Contains(errMsg, errLookupError) { + if !strings.EqualFold(errMsg, pb.ServerError_ServiceNotReady.String()) && !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) { // when topic is deleted, we should give up reconnection. pc.log.WithError(err).Error("Failed to create consumer") + pc.nackTracker.Close() return nil, err } pc.log.WithError(err).Error("Failed to create consumer, it will be retried later!") @@ -1020,7 +1021,7 @@ func (pc *partitionConsumer) reconnectToBroker() { func (pc *partitionConsumer) grabConn() error { lr, err := pc.client.lookupService.Lookup(pc.topic) if err != nil { - pc.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!") + pc.log.WithError(err).Warn("Failed to lookup topic") return err } pc.log.Debugf("Lookup result: %+v", lr) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 36bb85cd7b..96ca404b8f 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -19,7 +19,6 @@ package pulsar import ( "context" - "errors" "fmt" "strings" "sync" @@ -62,10 +61,6 @@ var ( var errTopicNotFount = "TopicNotFound" -var errConnectError = "connection error" - -var errLookupError = "lookup error" - type partitionProducer struct { state ua.Int32 client *client @@ -166,7 +161,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions err := p.grabCnx() if err != nil { errMsg := err.Error() - if !strings.Contains(errMsg, errConnectError) && !strings.Contains(errMsg, errLookupError) { + if !strings.EqualFold(errMsg, pb.ServerError_ServiceNotReady.String()) && !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) { // when topic is deleted, we should give up reconnection. logger.WithError(err).Error("Failed to create producer") return nil, err @@ -193,8 +188,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions func (p *partitionProducer) grabCnx() error { lr, err := p.client.lookupService.Lookup(p.topic) if err != nil { - p.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!") - return errors.New(errLookupError) + p.log.WithError(err).Warn("Failed to lookup topic") + return err } p.log.Debug("Lookup result: ", lr) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 8f65f9d995..bdafea086f 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -107,10 +107,10 @@ func TestReaderConnectError(t *testing.T) { }) // Expect error in creating consumer - assert.NotNil(t, reader) - assert.Nil(t, err) + assert.Nil(t, reader) + assert.NotNil(t, err) - //assert.Equal(t, err.Error(), "connection error") + assert.Equal(t, err.Error(), "connection error") } func TestReaderOnSpecificMessage(t *testing.T) { From 94c2db57b39657ef2d88d5c6d55c5852c6c2e51f Mon Sep 17 00:00:00 2001 From: nicobao Date: Fri, 15 Oct 2021 17:35:18 +0800 Subject: [PATCH 07/10] modify log text --- pulsar/consumer_partition.go | 2 +- pulsar/producer_partition.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 441276ce77..93ffd5df2d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1084,7 +1084,7 @@ func (pc *partitionConsumer) grabConn() error { pb.BaseCommand_SUBSCRIBE, cmdSubscribe) if err != nil { - pc.log.WithError(err).Error("Failed to create consumer, it may be retried later when connection error!") + pc.log.WithError(err).Error("Failed to create consumer") return err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 96ca404b8f..5e29aab8f5 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -231,7 +231,7 @@ func (p *partitionProducer) grabCnx() error { } res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { - p.log.WithError(err).Error("Failed to create producer, it may be retried later when connection error!") + p.log.WithError(err).Error("Failed to create producer") return err } From d018309cd22ebdc7d54c5cee4e35e9a3e19db4df Mon Sep 17 00:00:00 2001 From: nicobao Date: Fri, 15 Oct 2021 18:47:40 +0800 Subject: [PATCH 08/10] check code style --- pulsar/consumer_partition.go | 3 ++- pulsar/producer_partition.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 93ffd5df2d..ece2e040d6 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -206,7 +206,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon err := pc.grabConn() if err != nil { errMsg := err.Error() - if !strings.EqualFold(errMsg, pb.ServerError_ServiceNotReady.String()) && !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) { + if !strings.EqualFold(errMsg, pb.ServerError_ServiceNotReady.String()) && + !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) { // when topic is deleted, we should give up reconnection. pc.log.WithError(err).Error("Failed to create consumer") pc.nackTracker.Close() diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 5e29aab8f5..4a76aabbe1 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -161,7 +161,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions err := p.grabCnx() if err != nil { errMsg := err.Error() - if !strings.EqualFold(errMsg, pb.ServerError_ServiceNotReady.String()) && !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) { + if !strings.EqualFold(errMsg, pb.ServerError_ServiceNotReady.String()) && + !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) { // when topic is deleted, we should give up reconnection. logger.WithError(err).Error("Failed to create producer") return nil, err From 9d864c2be13dcb7198ff4bb07e2edeeee87f43a6 Mon Sep 17 00:00:00 2001 From: nicobao Date: Fri, 15 Oct 2021 18:54:03 +0800 Subject: [PATCH 09/10] add handle metadataError --- pulsar/consumer_partition.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index ece2e040d6..6118868642 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -207,7 +207,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon if err != nil { errMsg := err.Error() if !strings.EqualFold(errMsg, pb.ServerError_ServiceNotReady.String()) && - !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) { + !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) && + !strings.EqualFold(errMsg, pb.ServerError_MetadataError.String()) { // when topic is deleted, we should give up reconnection. pc.log.WithError(err).Error("Failed to create consumer") pc.nackTracker.Close() From 9fd5a9ee96d85819e6936be68fc93c06dcb44658 Mon Sep 17 00:00:00 2001 From: nicobao Date: Fri, 15 Oct 2021 18:54:56 +0800 Subject: [PATCH 10/10] add handle metadataError --- pulsar/producer_partition.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 4a76aabbe1..7ab9f9c0b2 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -162,7 +162,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions if err != nil { errMsg := err.Error() if !strings.EqualFold(errMsg, pb.ServerError_ServiceNotReady.String()) && - !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) { + !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) && + !strings.EqualFold(errMsg, pb.ServerError_MetadataError.String()) { // when topic is deleted, we should give up reconnection. logger.WithError(err).Error("Failed to create producer") return nil, err