From 412b1bbd18fe2951117dfc283fccd14ca8426eae Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Sun, 21 Jun 2026 17:24:49 +0300 Subject: [PATCH 1/4] fix(sdk): recover from InvalidOffset by falling back to first available MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a consumer group's stored offset falls below the topic's earliest available offset — for example after the server purges old segments under a retention policy — poll_messages returns IggyError::InvalidOffset. The consumer was retrying at the same invalid offset indefinitely, causing sink connectors to loop on errors and stop delivering messages. Add `fallback_to_first: Arc<AtomicBool>` to IggyConsumer. On InvalidOffset, set the flag and emit a warning. On the next poll, PollingStrategy::first() is used to seek to the earliest available message; the flag is cleared after the first successful non-empty poll so normal next-offset tracking resumes. --- core/sdk/src/clients/consumer.rs | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index 918245953e..941b0b4dda 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -111,6 +111,12 @@ pub struct IggyConsumer { topic_id: Arc, partition_id: Option, polling_strategy: PollingStrategy, + // Set to true when InvalidOffset is received so the next poll uses + // PollingStrategy::first() instead of the configured strategy, recovering + // from the case where the consumer group's stored offset falls below the + // topic's earliest available offset (e.g. after retention purges old + // segments). Cleared after the first successful recovery poll. 
+ fallback_to_first: Arc, poll_interval_micros: u64, batch_length: u32, auto_commit: AutoCommit, @@ -170,6 +176,7 @@ impl IggyConsumer { topic_id: Arc::new(topic_id), partition_id, polling_strategy, + fallback_to_first: Arc::new(AtomicBool::new(false)), poll_interval_micros: polling_interval.map_or(0, |interval| interval.as_micros()), last_stored_offsets: Arc::new(DashMap::new()), last_consumed_offsets: Arc::new(DashMap::new()), @@ -645,7 +652,8 @@ impl IggyConsumer { let topic_id = self.topic_id.clone(); let partition_id = self.partition_id; let consumer = self.consumer.clone(); - let polling_strategy = self.polling_strategy; + let configured_strategy = self.polling_strategy; + let fallback_to_first = self.fallback_to_first.clone(); let client = self.client.clone(); let count = self.batch_length; let auto_commit_after_polling = self.auto_commit_after_polling; @@ -676,6 +684,12 @@ impl IggyConsumer { sleep(retry_interval.get_duration()).await; } + let polling_strategy = if fallback_to_first.load(ORDERING) { + PollingStrategy::first() + } else { + configured_strategy + }; + trace!("Sending poll messages request"); last_polled_at.store(IggyTimestamp::now().into(), ORDERING); let polled_messages = client @@ -696,6 +710,7 @@ impl IggyConsumer { if polled_messages.messages.is_empty() { return Ok(polled_messages); } + fallback_to_first.store(false, ORDERING); let partition_id = polled_messages.partition_id; let consumed_offset; @@ -783,6 +798,18 @@ impl IggyConsumer { let error = polled_messages.unwrap_err(); error!("Failed to poll messages: {error}"); + // When the consumer group's stored offset falls below the topic's + // earliest available offset (e.g. after retention removes old + // segments), seek to the first available message on the next poll + // instead of looping forever at the invalid offset. + if matches!(error, IggyError::InvalidOffset(_)) { + warn!( + "Consumer offset is before the earliest available message in topic: {topic_id}, stream: {stream_id}. 
\ + Falling back to first available offset on next poll." + ); + fallback_to_first.store(true, ORDERING); + } + // Handle connection/auth errors - disable polling until event task re-enables // it after reconnection and rejoin complete if matches!( From abd030597f29d15d5549a193278aed7661dc003f Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Mon, 22 Jun 2026 11:15:13 +0300 Subject: [PATCH 2/4] fix(sdk): track InvalidOffset fallback per partition, not per consumer A single IggyConsumer can be assigned multiple partitions by the consumer group. The previous AtomicBool was global: an InvalidOffset on partition A would force PollingStrategy::first() on all partitions (including B, C, ...) on the next poll, causing unnecessary rewinds. Replace with DashMap<u32, ()> keyed by partition_id so only the affected partition triggers a first()-recovery. u32::MAX is the sentinel for auto-assign consumers (partition_id = None) where the server determines the assigned partition; behavior for those is unchanged from before. Co-Authored-By: Claude Sonnet 4.6 Claude-Session: https://claude.ai/code/session_01Qb1ctTeXahLw5EWWHP69gK --- core/sdk/src/clients/consumer.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index 941b0b4dda..ff391b7204 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -111,12 +111,12 @@ pub struct IggyConsumer { topic_id: Arc, partition_id: Option, polling_strategy: PollingStrategy, - // Set to true when InvalidOffset is received so the next poll uses - // PollingStrategy::first() instead of the configured strategy, recovering - // from the case where the consumer group's stored offset falls below the - // topic's earliest available offset (e.g. after retention purges old - // segments). Cleared after the first successful recovery poll. 
- fallback_to_first: Arc, + // Per-partition recovery set: when a partition's stored offset falls below + // its earliest available offset, its ID is inserted here so the next poll + // for that partition uses PollingStrategy::first(). Removed after the first + // successful recovery poll. Keyed by partition_id; u32::MAX is the sentinel + // for consumers with no fixed partition (consumer-group auto-assign). + fallback_to_first: Arc>, poll_interval_micros: u64, batch_length: u32, auto_commit: AutoCommit, @@ -176,7 +176,7 @@ impl IggyConsumer { topic_id: Arc::new(topic_id), partition_id, polling_strategy, - fallback_to_first: Arc::new(AtomicBool::new(false)), + fallback_to_first: Arc::new(DashMap::new()), poll_interval_micros: polling_interval.map_or(0, |interval| interval.as_micros()), last_stored_offsets: Arc::new(DashMap::new()), last_consumed_offsets: Arc::new(DashMap::new()), @@ -684,7 +684,8 @@ impl IggyConsumer { sleep(retry_interval.get_duration()).await; } - let polling_strategy = if fallback_to_first.load(ORDERING) { + let effective_pid = partition_id.unwrap_or(u32::MAX); + let polling_strategy = if fallback_to_first.contains_key(&effective_pid) { PollingStrategy::first() } else { configured_strategy @@ -710,7 +711,7 @@ impl IggyConsumer { if polled_messages.messages.is_empty() { return Ok(polled_messages); } - fallback_to_first.store(false, ORDERING); + fallback_to_first.remove(&effective_pid); let partition_id = polled_messages.partition_id; let consumed_offset; @@ -807,7 +808,7 @@ impl IggyConsumer { "Consumer offset is before the earliest available message in topic: {topic_id}, stream: {stream_id}. \ Falling back to first available offset on next poll." 
); - fallback_to_first.store(true, ORDERING); + fallback_to_first.insert(effective_pid, ()); } // Handle connection/auth errors - disable polling until event task re-enables From e2bcc2be58b0c72c624f95ed8b035b66bfb5af1b Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Mon, 22 Jun 2026 15:06:51 +0300 Subject: [PATCH 3/4] fix(sdk/consumer): pre-arm first-poll fallback for new consumer groups When a consumer group is freshly created its server-side stored offset is 0. If the topic has had retention run, offset 0 is no longer valid and the very first poll with PollingStrategy::Next returns InvalidOffset. The existing fallback (PR #3525) detected this on the first failed poll and recovered on the second. This meant every new consumer group always logged one InvalidOffset error on startup before settling. Fix: initialize_consumer_group now returns a bool indicating whether the group was newly created. Callers set fallback_to_first = true in that case so the very first poll uses PollingStrategy::First (earliest available) instead of Next, eliminating the guaranteed startup error. Rejoin paths (subscribe_events) receive the same treatment so a group created during a reconnect also starts clean. 
Co-Authored-By: Claude Sonnet 4.6 Claude-Session: https://claude.ai/code/session_01Qb1ctTeXahLw5EWWHP69gK --- core/sdk/src/clients/consumer.rs | 49 ++++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index ff391b7204..5f326dfe0d 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -528,7 +528,7 @@ impl IggyConsumer { self.consumer ); - Self::initialize_consumer_group( + let newly_created = Self::initialize_consumer_group( self.client.clone(), self.create_consumer_group_if_not_exists, self.stream_id.clone(), @@ -537,7 +537,17 @@ impl IggyConsumer { &self.consumer_name, self.joined_consumer_group.clone(), ) - .await + .await?; + + // A brand-new consumer group starts at stored offset 0. If the topic has + // had retention run, offset 0 may no longer exist. Pre-arm the fallback + // so the very first poll uses PollingStrategy::first() instead of Next, + // avoiding a guaranteed InvalidOffset error on startup. + if newly_created { + self.fallback_to_first.store(true, ORDERING); + } + + Ok(()) } async fn subscribe_events(&self) { @@ -558,6 +568,7 @@ impl IggyConsumer { let consumer_name = self.consumer_name.clone(); let can_poll = self.can_poll.clone(); let joined_consumer_group = self.joined_consumer_group.clone(); + let fallback_to_first = self.fallback_to_first.clone(); let mut reconnected = false; let mut disconnected = false; @@ -615,7 +626,7 @@ impl IggyConsumer { info!( "Rejoining consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}..." ); - if let Err(error) = Self::initialize_consumer_group( + match Self::initialize_consumer_group( client.clone(), create_consumer_group_if_not_exists, stream_id.clone(), @@ -626,10 +637,17 @@ impl IggyConsumer { ) .await { - error!( - "Failed to join consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}. 
{error}" - ); - continue; + Err(error) => { + error!( + "Failed to join consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}. {error}" + ); + continue; + } + Ok(newly_created) => { + if newly_created { + fallback_to_first.store(true, ORDERING); + } + } } info!( "Rejoined consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}" @@ -855,6 +873,10 @@ impl IggyConsumer { sleep(Duration::from_micros(remaining)).await; } + // Returns true if the consumer group was freshly created (stored offset = 0, + // which may be below the topic's earliest available offset after retention). + // Callers use this to pre-arm fallback_to_first so the very first poll uses + // PollingStrategy::first() and avoids an InvalidOffset error. async fn initialize_consumer_group( client: IggyRwLock, create_consumer_group_if_not_exists: bool, @@ -863,9 +885,9 @@ impl IggyConsumer { consumer: Arc, consumer_name: &str, joined_consumer_group: Arc, - ) -> Result<(), IggyError> { + ) -> Result { if joined_consumer_group.load(ORDERING) { - return Ok(()); + return Ok(false); } let client = client.read().await; @@ -878,7 +900,7 @@ impl IggyConsumer { trace!( "Validating consumer group: {consumer_group_id} for topic: {topic_id}, stream: {stream_id}" ); - if client + let newly_created = if client .get_consumer_group(&stream_id, &topic_id, &consumer_group_id) .await? 
.is_none() @@ -908,7 +930,10 @@ impl IggyConsumer { return Err(error); } } - } + true + } else { + false + }; info!( "Joining consumer group: {consumer_group_id} for topic: {topic_id}, stream: {stream_id}", @@ -928,7 +953,7 @@ impl IggyConsumer { info!( "Joined consumer group: {consumer_group_id} for topic: {topic_id}, stream: {stream_id}" ); - Ok(()) + Ok(newly_created) } } From e72898b380d3f6b24bcc07e695ddbd032c38b08c Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Mon, 22 Jun 2026 18:46:12 +0300 Subject: [PATCH 4/4] fix(sdk/consumer): use last() instead of first() for InvalidOffset recovery PollingStrategy::first() maps to PollingKind::First on the server, which returned InvalidOffset(0) via the TCP binary protocol even when offset 0 exists (HTTP path works). Root cause unclear -- likely a server-side auto_commit + TCP handler interaction. PollingStrategy::last() always succeeds since the most recently written message is always in an active segment. After the recovery poll commits the latest offset, subsequent polls continue from there normally. TBD: investigate why first() returns InvalidOffset(0) over TCP while HTTP works fine. Once resolved, prefer first() to avoid skipping history. Co-Authored-By: Claude Sonnet 4.6 --- core/sdk/src/clients/consumer.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index 5f326dfe0d..2b5aba0a98 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -113,10 +113,10 @@ pub struct IggyConsumer { polling_strategy: PollingStrategy, // Per-partition recovery set: when a partition's stored offset falls below // its earliest available offset, its ID is inserted here so the next poll - // for that partition uses PollingStrategy::first(). Removed after the first + // for that partition uses PollingStrategy::last(). Removed after the first // successful recovery poll. 
Keyed by partition_id; u32::MAX is the sentinel // for consumers with no fixed partition (consumer-group auto-assign). - fallback_to_first: Arc>, + fallback_to_last: Arc>, poll_interval_micros: u64, batch_length: u32, auto_commit: AutoCommit, @@ -176,7 +176,7 @@ impl IggyConsumer { topic_id: Arc::new(topic_id), partition_id, polling_strategy, - fallback_to_first: Arc::new(DashMap::new()), + fallback_to_last: Arc::new(DashMap::new()), poll_interval_micros: polling_interval.map_or(0, |interval| interval.as_micros()), last_stored_offsets: Arc::new(DashMap::new()), last_consumed_offsets: Arc::new(DashMap::new()), @@ -541,10 +541,10 @@ impl IggyConsumer { // A brand-new consumer group starts at stored offset 0. If the topic has // had retention run, offset 0 may no longer exist. Pre-arm the fallback - // so the very first poll uses PollingStrategy::first() instead of Next, + // so the very first poll uses PollingStrategy::last() instead of Next, // avoiding a guaranteed InvalidOffset error on startup. 
if newly_created { - self.fallback_to_first.store(true, ORDERING); + self.fallback_to_last.insert(u32::MAX, ()); } Ok(()) @@ -568,7 +568,7 @@ impl IggyConsumer { let consumer_name = self.consumer_name.clone(); let can_poll = self.can_poll.clone(); let joined_consumer_group = self.joined_consumer_group.clone(); - let fallback_to_first = self.fallback_to_first.clone(); + let fallback_to_last = self.fallback_to_last.clone(); let mut reconnected = false; let mut disconnected = false; @@ -645,7 +645,7 @@ impl IggyConsumer { } Ok(newly_created) => { if newly_created { - fallback_to_first.store(true, ORDERING); + fallback_to_last.insert(u32::MAX, ()); } } } @@ -671,7 +671,7 @@ impl IggyConsumer { let partition_id = self.partition_id; let consumer = self.consumer.clone(); let configured_strategy = self.polling_strategy; - let fallback_to_first = self.fallback_to_first.clone(); + let fallback_to_last = self.fallback_to_last.clone(); let client = self.client.clone(); let count = self.batch_length; let auto_commit_after_polling = self.auto_commit_after_polling; @@ -703,8 +703,8 @@ impl IggyConsumer { } let effective_pid = partition_id.unwrap_or(u32::MAX); - let polling_strategy = if fallback_to_first.contains_key(&effective_pid) { - PollingStrategy::first() + let polling_strategy = if fallback_to_last.contains_key(&effective_pid) { + PollingStrategy::last() } else { configured_strategy }; @@ -729,7 +729,7 @@ impl IggyConsumer { if polled_messages.messages.is_empty() { return Ok(polled_messages); } - fallback_to_first.remove(&effective_pid); + fallback_to_last.remove(&effective_pid); let partition_id = polled_messages.partition_id; let consumed_offset; @@ -819,14 +819,14 @@ impl IggyConsumer { // When the consumer group's stored offset falls below the topic's // earliest available offset (e.g. 
after retention removes old - // segments), seek to the first available message on the next poll + // segments), seek to the most recent message on the next poll // instead of looping forever at the invalid offset. if matches!(error, IggyError::InvalidOffset(_)) { warn!( "Consumer offset is before the earliest available message in topic: {topic_id}, stream: {stream_id}. \ - Falling back to first available offset on next poll." + Falling back to latest offset on next poll." ); - fallback_to_first.insert(effective_pid, ()); + fallback_to_last.insert(effective_pid, ()); } // Handle connection/auth errors - disable polling until event task re-enables @@ -875,8 +875,8 @@ impl IggyConsumer { // Returns true if the consumer group was freshly created (stored offset = 0, // which may be below the topic's earliest available offset after retention). - // Callers use this to pre-arm fallback_to_first so the very first poll uses - // PollingStrategy::first() and avoids an InvalidOffset error. + // Callers use this to pre-arm fallback_to_last so the very first poll uses + // PollingStrategy::last() and avoids an InvalidOffset error. async fn initialize_consumer_group( client: IggyRwLock, create_consumer_group_if_not_exists: bool,