diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 30639bd741..6118868642 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -205,33 +205,40 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon err := pc.grabConn() if err != nil { - pc.log.WithError(err).Error("Failed to create consumer") - pc.nackTracker.Close() - return nil, err - } - pc.log.Info("Created consumer") - pc.setConsumerState(consumerReady) - - if pc.options.startMessageIDInclusive && pc.startMessageID.equal(lastestMessageID.(messageID)) { - msgID, err := pc.requestGetLastMessageID() - if err != nil { + errMsg := err.Error() + if !strings.EqualFold(errMsg, pb.ServerError_ServiceNotReady.String()) && + !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) && + !strings.EqualFold(errMsg, pb.ServerError_MetadataError.String()) { + // when topic is deleted, we should give up reconnection. + pc.log.WithError(err).Error("Failed to create consumer") pc.nackTracker.Close() return nil, err } - if msgID.entryID != noMessageEntry { - pc.startMessageID = msgID + pc.log.WithError(err).Error("Failed to create consumer, it will be retried later!") + pc.nackTracker.Close() + } else { + pc.log.Info("Created consumer") + pc.setConsumerState(consumerReady) - // use the WithoutClear version because the dispatcher is not started yet - err = pc.requestSeekWithoutClear(msgID.messageID) + if pc.options.startMessageIDInclusive && pc.startMessageID.equal(lastestMessageID.(messageID)) { + msgID, err := pc.requestGetLastMessageID() if err != nil { pc.nackTracker.Close() return nil, err } + if msgID.entryID != noMessageEntry { + pc.startMessageID = msgID + + // use the WithoutClear version because the dispatcher is not started yet + err = pc.requestSeekWithoutClear(msgID.messageID) + if err != nil { + pc.nackTracker.Close() + return nil, err + } + } } + go pc.dispatcher() } - - go pc.dispatcher() - go pc.runEventsLoop() return pc, nil diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b2b92735c2..7ab9f9c0b2 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -160,22 +160,28 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions err := p.grabCnx() if err != nil { - logger.WithError(err).Error("Failed to create producer") - return nil, err - } - - p.log = p.log.SubLogger(log.Fields{ - "producer_name": p.producerName, - "producerID": p.producerID, - }) + errMsg := err.Error() + if !strings.EqualFold(errMsg, pb.ServerError_ServiceNotReady.String()) && + !strings.EqualFold(errMsg, pb.ServerError_TooManyRequests.String()) && + !strings.EqualFold(errMsg, pb.ServerError_MetadataError.String()) { + // when topic is deleted, we should give up reconnection. + logger.WithError(err).Error("Failed to create producer") + return nil, err + } + logger.WithError(err).Error("Failed to create producer, it will be retried later!") + } else { + p.log = p.log.SubLogger(log.Fields{ + "producer_name": p.producerName, + "producerID": p.producerID, + }) - p.log.WithField("cnx", p.cnx.ID()).Info("Created producer") - p.setProducerState(producerReady) + p.log.WithField("cnx", p.cnx.ID()).Info("Created producer") + p.setProducerState(producerReady) - if p.options.SendTimeout > 0 { - go p.failTimeoutMessages() + if p.options.SendTimeout > 0 { + go p.failTimeoutMessages() + } } - go p.runEventsLoop() return p, nil