Skip to content
79 changes: 66 additions & 13 deletions core/sdk/src/clients/consumer.rs

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flag is per-consumer, not per-partition. With consumer groups assigned
multiple partitions, an InvalidOffset on partition A would force
PollingStrategy::first() on partition B on the next poll, potentially
re-reading from the start of an unrelated partition. I think this should be
a DashMap<u32, AtomicBool> keyed by partition_id instead?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed: Arc<AtomicBool> replaced with Arc<DashMap<u32, ()>> keyed by partition_id. InvalidOffset on partition A no longer touches other partitions. For auto-assign consumers (partition_id = None) the sentinel u32::MAX is used. The PR has been rebased and now contains only this single-file fix in core/sdk/src/clients/consumer.rs.

Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ pub struct IggyConsumer {
topic_id: Arc<Identifier>,
partition_id: Option<u32>,
polling_strategy: PollingStrategy,
// Per-partition recovery set: when a partition's stored offset falls below
// its earliest available offset, its ID is inserted here so the next poll
// for that partition uses PollingStrategy::last(). Removed after the first
// successful recovery poll. Keyed by partition_id; u32::MAX is the sentinel
// for consumers with no fixed partition (consumer-group auto-assign).
fallback_to_last: Arc<DashMap<u32, ()>>,
poll_interval_micros: u64,
batch_length: u32,
auto_commit: AutoCommit,
Expand Down Expand Up @@ -170,6 +176,7 @@ impl IggyConsumer {
topic_id: Arc::new(topic_id),
partition_id,
polling_strategy,
fallback_to_last: Arc::new(DashMap::new()),
poll_interval_micros: polling_interval.map_or(0, |interval| interval.as_micros()),
last_stored_offsets: Arc::new(DashMap::new()),
last_consumed_offsets: Arc::new(DashMap::new()),
Expand Down Expand Up @@ -521,7 +528,7 @@ impl IggyConsumer {
self.consumer
);

Self::initialize_consumer_group(
let newly_created = Self::initialize_consumer_group(
self.client.clone(),
self.create_consumer_group_if_not_exists,
self.stream_id.clone(),
Expand All @@ -530,7 +537,17 @@ impl IggyConsumer {
&self.consumer_name,
self.joined_consumer_group.clone(),
)
.await
.await?;

// A brand-new consumer group starts at stored offset 0. If the topic has
// had retention run, offset 0 may no longer exist. Pre-arm the fallback
// so the very first poll uses PollingStrategy::last() instead of Next,
// avoiding a guaranteed InvalidOffset error on startup.
if newly_created {
self.fallback_to_last.insert(u32::MAX, ());
}

Ok(())
}

async fn subscribe_events(&self) {
Expand All @@ -551,6 +568,7 @@ impl IggyConsumer {
let consumer_name = self.consumer_name.clone();
let can_poll = self.can_poll.clone();
let joined_consumer_group = self.joined_consumer_group.clone();
let fallback_to_last = self.fallback_to_last.clone();
let mut reconnected = false;
let mut disconnected = false;

Expand Down Expand Up @@ -608,7 +626,7 @@ impl IggyConsumer {
info!(
"Rejoining consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}..."
);
if let Err(error) = Self::initialize_consumer_group(
match Self::initialize_consumer_group(
client.clone(),
create_consumer_group_if_not_exists,
stream_id.clone(),
Expand All @@ -619,10 +637,17 @@ impl IggyConsumer {
)
.await
{
error!(
"Failed to join consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}. {error}"
);
continue;
Err(error) => {
error!(
"Failed to join consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}. {error}"
);
continue;
}
Ok(newly_created) => {
if newly_created {
fallback_to_last.insert(u32::MAX, ());
}
}
}
info!(
"Rejoined consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}"
Expand All @@ -645,7 +670,8 @@ impl IggyConsumer {
let topic_id = self.topic_id.clone();
let partition_id = self.partition_id;
let consumer = self.consumer.clone();
let polling_strategy = self.polling_strategy;
let configured_strategy = self.polling_strategy;
let fallback_to_last = self.fallback_to_last.clone();
let client = self.client.clone();
let count = self.batch_length;
let auto_commit_after_polling = self.auto_commit_after_polling;
Expand Down Expand Up @@ -676,6 +702,13 @@ impl IggyConsumer {
sleep(retry_interval.get_duration()).await;
}

let effective_pid = partition_id.unwrap_or(u32::MAX);
let polling_strategy = if fallback_to_last.contains_key(&effective_pid) {
PollingStrategy::last()
} else {
configured_strategy
};

trace!("Sending poll messages request");
last_polled_at.store(IggyTimestamp::now().into(), ORDERING);
let polled_messages = client
Expand All @@ -696,6 +729,7 @@ impl IggyConsumer {
if polled_messages.messages.is_empty() {
return Ok(polled_messages);
}
fallback_to_last.remove(&effective_pid);

let partition_id = polled_messages.partition_id;
let consumed_offset;
Expand Down Expand Up @@ -783,6 +817,18 @@ impl IggyConsumer {
let error = polled_messages.unwrap_err();
error!("Failed to poll messages: {error}");

// When the consumer group's stored offset falls below the topic's
// earliest available offset (e.g. after retention removes old
// segments), seek to the most recent message on the next poll
// instead of looping forever at the invalid offset.
if matches!(error, IggyError::InvalidOffset(_)) {
warn!(
"Consumer offset is before the earliest available message in topic: {topic_id}, stream: {stream_id}. \
Falling back to latest offset on next poll."
);
fallback_to_last.insert(effective_pid, ());
}

// Handle connection/auth errors - disable polling until event task re-enables
// it after reconnection and rejoin complete
if matches!(
Expand Down Expand Up @@ -827,6 +873,10 @@ impl IggyConsumer {
sleep(Duration::from_micros(remaining)).await;
}

// Returns true if the consumer group was freshly created (stored offset = 0,
// which may be below the topic's earliest available offset after retention).
// Callers use this to pre-arm fallback_to_last so the very first poll uses
// PollingStrategy::last() and avoids an InvalidOffset error.
async fn initialize_consumer_group(
client: IggyRwLock<ClientWrapper>,
create_consumer_group_if_not_exists: bool,
Expand All @@ -835,9 +885,9 @@ impl IggyConsumer {
consumer: Arc<Consumer>,
consumer_name: &str,
joined_consumer_group: Arc<AtomicBool>,
) -> Result<(), IggyError> {
) -> Result<bool, IggyError> {
if joined_consumer_group.load(ORDERING) {
return Ok(());
return Ok(false);
}

let client = client.read().await;
Expand All @@ -850,7 +900,7 @@ impl IggyConsumer {
trace!(
"Validating consumer group: {consumer_group_id} for topic: {topic_id}, stream: {stream_id}"
);
if client
let newly_created = if client
.get_consumer_group(&stream_id, &topic_id, &consumer_group_id)
.await?
.is_none()
Expand Down Expand Up @@ -880,7 +930,10 @@ impl IggyConsumer {
return Err(error);
}
}
}
true
} else {
false
};

info!(
"Joining consumer group: {consumer_group_id} for topic: {topic_id}, stream: {stream_id}",
Expand All @@ -900,7 +953,7 @@ impl IggyConsumer {
info!(
"Joined consumer group: {consumer_group_id} for topic: {topic_id}, stream: {stream_id}"
);
Ok(())
Ok(newly_created)
}
}

Expand Down
Loading