diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index 918245953e..2b5aba0a98 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -111,6 +111,12 @@ pub struct IggyConsumer { topic_id: Arc, partition_id: Option<u32>, polling_strategy: PollingStrategy, + // Per-partition recovery set: when a partition's stored offset falls below + // its earliest available offset, its ID is inserted here so the next poll + // for that partition uses PollingStrategy::last(). Removed after the first + // successful recovery poll. Keyed by partition_id; u32::MAX is the sentinel + // for consumers with no fixed partition (consumer-group auto-assign). + fallback_to_last: Arc<DashMap<u32, ()>>, poll_interval_micros: u64, batch_length: u32, auto_commit: AutoCommit, @@ -170,6 +176,7 @@ impl IggyConsumer { topic_id: Arc::new(topic_id), partition_id, polling_strategy, + fallback_to_last: Arc::new(DashMap::new()), poll_interval_micros: polling_interval.map_or(0, |interval| interval.as_micros()), last_stored_offsets: Arc::new(DashMap::new()), last_consumed_offsets: Arc::new(DashMap::new()), @@ -521,7 +528,7 @@ impl IggyConsumer { self.consumer ); - Self::initialize_consumer_group( + let newly_created = Self::initialize_consumer_group( self.client.clone(), self.create_consumer_group_if_not_exists, self.stream_id.clone(), @@ -530,7 +537,17 @@ impl IggyConsumer { &self.consumer_name, self.joined_consumer_group.clone(), ) - .await + .await?; + + // A brand-new consumer group starts at stored offset 0. If the topic has + // had retention run, offset 0 may no longer exist. Pre-arm the fallback + // so the very first poll uses PollingStrategy::last() instead of Next, + // avoiding a guaranteed InvalidOffset error on startup. 
+ if newly_created { + self.fallback_to_last.insert(u32::MAX, ()); + } + + Ok(()) } async fn subscribe_events(&self) { @@ -551,6 +568,7 @@ impl IggyConsumer { let consumer_name = self.consumer_name.clone(); let can_poll = self.can_poll.clone(); let joined_consumer_group = self.joined_consumer_group.clone(); + let fallback_to_last = self.fallback_to_last.clone(); let mut reconnected = false; let mut disconnected = false; @@ -608,7 +626,7 @@ impl IggyConsumer { info!( "Rejoining consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}..." ); - if let Err(error) = Self::initialize_consumer_group( + match Self::initialize_consumer_group( client.clone(), create_consumer_group_if_not_exists, stream_id.clone(), @@ -619,10 +637,17 @@ impl IggyConsumer { ) .await { - error!( - "Failed to join consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}. {error}" - ); - continue; + Err(error) => { + error!( + "Failed to join consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}. 
{error}" + ); + continue; + } + Ok(newly_created) => { + if newly_created { + fallback_to_last.insert(u32::MAX, ()); + } + } } info!( "Rejoined consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}" @@ -645,7 +670,8 @@ impl IggyConsumer { let topic_id = self.topic_id.clone(); let partition_id = self.partition_id; let consumer = self.consumer.clone(); - let polling_strategy = self.polling_strategy; + let configured_strategy = self.polling_strategy; + let fallback_to_last = self.fallback_to_last.clone(); let client = self.client.clone(); let count = self.batch_length; let auto_commit_after_polling = self.auto_commit_after_polling; @@ -676,6 +702,13 @@ impl IggyConsumer { sleep(retry_interval.get_duration()).await; } + let effective_pid = partition_id.unwrap_or(u32::MAX); + let polling_strategy = if fallback_to_last.contains_key(&effective_pid) { + PollingStrategy::last() + } else { + configured_strategy + }; + trace!("Sending poll messages request"); last_polled_at.store(IggyTimestamp::now().into(), ORDERING); let polled_messages = client @@ -696,6 +729,7 @@ impl IggyConsumer { if polled_messages.messages.is_empty() { return Ok(polled_messages); } + fallback_to_last.remove(&effective_pid); let partition_id = polled_messages.partition_id; let consumed_offset; @@ -783,6 +817,18 @@ impl IggyConsumer { let error = polled_messages.unwrap_err(); error!("Failed to poll messages: {error}"); + // When the consumer group's stored offset falls below the topic's + // earliest available offset (e.g. after retention removes old + // segments), seek to the most recent message on the next poll + // instead of looping forever at the invalid offset. + if matches!(error, IggyError::InvalidOffset(_)) { + warn!( + "Consumer offset is before the earliest available message in topic: {topic_id}, stream: {stream_id}. \ + Falling back to latest offset on next poll." 
+ ); + fallback_to_last.insert(effective_pid, ()); + } + // Handle connection/auth errors - disable polling until event task re-enables // it after reconnection and rejoin complete if matches!( @@ -827,6 +873,10 @@ impl IggyConsumer { sleep(Duration::from_micros(remaining)).await; } + // Returns true if the consumer group was freshly created (stored offset = 0, + // which may be below the topic's earliest available offset after retention). + // Callers use this to pre-arm fallback_to_last so the very first poll uses + // PollingStrategy::last() and avoids an InvalidOffset error. async fn initialize_consumer_group( client: IggyRwLock, create_consumer_group_if_not_exists: bool, @@ -835,9 +885,9 @@ impl IggyConsumer { consumer: Arc, consumer_name: &str, joined_consumer_group: Arc, - ) -> Result<(), IggyError> { + ) -> Result<bool, IggyError> { if joined_consumer_group.load(ORDERING) { - return Ok(()); + return Ok(false); } let client = client.read().await; @@ -850,7 +900,7 @@ impl IggyConsumer { trace!( "Validating consumer group: {consumer_group_id} for topic: {topic_id}, stream: {stream_id}" ); - if client + let newly_created = if client .get_consumer_group(&stream_id, &topic_id, &consumer_group_id) .await? .is_none() @@ -880,7 +930,10 @@ impl IggyConsumer { return Err(error); } } - } + true + } else { + false + }; info!( "Joining consumer group: {consumer_group_id} for topic: {topic_id}, stream: {stream_id}", @@ -900,7 +953,7 @@ impl IggyConsumer { info!( "Joined consumer group: {consumer_group_id} for topic: {topic_id}, stream: {stream_id}" ); - Ok(()) + Ok(newly_created) } }