diff --git a/Cargo.lock b/Cargo.lock
index be96205a9e..acd1998d82 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9283,6 +9283,7 @@ dependencies = [
  "iggy_common",
  "journal",
  "message_bus",
+ "papaya",
  "ringbuffer",
  "server_common",
  "smallvec",
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index ed2040e536..98eab35a89 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -69,6 +69,8 @@ pub mod snapshot_scenario;
 pub mod stale_client_consumer_group_scenario;
 #[cfg(not(feature = "vsr"))]
 pub mod stream_size_validation_scenario;
+#[cfg(feature = "vsr")]
+pub mod stress_produce_consume_scenario;
 #[cfg(not(feature = "vsr"))]
 pub mod system_scenario;
 #[cfg(not(feature = "vsr"))]
diff --git a/core/integration/tests/server/scenarios/stress_produce_consume_scenario.rs b/core/integration/tests/server/scenarios/stress_produce_consume_scenario.rs
new file mode 100644
index 0000000000..507df4300b
--- /dev/null
+++ b/core/integration/tests/server/scenarios/stress_produce_consume_scenario.rs
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Data-plane produce+consume stress across server-ng topologies: `PRODUCERS`
+//! producers and `CONSUMERS` consumers all hammer a SINGLE partition, asserting
+//! no message loss and a strictly contiguous offset log.
+//!
+//! Targets the partition-ref-across-await UB fix: the consume poll path and the
+//! produce/commit pump run as sibling tasks over the same partition, so
+//! concentrating every producer and consumer on one partition maximizes the
+//! `&`/`&mut` aliasing window on that partition's pump that the fix closes. All
+//! producers run concurrently with all consumers for `HAMMER_DURATION`; the
+//! consumers then drain. A single partition lives on a single shard, so the
+//! multi-shard variants still spin up N shards but concentrate the load on the
+//! one owning shard.
+//!
+//! Oracle: producers interleave on the partition's shared offset sequence, so
+//! per-producer contiguity does not hold. Instead every consumer reads the
+//! partition in full and must observe a contiguous `0..total` (no gap = no loss,
+//! no dup) with a count equal to the sum of all producers' sends.
+//!
+//! Strictly data plane: polls by explicit offset with `auto_commit = false` and
+//! performs no mid-run topic/partition mutation, so it never drives the metadata
+//! consensus plane concurrently. That avoids a separate, still-open `on_ack`
+//! journal-durability race that panics the primary under concurrent metadata ops
+//! (see the gated `concurrent_produce_consume_scenario` in `scenarios/mod.rs`).
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use integration::harness::TestHarness;
+use integration::iggy_harness;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::{Duration, Instant};
+
+const STREAM_NAME: &str = "stress-pc-stream";
+const TOPIC_NAME: &str = "stress-pc-topic";
+// All traffic targets this one partition to concentrate produce+consume
+// contention on a single partition pump.
+const PARTITION_ID: u32 = 0;
+// Number of concurrent producer / consumer tasks, each on its own TCP client.
+const PRODUCERS: u32 = 4;
+const CONSUMERS: u32 = 4;
+// Messages built and sent per `send_messages` call.
+const PRODUCER_BATCH: u32 = 16;
+// Count passed to each `poll_messages` call.
+const CONSUMER_BATCH: u32 = 64;
+// How long each producer keeps sending before it stops.
+const HAMMER_DURATION: Duration = Duration::from_secs(20);
+// Safety net so a wedged consumer fails loudly instead of hanging the suite.
+const MAX_TEST_DURATION: Duration = Duration::from_secs(120);
+// Whole-test wall-clock guard. A server that dies at boot leaves the harness
+// client retrying connect with no cap, and a parked poll never re-checks
+// MAX_TEST_DURATION, so without this the suite hangs indefinitely instead of
+// failing. Set above MAX_TEST_DURATION so a slow-but-progressing consumer
+// trips its own informative deadline first.
+const WALL_CLOCK_TIMEOUT: Duration = Duration::from_secs(150);
+// Empty polls observed after producers stop before the partition is declared drained.
+const DRAIN_EMPTY_POLLS: u32 = 20;
+
+/// Single-node, single shard (`"1"`) and multi shard (`"2"`).
+#[iggy_harness(
+    cluster_nodes = 1,
+    server(system.sharding.cpu_allocation = ["1", "2"])
+)]
+async fn given_single_node_when_produce_consume_hammered_should_not_lose_messages(
+    harness: &TestHarness,
+) {
+    run_hammer(harness).await;
+}
+
+/// Three-node cluster, single shard (`"1"`) and multi shard (`"2"`) per node.
+/// Heavy (3 servers * N shards); run on demand with `--ignored`.
+#[iggy_harness(
+    cluster_nodes = 3,
+    server(system.sharding.cpu_allocation = ["1", "2"])
+)]
+#[ignore = "3-node cluster: heavy, run on demand with --ignored"]
+async fn given_cluster_when_produce_consume_hammered_should_not_lose_messages(
+    harness: &TestHarness,
+) {
+    run_hammer(harness).await;
+}
+
+/// Run the hammer workload under the whole-test `WALL_CLOCK_TIMEOUT` guard,
+/// panicking if the workload does not finish within it.
+async fn run_hammer(harness: &TestHarness) {
+    tokio::time::timeout(WALL_CLOCK_TIMEOUT, run_hammer_inner(harness))
+        .await
+        .expect("stress test exceeded WALL_CLOCK_TIMEOUT; server likely crashed at boot or a poll wedged");
+}
+
+/// Create the stream and topic, run `CONSUMERS` consumer tasks against
+/// `PRODUCERS` producer tasks, assert every consumer received exactly the
+/// produced total, then delete the stream.
+async fn run_hammer_inner(harness: &TestHarness) {
+    let stream_id = Identifier::named(STREAM_NAME).unwrap();
+
+    let setup = harness.tcp_root_client().await.unwrap();
+    setup.create_stream(STREAM_NAME).await.unwrap();
+    setup
+        .create_topic(
+            &stream_id,
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    drop(setup);
+
+    let producer_done = Arc::new(AtomicBool::new(false));
+
+    // Spawn consumers first so they poll concurrently with the producers from
+    // the very first send. Each reads the whole partition independently by
+    // explicit offset and asserts strict contiguity (no gap = no loss, no dup).
+    let mut consumers = Vec::with_capacity(CONSUMERS as usize);
+    for consumer_id in 0..CONSUMERS {
+        let client = harness.tcp_root_client().await.unwrap();
+        let done = producer_done.clone();
+        consumers.push(tokio::spawn(consume_partition(client, consumer_id, done)));
+    }
+
+    // All producers hammer the single partition for HAMMER_DURATION.
+    let mut producers = Vec::with_capacity(PRODUCERS as usize);
+    for producer_id in 0..PRODUCERS {
+        let client = harness.tcp_root_client().await.unwrap();
+        producers.push(tokio::spawn(produce_partition(client, producer_id)));
+    }
+
+    // Producers stop at the hammer deadline; sum their sends, then signal
+    // consumers to drain.
+    let mut total_sent = 0u64;
+    for handle in producers {
+        total_sent += handle.await.expect("producer task panicked");
+    }
+    // Relaxed suffices: the flag carries no data of its own; consumers observe
+    // the messages themselves through their polls.
+    producer_done.store(true, Ordering::Relaxed);
+
+    assert!(
+        total_sent > 0,
+        "hammer produced no messages; workload wiring is broken"
+    );
+
+    // Every consumer independently read the full partition; each must have seen
+    // exactly the committed total, contiguously (asserted inside the task).
+    for (consumer_id, handle) in consumers.into_iter().enumerate() {
+        let received = handle.await.expect("consumer task panicked");
+        assert_eq!(
+            received, total_sent,
+            "consumer {consumer_id}: consumed {received} != produced {total_sent} (message loss)",
+        );
+    }
+
+    let cleanup = harness.tcp_root_client().await.unwrap();
+    cleanup.delete_stream(&stream_id).await.unwrap();
+}
+
+/// Send `PRODUCER_BATCH`-sized batches to the shared partition until the hammer
+/// deadline. Returns the number of messages sent (each send awaits commit).
+async fn produce_partition(client: IggyClient, producer_id: u32) -> u64 {
+    let stream = Identifier::named(STREAM_NAME).unwrap();
+    let topic = Identifier::named(TOPIC_NAME).unwrap();
+    let partitioning = Partitioning::partition_id(PARTITION_ID);
+    let deadline = Instant::now() + HAMMER_DURATION;
+    let mut sent = 0u64;
+
+    while Instant::now() < deadline {
+        let mut messages: Vec<IggyMessage> = (0..PRODUCER_BATCH)
+            .map(|i| {
+                IggyMessage::builder()
+                    .payload(Bytes::from(format!(
+                        "prod{producer_id}-{}",
+                        sent + u64::from(i)
+                    )))
+                    .build()
+                    .unwrap()
+            })
+            .collect();
+        client
+            .send_messages(&stream, &topic, &partitioning, &mut messages)
+            .await
+            .unwrap_or_else(|e| panic!("producer {producer_id} send failed at sent={sent}: {e}"));
+        sent += u64::from(PRODUCER_BATCH);
+    }
+    sent
+}
+
+/// Read the shared partition in full by explicit offset with `auto_commit =
+/// false`, asserting each message arrives at the next contiguous offset. Drains
+/// until producers are done and `DRAIN_EMPTY_POLLS` consecutive empty polls
+/// confirm the tail. Returns the number of messages received.
+async fn consume_partition(
+    client: IggyClient,
+    consumer_id: u32,
+    producer_done: Arc<AtomicBool>,
+) -> u64 {
+    let stream = Identifier::named(STREAM_NAME).unwrap();
+    let topic = Identifier::named(TOPIC_NAME).unwrap();
+    let consumer = Consumer::default();
+    let mut next_offset = 0u64;
+    let mut received = 0u64;
+    let mut consecutive_empty = 0u32;
+    let deadline = Instant::now() + MAX_TEST_DURATION;
+
+    loop {
+        assert!(
+            Instant::now() < deadline,
+            "consumer {consumer_id} timed out: received {received}, next_offset {next_offset}"
+        );
+
+        let polled = match client
+            .poll_messages(
+                &stream,
+                &topic,
+                Some(PARTITION_ID),
+                &consumer,
+                &PollingStrategy::offset(next_offset),
+                CONSUMER_BATCH,
+                false,
+            )
+            .await
+        {
+            Ok(polled) => polled,
+            Err(e) => {
+                // Transient under load; back off and retry.
+                eprintln!("consumer {consumer_id} poll error: {e:?}");
+                tokio::time::sleep(Duration::from_millis(10)).await;
+                continue;
+            }
+        };
+
+        if polled.messages.is_empty() {
+            if producer_done.load(Ordering::Relaxed) {
+                consecutive_empty += 1;
+                if consecutive_empty >= DRAIN_EMPTY_POLLS {
+                    break;
+                }
+            }
+            tokio::time::sleep(Duration::from_millis(5)).await;
+            continue;
+        }
+
+        consecutive_empty = 0;
+        for msg in &polled.messages {
+            assert_eq!(
+                msg.header.offset, next_offset,
+                "consumer {consumer_id} offset gap/dup: expected {next_offset}, got {}",
+                msg.header.offset
+            );
+            next_offset += 1;
+            received += 1;
+        }
+    }
+
+    received
+}
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
index 940fef1462..2f40437576 100644
--- a/core/partitions/Cargo.toml
+++ b/core/partitions/Cargo.toml
@@ -38,6 +38,7 @@ iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
 journal = { workspace = true }
 message_bus = { workspace = true }
+papaya = { workspace = true }
 ringbuffer = { workspace = true }
 server_common = { workspace = true }
 smallvec = { workspace = true }
diff --git
a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 3250f83110..ec73e1d75e 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -16,21 +16,20 @@ // under the License. use crate::iggy_index_writer::IggyIndexWriter; -use crate::journal::{ - MessageLookup, PartitionJournal, PartitionJournalMemStorage, QueryableJournal, - select_batch_slice, -}; +use crate::journal::{MessageLookup, PartitionJournal, PartitionJournalMemStorage}; use crate::log::JournalInfo; use crate::log::SegmentedLog; use crate::messages_writer::MessagesWriter; use crate::offset_storage::{delete_persisted_offset, persist_offset}; +use crate::poll_plan::{ + AutoCommitCtx, AutoCommitTarget, DiskReadPlan, DiskSegment, LastPolledCtx, PollPlan, PollTier, + ResidentTailSnapshot, +}; use crate::segment::Segment; -use crate::types::Fragment; use crate::{ - AppendResult, Partition, PartitionOffsets, PartitionsConfig, PollFragments, PollQueryResult, - PollingArgs, PollingConsumer, + AppendResult, Partition, PartitionOffsets, PartitionsConfig, PollQueryResult, PollingArgs, + PollingConsumer, }; -use compio::io::AsyncReadAtExt; use consensus::{ CommitLogEvent, Consensus, PartitionDiagEvent, Pipeline, PipelineEntry, PlaneKind, Project, ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, ack_preflight, @@ -53,10 +52,9 @@ use journal::Journal as _; use message_bus::{IggyMessageBus, MessageBus}; use server_common::{ Message, SegmentStorage, - iobuf::{Frozen, Owned}, + iobuf::Frozen, send_messages2::{ - COMMAND_HEADER_SIZE, convert_request_message, decode_batch_slice, decode_prepare_slice, - stamp_prepare_for_persistence, + convert_request_message, decode_prepare_slice, stamp_prepare_for_persistence, }, sharding::IggyNamespace, }; @@ -330,32 +328,24 @@ where if pending.kind == ConsumerKind::Consumer => { let id = pending.consumer_id; - let guard = self.consumer_offsets.pin(); let key = usize::try_from(id).expect("u32 
consumer id must fit usize"); - if let Some(existing) = guard.get(&key) { - existing.offset.store(offset, Ordering::Relaxed); - } else { - let created = self.consumer_offsets_path.as_deref().map_or_else( + crate::poll_plan::upsert_offset(&self.consumer_offsets, key, offset, || { + self.consumer_offsets_path.as_deref().map_or_else( || ConsumerOffset::new(ConsumerKind::Consumer, id, 0, String::new()), |path| ConsumerOffset::default_for_consumer(id, path), - ); - created.offset.store(offset, Ordering::Relaxed); - guard.insert(key, created); - } + ) + }); Ok(()) } PendingConsumerOffsetMutation::Upsert(offset) if pending.kind == ConsumerKind::ConsumerGroup => { let group_id = pending.consumer_id; - let guard = self.consumer_group_offsets.pin(); let key = ConsumerGroupId( usize::try_from(group_id).expect("u32 group id must fit usize"), ); - if let Some(existing) = guard.get(&key) { - existing.offset.store(offset, Ordering::Relaxed); - } else { - let created = self.consumer_group_offsets_path.as_deref().map_or_else( + crate::poll_plan::upsert_offset(&self.consumer_group_offsets, key, offset, || { + self.consumer_group_offsets_path.as_deref().map_or_else( || { ConsumerOffset::new( ConsumerKind::ConsumerGroup, @@ -365,10 +355,8 @@ where ) }, |path| ConsumerOffset::default_for_consumer_group(key, path), - ); - created.offset.store(offset, Ordering::Relaxed); - guard.insert(key, created); - } + ) + }); Ok(()) } PendingConsumerOffsetMutation::Delete if pending.kind == ConsumerKind::Consumer => { @@ -409,32 +397,57 @@ where .collect() } - /// Reclaim a deleted consumer group's offset on this partition (in-memory - /// entry + persisted file). A no-op if the group has no stored offset here. + /// Reclaim every stored consumer-group offset whose group id is no longer + /// `is_live`, returning the owned persisted-file paths the caller must unlink. /// - /// # Errors - /// Returns an I/O error if deleting the persisted offset file fails. 
+ /// Fully synchronous (no `.await`): the in-memory papaya remove happens here, + /// the disk unlink is deferred to the caller on owned `String` data so no + /// borrow of `self` survives across the await. This is the only safe shape + /// for the reconciler, which runs on a sibling task to the pump that may + /// realloc the partitions vec during that await. The remove-then-unlink + /// ordering matches the crash-safe GC invariant (monotonic, never-reused + /// group ids mean a recreated group never reads a dead group's offset). + #[must_use] #[allow(clippy::cast_possible_truncation)] - pub async fn delete_consumer_group_offset(&self, group_id: u64) -> Result<(), IggyError> { - self.consumer_group_offsets - .pin() - .remove(&ConsumerGroupId(group_id as usize)); - if let Some(path) = self.persisted_offset_path(ConsumerKind::ConsumerGroup, group_id as u32) - { - delete_persisted_offset(&path).await?; + pub fn reclaim_dead_group_offsets(&self, is_live: impl Fn(u64) -> bool) -> Vec { + let pinned = self.consumer_group_offsets.pin(); + let dead: Vec = pinned + .keys() + .map(|key| key.0 as u64) + .filter(|group_id| !is_live(*group_id)) + .collect(); + let mut paths = Vec::with_capacity(dead.len()); + for group_id in dead { + pinned.remove(&ConsumerGroupId(group_id as usize)); + if let Some(path) = + self.persisted_offset_path(ConsumerKind::ConsumerGroup, group_id as u32) + { + paths.push(path); + } } - Ok(()) + paths } - async fn store_consumer_offset_and_persist( - &self, - consumer: PollingConsumer, - offset: u64, - ) -> Result<(), IggyError> { - let pending = PendingConsumerOffsetCommit::try_from_polling_consumer(consumer, offset)?; - self.persist_consumer_offset_commit(pending).await?; - self.apply_consumer_offset_commit(pending)?; - Ok(()) + /// Cooperative-rebalance classification: a group's `(last_polled, committed)` + /// offsets on this partition, so the join enrichment can tell an in-flight + /// partition (committed < last-polled) from a 
never-polled/drained one. + #[must_use] + #[allow(clippy::cast_possible_truncation)] + pub fn group_offset_state(&self, group_id: u64) -> (Option, Option) { + let key = ConsumerGroupId(group_id as usize); + let load = |offset: &ConsumerOffset| offset.offset.load(Ordering::Relaxed); + let last_polled = self.last_polled_offsets.pin().get(&key).map(load); + let committed = self.consumer_group_offsets.pin().get(&key).map(load); + (last_polled, committed) + } + + /// Drop a group's ephemeral `last_polled` mark on this partition (residue of + /// a since-removed member that a later join would misread as a live hold). + #[allow(clippy::cast_possible_truncation)] + pub fn clear_group_last_polled(&self, group_id: u64) { + self.last_polled_offsets + .pin() + .remove(&ConsumerGroupId(group_id as usize)); } /// `AckLevel::NoAck` fast path: persist, apply, send reply, no @@ -543,6 +556,196 @@ where self.pending_consumer_offset_commits.clear(); self.observed_view = current_view; } + + /// Build an owned [`PollPlan`] synchronously (no `.await`), so the caller + /// can run the disk read + offset persist off the partition borrow. The + /// in-memory journal tier is read here directly (mem reads never yield); + /// the disk tier is captured as owned descriptors in [`DiskReadPlan`]. + pub(crate) fn build_poll_plan( + &self, + consumer: PollingConsumer, + args: &PollingArgs, + ) -> PollPlan { + // Reads the durable commit frontier (`self.offset`, stored only on + // commit). Also used below as the poll's high-water bound: this function + // is fully synchronous, so the single load cannot drift mid-plan. 
+ let commit_offset = self.offsets().commit_offset; + if !self.should_increment_offset || args.count == 0 { + return PollPlan { + commit_offset, + auto_commit: None, + last_polled: None, + tier: PollTier::Empty, + }; + } + + let query = match args.strategy.kind { + PollingKind::Timestamp => MessageLookup::Timestamp { + timestamp: args.strategy.value, + count: args.count, + ceiling: commit_offset, + }, + kind => { + let start_offset = match kind { + PollingKind::Offset => args.strategy.value, + PollingKind::First => 0, + PollingKind::Last => commit_offset.saturating_sub(u64::from(args.count) - 1), + PollingKind::Next => self + .get_consumer_offset(consumer) + .map_or(0, |offset| offset + 1), + PollingKind::Timestamp => unreachable!(), + }; + if start_offset > commit_offset { + return PollPlan { + commit_offset, + auto_commit: None, + last_polled: None, + tier: PollTier::Empty, + }; + } + MessageLookup::Offset { + offset: start_offset, + count: args.count, + ceiling: commit_offset, + } + } + }; + + // Past the empty-return guards: only now build the auto-commit context, + // whose offset-path `format!()` is wasted on the early returns above. + let auto_commit = self.auto_commit_ctx(consumer, args.auto_commit); + // Cooperative-rebalance: record the highest offset served to a group so + // the drain reconciler can tell committed >= last-polled. Captured here + // as an owned `Arc` and applied off the borrow in `PollPlan::execute`, + // since the served offset is unknown until the poll completes. + let last_polled = match consumer { + PollingConsumer::ConsumerGroup(group_id, _) => Some(LastPolledCtx { + offsets: self.last_polled_offsets.clone(), + group_id, + }), + PollingConsumer::Consumer(..) => None, + }; + + let serve_journal_first = match query { + MessageLookup::Offset { offset, .. } => self + .log + .journal() + .inner + .oldest_resident_offset() + .is_some_and(|oldest| offset >= oldest), + MessageLookup::Timestamp { .. 
} => !self.has_persisted_segment_bytes(), + }; + + if serve_journal_first { + let tier = match self.journal_get_sync(&query) { + Some((fragments, last_matching_offset)) => PollTier::Resident { + fragments, + last_matching_offset, + }, + None => PollTier::Empty, + }; + return PollPlan { + commit_offset, + auto_commit, + last_polled, + tier, + }; + } + + let (start_segment, start_position) = self.disk_poll_start(&query); + // Snapshot only the segments the disk walk visits (`start_segment..`), + // so `start_position` applies to the first snapshotted segment. + let segments = self.log.segments()[start_segment..] + .iter() + .map(|segment| DiskSegment { + start_offset: segment.start_offset, + persisted: segment.size.as_bytes_u64(), + }) + .collect(); + let disk = DiskReadPlan { + partition_dir: self.partition_dir(), + segments, + start_position, + namespace_raw: self.namespace().inner(), + }; + // Snapshot the resident journal tail now (on the pump, under the + // borrow) so the straddle splice runs off-task on owned data with no + // partition reference. Point-in-time, so immune to a concurrent commit + // evicting the run just past the disk match. + let resident_tail = self.resident_tail_snapshot(); + PollPlan { + commit_offset, + auto_commit, + last_polled, + tier: PollTier::Disk { + disk, + query, + resident_tail, + }, + } + } + + /// Capture the owned inputs for an auto-commit, if requested. The committed + /// offset is unknown until the poll completes, so the persist path + fsync + /// flag (for the disk write) and the lock-free offset-map `Arc` (for the + /// in-memory apply) are captured here, so both run off the partition borrow. 
+ fn auto_commit_ctx( + &self, + consumer: PollingConsumer, + auto_commit: bool, + ) -> Option { + if !auto_commit { + return None; + } + let pending = PendingConsumerOffsetCommit::try_from_polling_consumer(consumer, 0).ok()?; + let offset_path = self.persisted_offset_path(pending.kind, pending.consumer_id); + let target = match pending.kind { + ConsumerKind::Consumer => AutoCommitTarget::Consumer { + offsets: self.consumer_offsets.clone(), + consumer_id: pending.consumer_id, + create_path: self.consumer_offsets_path.clone(), + }, + ConsumerKind::ConsumerGroup => AutoCommitTarget::ConsumerGroup { + offsets: self.consumer_group_offsets.clone(), + group_id: pending.consumer_id, + create_path: self.consumer_group_offsets_path.clone(), + }, + }; + Some(AutoCommitCtx { + offset_path, + enforce_fsync: self.consumer_offset_enforce_fsync, + target, + }) + } + + /// Synchronous in-memory journal poll, for the resident tier. Never awaits + /// (see [`PartitionJournal::get_sync`]), so it is safe under a partition + /// borrow. + pub(crate) fn journal_get_sync(&self, query: &MessageLookup) -> Option> { + self.log.journal().inner.get_sync(query) + } + + /// Snapshot the resident journal tail (oldest resident offset + op-ascending + /// entry clones) for the disk-tier straddle continuation. Taken + /// synchronously under the partition borrow so the splice runs off-task on + /// owned data; see [`ResidentTailSnapshot`]. + fn resident_tail_snapshot(&self) -> ResidentTailSnapshot { + let journal = &self.log.journal().inner; + let oldest_resident = journal.oldest_resident_offset(); + // Only clone the entries (a Vec + per-entry `Frozen` refcount bumps) + // when a resident tail actually exists. A fully drained journal yields + // `None`, and an empty `entries` makes `select_resident` return `None` + // (empty poll) on both the straddle and retention-recovery paths. 
+ let entries = if oldest_resident.is_some() { + journal.resident_entries() + } else { + Vec::new() + }; + ResidentTailSnapshot { + oldest_resident, + entries, + } + } } impl Partition for IggyPartition @@ -617,93 +820,6 @@ where )) } - async fn poll_messages( - &self, - consumer: PollingConsumer, - args: PollingArgs, - ) -> Result, IggyError> { - if !self.should_increment_offset || args.count == 0 { - return Ok((PollFragments::new(), None)); - } - - let write_offset = self.offset.load(Ordering::Acquire); - - let result = match args.strategy.kind { - PollingKind::Timestamp => { - self.lookup_messages(MessageLookup::Timestamp { - timestamp: args.strategy.value, - count: args.count, - }) - .await - } - kind => { - let start_offset = match kind { - PollingKind::Offset => args.strategy.value, - PollingKind::First => 0, - PollingKind::Last => write_offset.saturating_sub(u64::from(args.count) - 1), - PollingKind::Next => self - .get_consumer_offset(consumer) - .map_or(0, |offset| offset + 1), - PollingKind::Timestamp => unreachable!(), - }; - - if start_offset > write_offset { - return Ok((PollFragments::new(), None)); - } - - self.lookup_messages(MessageLookup::Offset { - offset: start_offset, - count: args.count, - }) - .await - } - }; - - let (fragments, last_matching_offset) = - result.unwrap_or_else(|| (PollFragments::new(), None)); - - // Record the highest offset served to a consumer group, so the - // cooperative-rebalance reconciler knows when a pending-revoked - // partition has been fully drained (committed >= last polled). 
- if let (PollingConsumer::ConsumerGroup(group_id, _), Some(last_offset)) = - (consumer, last_matching_offset) - { - let guard = self.last_polled_offsets.pin(); - let key = ConsumerGroupId(group_id); - if let Some(existing) = guard.get(&key) { - existing.offset.fetch_max(last_offset, Ordering::Relaxed); - } else { - let created = ConsumerOffset::new( - ConsumerKind::ConsumerGroup, - u32::try_from(group_id).unwrap_or(u32::MAX), - last_offset, - String::new(), - ); - guard.insert(key, created); - } - } - - if args.auto_commit && !fragments.is_empty() { - let last_offset = - last_matching_offset.expect("non-empty poll result must have a last offset"); - if let Err(err) = self - .store_consumer_offset_and_persist(consumer, last_offset) - .await - { - // warning for now. - warn!( - target: "iggy.partitions.diag", - consumer = ?consumer, - last_offset, - %err, - "poll_messages: failed to store consumer offset" - ); - } - } - - Ok((fragments, last_matching_offset)) - } - #[allow(clippy::cast_possible_truncation)] fn store_consumer_offset( &self, @@ -747,60 +863,6 @@ where IggyNamespace::from_raw(self.consensus.namespace()) } - /// Resolve a poll query against the in-memory journal, falling back to - /// the on-disk segments for ranges the journal no longer holds (the - /// persist threshold drains committed batches to segment files). - /// - /// A query is served from exactly one tier per call: a poll that starts - /// below the journal's oldest resident offset reads from disk only, and - /// the client's next poll (advancing past what was returned) eventually - /// crosses back into the resident range. Timestamp queries try disk - /// first whenever segments hold persisted bytes -- older matches always - /// live there -- and fall back to the journal when the disk has none. - async fn lookup_messages(&self, query: MessageLookup) -> Option> { - let serve_journal_first = match query { - MessageLookup::Offset { offset, .. 
} => self - .log - .journal() - .inner - .oldest_resident_offset() - .is_some_and(|oldest| offset >= oldest), - MessageLookup::Timestamp { .. } => !self.has_persisted_segment_bytes(), - }; - - if serve_journal_first { - return self.log.journal().inner.get(&query).await; - } - match self.poll_from_disk(query).await { - Some((mut fragments, last_matching_offset, matched)) => { - // A poll can straddle the tiers: older messages already - // drained to segments, the tail still journal-resident. - // Continue past the last disk match by offset (timestamp - // matches are contiguous from the first hit, so an offset - // continuation is equivalent). - let remaining = query.count().saturating_sub(matched); - if remaining > 0 - && let Some(last_offset) = last_matching_offset - { - let continuation = MessageLookup::Offset { - offset: last_offset + 1, - count: remaining, - }; - if let Some((journal_fragments, journal_last)) = - self.log.journal().inner.get(&continuation).await - { - fragments.extend(journal_fragments); - return Some((fragments, journal_last.or(last_matching_offset))); - } - } - Some((fragments, last_matching_offset)) - } - // Nothing matched on disk (e.g. a timestamp newer than every - // persisted batch): the match, if any, is journal-resident. - None => self.log.journal().inner.get(&query).await, - } - } - fn partition_dir(&self) -> Option { if self.partition_dir.is_some() { return self.partition_dir.clone(); @@ -828,155 +890,6 @@ where .any(|segment| segment.size.as_bytes_u64() > 0) } - /// Serve a poll from the on-disk segment files. - /// - /// Picks the starting segment + byte position via the sparse index - /// (one entry per persist flush; a miss falls back to the segment - /// start), then walks stamped `[256B SendMessages2Header][blob]` - /// batches in chunked reads, slicing fragments with the same selector - /// the journal path uses. Batches split across a chunk boundary are - /// re-read from their start in the next chunk. 
- #[allow(clippy::cast_possible_truncation)] - async fn poll_from_disk( - &self, - query: MessageLookup, - ) -> Option<(PollFragments<4096>, Option, u32)> { - const DISK_POLL_CHUNK: u64 = 1 << 20; - - let count = query.count(); - if count == 0 || !self.log.has_segments() { - return None; - } - - let (start_segment, mut position) = self.disk_poll_start(&query); - - let mut fragments = PollFragments::new(); - let mut last_matching_offset = None; - let mut matched: u32 = 0; - - for segment_index in start_segment..self.log.segments().len() { - if matched >= count { - break; - } - let persisted = self.log.segments()[segment_index].size.as_bytes_u64(); - if persisted == 0 || position >= persisted { - position = 0; - continue; - } - // Sealed segments drop their writer at rotation, so resolve the - // file from the partition directory (taken from any live writer) - // plus the segment's start offset, mirroring the writer naming. - let Some(partition_dir) = self.partition_dir() else { - // Simulated in-memory persistence: no files to read. A live - // partition hitting this means no writer was resolvable - // (e.g. mid-rotation), which silently hides the disk tier. 
- warn!( - target: "iggy.partitions.diag", - plane = "partitions", - namespace_raw = self.namespace().inner(), - segment_count = self.log.segments().len(), - "disk poll: no live writer to resolve partition dir; disk tier unreadable" - ); - return None; - }; - let start_offset = self.log.segments()[segment_index].start_offset; - let path = format!("{partition_dir}/{start_offset:0>20}.log"); - let Some(file) = self.open_segment_with_retry(&path).await else { - position = 0; - continue; - }; - - let mut chunk_len = DISK_POLL_CHUNK; - while matched < count && position < persisted { - let len = (persisted - position).min(chunk_len) as usize; - let Some(chunk) = self.read_chunk_with_retry(&file, position, len).await else { - break; - }; - let consumed = walk_disk_chunk( - &chunk, - query, - count, - &mut matched, - &mut fragments, - &mut last_matching_offset, - ); - if consumed == 0 { - if (len as u64) >= persisted - position { - // The whole remainder fit and still no complete - // batch decoded: corrupt tail; stop. - break; - } - // A single batch larger than the chunk: grow and - // re-read instead of spinning. - chunk_len = chunk_len.saturating_mul(4); - continue; - } - chunk_len = DISK_POLL_CHUNK; - position += consumed as u64; - } - position = 0; - } - - if fragments.is_empty() { - None - } else { - Some((fragments, last_matching_offset, matched)) - } - } - - /// Open a segment file for a disk poll, retrying transient IO failures - /// (fd pressure under heavy parallel load) so one failed syscall does - /// not silently collapse the poll into an empty result. 
- async fn open_segment_with_retry(&self, path: &str) -> Option { - for attempt in 0..3u8 { - match compio::fs::File::open(path).await { - Ok(file) => return Some(file), - Err(error) => { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - namespace_raw = self.namespace().inner(), - path, - attempt, - %error, - "disk poll: failed to open segment file" - ); - compio::time::sleep(std::time::Duration::from_millis(10)).await; - } - } - } - None - } - - /// Read one chunk for a disk poll, retrying transient IO failures. - async fn read_chunk_with_retry( - &self, - file: &compio::fs::File, - position: u64, - len: usize, - ) -> Option> { - for attempt in 0..3u8 { - let buffer = Owned::<4096>::zeroed(len); - let compio::BufResult(read, buffer) = file.read_exact_at(buffer, position).await; - match read { - Ok(()) => return Some(Frozen::from(buffer)), - Err(error) => { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - namespace_raw = self.namespace().inner(), - position, - attempt, - %error, - "disk poll: segment read failed" - ); - compio::time::sleep(std::time::Duration::from_millis(10)).await; - } - } - } - None - } - /// Starting `(segment index, byte position)` for a disk poll, resolved /// via each segment's sparse index cache. An index miss starts at the /// segment's first byte (the walk filters precisely). @@ -2389,58 +2302,255 @@ fn accumulate_committed_info( info.max_timestamp = info.max_timestamp.max(base_timestamp); } -/// Walk stamped `[256B SendMessages2Header][blob]` batches in one disk -/// chunk, pushing matching fragments. Returns bytes consumed: the start -/// of the first batch that did not fully fit in the chunk (the caller -/// re-reads from there), or the chunk end when everything decoded. 
-fn walk_disk_chunk( - chunk: &Frozen<4096>, - query: MessageLookup, - count: u32, - matched: &mut u32, - fragments: &mut PollFragments<4096>, - last_matching_offset: &mut Option, -) -> usize { - let bytes: &[u8] = chunk; - let mut cursor = 0usize; - - while *matched < count && cursor + COMMAND_HEADER_SIZE <= bytes.len() { - let Ok(batch) = decode_batch_slice(&bytes[cursor..]) else { - // Incomplete tail batch (or corrupt data): hand the position - // back so the caller can re-read or bail. - break; +#[cfg(test)] +mod tests { + use super::*; + use crate::poll_plan::DiskReadOutcome; + use bytes::Bytes; + use compio::io::AsyncWriteAtExt; + use consensus::LocalPipeline; + use server_common::send_messages2::{ + COMMAND_HEADER_SIZE, IggyMessage2, IggyMessage2Header, IggyMessages2, SendMessages2Owned, + }; + + const TEST_CLUSTER: u128 = 1; + + fn test_partition() -> IggyPartition { + let namespace = IggyNamespace::new(1, 1, 0); + let consensus = VsrConsensus::new( + TEST_CLUSTER, + 0, + 1, + namespace.inner(), + IggyMessageBus::new(0), + LocalPipeline::new(), + ); + consensus.init(); + IggyPartition::with_in_memory_storage( + Arc::new(PartitionStats::default()), + consensus, + IggyByteSize::from(1024 * 1024), + false, + ) + } + + /// `reclaim_dead_group_offsets` must drop exactly the not-`is_live` groups + /// from the in-memory map and hand back their owned persisted-file paths, + /// leaving live groups untouched. The returned `Vec` is what the + /// reconciler unlinks off-borrow, so it carries no partition reference. + /// + /// TODO: a true cross-task interleave (pump reallocs the partitions vec + /// while the reconciler awaits the unlink) needs a two-future sim oracle + /// that does not exist yet; this covers the synchronous removal contract + /// the off-borrow split relies on. 
+ #[compio::test] + async fn reclaim_dead_group_offsets_drops_dead_keeps_live() { + let mut partition = test_partition(); + let group_offsets_path = "/iggy-test-cg-offsets".to_owned(); + partition.consumer_group_offsets_path = Some(group_offsets_path.clone()); + + let dead: u32 = 1; + let live: u32 = 2; + partition.consumer_group_offsets.pin().insert( + ConsumerGroupId(dead as usize), + ConsumerOffset::new(ConsumerKind::ConsumerGroup, dead, 7, String::new()), + ); + partition.consumer_group_offsets.pin().insert( + ConsumerGroupId(live as usize), + ConsumerOffset::new(ConsumerKind::ConsumerGroup, live, 9, String::new()), + ); + + let paths = partition.reclaim_dead_group_offsets(|group_id| group_id == u64::from(live)); + + assert_eq!( + paths, + vec![format!("{group_offsets_path}/{dead}")], + "only the dead group's persisted path is returned for unlink" + ); + let mut remaining = partition.consumer_group_offset_ids(); + remaining.sort_unstable(); + assert_eq!( + remaining, + vec![u64::from(live)], + "dead group removed in-memory; live group retained" + ); + } + + /// One-message segment record in on-disk layout `[256B command header][blob]` + /// stamped at `base_offset`, with a valid batch checksum so it decodes + /// through `decode_batch_slice` and matches an `Offset` poll. 
+ fn build_segment_record(namespace: IggyNamespace, base_offset: u64) -> Vec { + let mut batch = IggyMessages2::with_capacity(1); + batch.push(IggyMessage2 { + header: IggyMessage2Header { + payload_length: 8, + ..Default::default() + }, + payload: Bytes::from_static(b"abcdefgh"), + user_headers: None, + }); + let mut owned = SendMessages2Owned::from_messages(namespace, &batch) + .expect("build send_messages batch"); + owned.header.base_offset = base_offset; + owned.header.batch_checksum = owned.header.checksum_for_blob(&owned.blob); + + let mut record = vec![0u8; COMMAND_HEADER_SIZE + owned.blob.len()]; + owned.header.encode_into(&mut record[..COMMAND_HEADER_SIZE]); + record[COMMAND_HEADER_SIZE..].copy_from_slice(&owned.blob); + record + } + + /// Fail-closed disk read: an unreadable EARLIER segment must stop the walk + /// (return `Faulted`) rather than skip forward and serve a LATER segment's + /// messages, which would punch a silent gap into the poll. The second + /// segment holds a real, matchable batch at a higher offset; before the + /// fix, a missing first segment did `continue` and the walk served that + /// batch (offset 5 in response to an offset-0 poll) - the exact skip. + #[compio::test] + async fn read_disk_faults_closed_when_earlier_segment_unreadable() { + let namespace = IggyNamespace::new(1, 1, 0); + + // Unique temp dir; the first segment file is deliberately never created. + let dir = std::env::temp_dir().join(format!( + "iggy-read-disk-faulted-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock after epoch") + .as_nanos(), + )); + compio::fs::create_dir_all(&dir) + .await + .expect("create temp partition dir"); + let partition_dir = dir.to_string_lossy().into_owned(); + + // Second segment starts at offset 5 and holds a valid batch there. 
+ let later_record = build_segment_record(namespace, 5); + let later_path = format!("{partition_dir}/{:0>20}.log", 5u64); + let later_len = later_record.len() as u64; + { + let mut file = compio::fs::File::create(&later_path) + .await + .expect("create later segment file"); + let (written, _) = file.write_all_at(later_record, 0).await.into(); + written.expect("write later segment record"); + file.sync_all().await.expect("flush later segment file"); + } + + // First segment claims persisted bytes but its file is absent, so the + // open exhausts retries -> the walk must fault-close before segment two. + let plan = DiskReadPlan { + partition_dir: Some(partition_dir), + segments: vec![ + DiskSegment { + start_offset: 0, + persisted: 512, + }, + DiskSegment { + start_offset: 5, + persisted: later_len, + }, + ], + start_position: 0, + namespace_raw: namespace.inner(), }; - let total_size = batch.header.total_size(); - if let Some(selection) = select_batch_slice(&batch, query, *matched) { - let full_body_selected = selection.start == 0 && selection.end == batch.blob().len(); - if full_body_selected { - fragments.push(Fragment::slice(chunk.clone(), cursor, cursor + total_size)); - } else { - let mut rewritten = batch.header; - rewritten.batch_length = - u64::try_from(COMMAND_HEADER_SIZE + (selection.end - selection.start)) - .expect("sliced batch length exceeds u64::MAX"); - rewritten.message_count = selection.matched_messages; - rewritten.batch_checksum = rewritten.checksum_for_blob( - batch - .blob() - .get(selection.start..selection.end) - .expect("selected batch slice must stay within blob bounds"), - ); - fragments.push(Fragment::whole(rewritten.into_frozen())); - fragments.push(Fragment::slice( - chunk.clone(), - cursor + COMMAND_HEADER_SIZE + selection.start, - cursor + COMMAND_HEADER_SIZE + selection.end, - )); - } - *last_matching_offset = Some(selection.last_matching_offset); - *matched += selection.matched_messages; - } + let outcome = plan + 
.read_disk(MessageLookup::Offset { + offset: 0, + count: 10, + ceiling: u64::MAX, + }) + .await; + + assert!( + matches!(outcome, DiskReadOutcome::Faulted), + "unreadable first segment must fault-close, not skip forward to the later segment", + ); - cursor += total_size; + let _ = std::fs::remove_dir_all(&dir); } - cursor.min(bytes.len()) + /// Fail-closed disk read on a CORRUPT (present-but-undecodable) batch in an + /// EARLIER segment: like a missing/unreadable segment, the walk must stop + /// (`Faulted`) rather than skip past the garbage and serve a LATER + /// segment's valid batch at a higher offset, which would punch a silent gap + /// into the poll. The first segment's file exists and claims persisted bytes + /// but holds non-decodable data; the second segment holds a real batch at + /// offset 5. + #[compio::test] + async fn read_disk_faults_closed_when_earlier_segment_corrupt() { + let namespace = IggyNamespace::new(1, 1, 0); + + let dir = std::env::temp_dir().join(format!( + "iggy-read-disk-corrupt-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock after epoch") + .as_nanos(), + )); + compio::fs::create_dir_all(&dir) + .await + .expect("create temp partition dir"); + let partition_dir = dir.to_string_lossy().into_owned(); + + // First segment (start_offset 0): garbage bytes that never decode into a + // complete batch. + let corrupt_record = vec![0xABu8; 512]; + let corrupt_len = corrupt_record.len() as u64; + let corrupt_path = format!("{partition_dir}/{:0>20}.log", 0u64); + { + let mut file = compio::fs::File::create(&corrupt_path) + .await + .expect("create corrupt segment file"); + let (written, _) = file.write_all_at(corrupt_record, 0).await.into(); + written.expect("write corrupt segment record"); + file.sync_all().await.expect("flush corrupt segment file"); + } + + // Second segment (start_offset 5): a valid, matchable batch. 
+ let later_record = build_segment_record(namespace, 5); + let later_path = format!("{partition_dir}/{:0>20}.log", 5u64); + let later_len = later_record.len() as u64; + { + let mut file = compio::fs::File::create(&later_path) + .await + .expect("create later segment file"); + let (written, _) = file.write_all_at(later_record, 0).await.into(); + written.expect("write later segment record"); + file.sync_all().await.expect("flush later segment file"); + } + + let plan = DiskReadPlan { + partition_dir: Some(partition_dir), + segments: vec![ + DiskSegment { + start_offset: 0, + persisted: corrupt_len, + }, + DiskSegment { + start_offset: 5, + persisted: later_len, + }, + ], + start_position: 0, + namespace_raw: namespace.inner(), + }; + + let outcome = plan + .read_disk(MessageLookup::Offset { + offset: 0, + count: 10, + ceiling: u64::MAX, + }) + .await; + + assert!( + matches!(outcome, DiskReadOutcome::Faulted), + "corrupt earlier segment must fault-close, not skip forward to the later segment", + ); + + let _ = std::fs::remove_dir_all(&dir); + } } diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 998e132837..caf4974a9a 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -17,8 +17,9 @@ #![allow(dead_code)] -use crate::IggyPartition; +use crate::poll_plan::PollPlan; use crate::types::PartitionsConfig; +use crate::{IggyPartition, Partition, PollingArgs, PollingConsumer}; use ahash::AHashSet; use consensus::{Consensus, Plane, PlaneIdentity, VsrConsensus}; use iggy_binary_protocol::{ @@ -26,10 +27,34 @@ use iggy_binary_protocol::{ }; use message_bus::MessageBus; use server_common::sharding::{IggyNamespace, LocalIdx, ShardId}; +#[cfg(debug_assertions)] +use std::cell::Cell; use std::cell::{RefCell, UnsafeCell}; use std::collections::HashMap; use tracing::warn; +/// RAII counter for live [`IggyPartitions::with_partition`] borrows. 
The +/// decrement runs on `Drop`, so a panic inside the closure still restores the +/// count instead of poisoning the `insert` / `remove` tripwire for the rest of +/// the process. +#[cfg(debug_assertions)] +struct BorrowGuard<'a>(&'a Cell); + +#[cfg(debug_assertions)] +impl<'a> BorrowGuard<'a> { + fn new(cell: &'a Cell) -> Self { + cell.set(cell.get() + 1); + Self(cell) + } +} + +#[cfg(debug_assertions)] +impl Drop for BorrowGuard<'_> { + fn drop(&mut self) { + self.0.set(self.0.get() - 1); + } +} + /// Per-shard collection of all partitions. /// /// This struct manages ALL partitions assigned to a single shard, regardless @@ -66,6 +91,14 @@ where /// per-shard runtime is single-threaded, so runtime borrow checks /// suffice; callers must not hold a borrow across `.await`. tombstoned: RefCell>, + /// Debug-only tripwire: counts live [`Self::with_partition`] borrows so + /// `insert` / `remove` can assert the partitions vec is never mutated + /// while a sanctioned non-pump read borrow is outstanding. Cannot fire for + /// correct code: the closure is `FnOnce` (so it cannot span an `.await`) + /// and [`BorrowGuard`] decrements on unwind, so a panic inside it does not + /// leave the count stuck. + #[cfg(debug_assertions)] + borrow_active: Cell, } impl IggyPartitions @@ -80,6 +113,8 @@ where partitions: UnsafeCell::new(Vec::new()), namespace_to_local: UnsafeCell::new(HashMap::new()), tombstoned: RefCell::new(AHashSet::new()), + #[cfg(debug_assertions)] + borrow_active: Cell::new(0), } } @@ -91,6 +126,8 @@ where partitions: UnsafeCell::new(Vec::with_capacity(capacity)), namespace_to_local: UnsafeCell::new(HashMap::with_capacity(capacity)), tombstoned: RefCell::new(AHashSet::new()), + #[cfg(debug_assertions)] + borrow_active: Cell::new(0), } } @@ -99,18 +136,21 @@ where } fn partitions(&self) -> &Vec> { - // Safety: single-threaded per-shard model, no concurrent access. + // SAFETY: see the `partitions` field doc. 
The returned `&` is sound only + // while not held across an `.await` on a non-pump task (a sibling + // reconcile could realloc); single-threadedness alone is not enough. unsafe { &*self.partitions.get() } } fn namespace_map(&self) -> &HashMap { - // Safety: single-threaded per-shard model, no concurrent access. + // SAFETY: shared read, same borrow rule as `partitions` above. unsafe { &*self.namespace_to_local.get() } } #[allow(clippy::mut_from_ref)] fn namespace_map_mut(&self) -> &mut HashMap { - // Safety: single-threaded per-shard model, no concurrent access. + // SAFETY: `&mut` is sound because map mutation runs only on the pump + // task, the sole mutator; single-threadedness alone is not enough. unsafe { &mut *self.namespace_to_local.get() } } @@ -134,7 +174,8 @@ where /// Get mutable partition by local index. #[allow(clippy::mut_from_ref)] fn get_mut(&self, local_idx: LocalIdx) -> Option<&mut IggyPartition> { - // Safety: single-threaded per-shard model, no concurrent access. + // SAFETY: `&mut` is sound on the pump task only (the sole mutator); see + // `namespace_map_mut`. Single-threadedness alone is not enough. unsafe { (&mut *self.partitions.get()).get_mut(*local_idx) } } @@ -153,8 +194,18 @@ where /// [`Self::get_mut_by_ns`] / [`Self::get_mut`] held by a sibling /// task across an `.await`. New external call sites MUST route /// through `ReconcileOp::InsertOwned` instead. + /// + /// The debug tripwire tracks the `&` borrows handed out by + /// [`Self::with_partition`]; the `&mut` path above is uncounted (it is + /// pump-only, so it cannot alias this same-task mutation). #[doc(hidden)] pub fn insert(&self, namespace: IggyNamespace, partition: IggyPartition) -> LocalIdx { + #[cfg(debug_assertions)] + debug_assert_eq!( + self.borrow_active.get(), + 0, + "IggyPartitions::insert while a with_partition borrow is live" + ); // Safety: pump-only invariant, caller responsibility. 
let partitions = unsafe { &mut *self.partitions.get() }; let local_idx = LocalIdx::new(partitions.len()); @@ -171,9 +222,19 @@ where /// Get partition by namespace directly. /// /// Returns `None` for tombstoned namespaces so callers outside - /// [`Plane`] (view-change handlers, `tick_partitions`, loopback drain) + /// [`Plane`] (view-change handlers, loopback drain, `tick_partitions`) /// can't drive journal writes against a partition the reconciler has /// already fenced for delete. + /// + /// A pump-task caller MAY hold the returned reference across an `.await`: + /// the vec mutators (`apply_reconcile_ops`) run on the same task and so + /// cannot interleave and reallocate while the pump is suspended. + /// `tick_partitions` is such a caller (it holds the partition across the + /// VSR action dispatch await) and still relies on the tombstone gate above. + /// A non-pump caller must instead use [`Self::with_partition`], which scopes + /// the borrow to a synchronous closure, and must never hold the reference + /// across an `.await` (a sibling task's reconcile could reallocate the vec + /// mid-await). pub fn get_by_ns(&self, namespace: &IggyNamespace) -> Option<&IggyPartition> { if self.is_tombstoned(namespace) { return None; @@ -182,6 +243,23 @@ where self.partitions().get(**idx) } + /// Run `f` against the partition for `namespace`, returning its result. + /// + /// Sanctioned non-pump read path: unlike [`Self::get_by_ns`], the borrow + /// cannot escape `f`, so it cannot be held across an `.await` while the + /// pump task mutates the partitions vec. Returns `None` for a missing or + /// tombstoned namespace. + pub fn with_partition( + &self, + namespace: &IggyNamespace, + f: impl FnOnce(&IggyPartition) -> R, + ) -> Option { + let partition = self.get_by_ns(namespace)?; + #[cfg(debug_assertions)] + let _guard = BorrowGuard::new(&self.borrow_active); + Some(f(partition)) + } + /// Get mutable partition by namespace directly. 
Tombstone-gated like /// [`Self::get_by_ns`]. #[allow(clippy::mut_from_ref)] @@ -190,7 +268,8 @@ where return None; } let idx = self.namespace_map().get(namespace)?; - // Safety: single-threaded per-shard model, no concurrent access. + // SAFETY: `&mut` is sound on the pump task only (the sole mutator); see + // `namespace_map_mut`. Single-threadedness alone is not enough. unsafe { (&mut *self.partitions.get()).get_mut(**idx) } } @@ -209,8 +288,18 @@ where /// Panics if the stored `LocalIdx` is past `partitions.len()`, an /// invariant violation. Silent `None` would leave the map half-mutated /// and prime the next `insert` for a colliding index. + /// + /// The debug tripwire tracks the `&` borrows handed out by + /// [`Self::with_partition`]; the `&mut` path above is uncounted (it is + /// pump-only, so it cannot alias this same-task mutation). #[doc(hidden)] pub fn remove(&self, namespace: &IggyNamespace) -> Option> { + #[cfg(debug_assertions)] + debug_assert_eq!( + self.borrow_active.get(), + 0, + "IggyPartitions::remove while a with_partition borrow is live" + ); let local_idx = self.namespace_map_mut().remove(namespace)?; let idx = *local_idx; let partitions = unsafe { &mut *self.partitions.get() }; @@ -268,6 +357,65 @@ where pub fn untombstone(&self, namespace: &IggyNamespace) { self.tombstoned.borrow_mut().remove(namespace); } + + /// Build an owned [`PollPlan`] for a partition poll synchronously, under a + /// single [`Self::with_partition`] borrow (the in-memory journal tier + the + /// resident-tail straddle snapshot are read here; mem reads never yield). + /// Returns `None` for a missing or tombstoned namespace. + /// + /// Pairs with [`PollPlan::execute`], which runs the disk read + + /// offset persist/apply off the borrow on the owned plan. 
Splitting the + /// borrow-bound plan build from the borrow-free execution is what keeps + /// poll-read sound: the only partition reference is taken here, on the pump, + /// sequential with the pump's own `&mut` mutations, never on a sibling task. + pub fn build_poll_snapshot( + &self, + namespace: &IggyNamespace, + consumer: PollingConsumer, + args: &PollingArgs, + ) -> Option { + self.with_partition(namespace, |partition| { + partition.build_poll_plan(consumer, args) + }) + } + + /// Read a consumer's stored offset + the partition commit offset. Fully + /// synchronous (atomics + lock-free maps), so it runs under a single + /// [`Self::with_partition`] borrow. `None` for a missing/tombstoned + /// namespace. + pub fn consumer_offset_read( + &self, + namespace: &IggyNamespace, + consumer: PollingConsumer, + ) -> Option<(Option, u64)> { + self.with_partition(namespace, |partition| { + ( + partition.get_consumer_offset(consumer), + partition.offsets().commit_offset, + ) + }) + } + + /// Cooperative-rebalance: a group's `(last_polled, committed)` offsets on the + /// partition for `namespace`. Synchronous (lock-free maps), under a single + /// [`Self::with_partition`] borrow. `None` for a missing/tombstoned namespace. + pub fn group_offset_state( + &self, + namespace: &IggyNamespace, + group_id: u64, + ) -> Option<(Option, Option)> { + self.with_partition(namespace, |partition| { + partition.group_offset_state(group_id) + }) + } + + /// Drop a group's ephemeral `last_polled` mark on the partition for + /// `namespace`. `None` for a missing/tombstoned namespace. 
+ pub fn clear_group_last_polled(&self, namespace: &IggyNamespace, group_id: u64) -> Option<()> { + self.with_partition(namespace, |partition| { + partition.clear_group_last_polled(group_id); + }) + } } impl Plane> for IggyPartitions @@ -362,3 +510,235 @@ where message.header().operation().is_partition() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::journal::MessageLookup; + use bytes::Bytes; + use consensus::LocalPipeline; + use iggy_binary_protocol::Operation; + use iggy_common::{IggyByteSize, PartitionStats}; + use journal::Journal as _; + use message_bus::IggyMessageBus; + use server_common::send_messages2::{ + IggyMessage2, IggyMessage2Header, IggyMessages2, PREPARE_SPLIT_POINT, SendMessages2Owned, + stamp_prepare_for_persistence, + }; + use server_common::{Message, iobuf::Frozen}; + use std::sync::Arc; + + const TEST_CLUSTER: u128 = 1; + + fn build_partition() -> IggyPartition { + let namespace = IggyNamespace::new(1, 1, 0); + let consensus = VsrConsensus::new( + TEST_CLUSTER, + 0, + 1, + namespace.inner(), + IggyMessageBus::new(0), + LocalPipeline::new(), + ); + consensus.init(); + IggyPartition::with_in_memory_storage( + Arc::new(PartitionStats::default()), + consensus, + IggyByteSize::from(1024 * 1024), + false, + ) + } + + /// One-message `SendMessages` journal entry stamped at `op` / `base_offset`. + /// Reuses the production blob builder + checksum stamping so the entry + /// decodes through `decode_prepare_slice` and indexes into `offset_to_op`, + /// the map `candidate_start_op` and the contiguity guard read. 
+ fn build_send_messages_entry( + namespace: IggyNamespace, + op: u64, + base_offset: u64, + ) -> Frozen<4096> { + let mut batch = IggyMessages2::with_capacity(1); + batch.push(IggyMessage2 { + header: IggyMessage2Header { + payload_length: 8, + ..Default::default() + }, + payload: Bytes::from_static(b"abcdefgh"), + user_headers: None, + }); + let owned = SendMessages2Owned::from_messages(namespace, &batch) + .expect("build send_messages batch"); + + let total_size = PREPARE_SPLIT_POINT + owned.blob.len(); + let prepare = Message::::new(total_size).transmute_header( + |_, header: &mut PrepareHeader| { + header.command = Command2::Prepare; + header.operation = Operation::SendMessages; + header.op = op; + header.size = u32::try_from(total_size).expect("size fits u32"); + }, + ); + + let mut prepare = prepare; + { + let bytes = prepare.as_mut_slice(); + owned + .header + .encode_into(&mut bytes[std::mem::size_of::()..PREPARE_SPLIT_POINT]); + bytes[PREPARE_SPLIT_POINT..PREPARE_SPLIT_POINT + owned.blob.len()] + .copy_from_slice(&owned.blob); + } + + let (prepare, _command, _count) = stamp_prepare_for_persistence(prepare, base_offset, 1) + .expect("stamp prepare for persistence"); + prepare.into_frozen() + } + + /// Resident-tail straddle continuation over the OWNED snapshot, equivalent + /// to the removed `journal_get_contiguous`. Reproduces the disk->journal + /// straddle gap: a commit between the plan snapshot and the splice persists + /// offsets `D+1..R-1` and evicts them, leaving `oldest_resident > D+1`. The + /// continuation must be refused there (so the poll returns disk-only and the + /// next poll re-routes the evicted run to the flushed disk tier) instead of + /// silently splicing the next resident op `R` over the gap. + /// + /// `count: 1` makes `last_matching_offset` the single matched offset, so the + /// skip is provable without decoding fragment bytes: a lookup that asked for + /// offset 1 returning offset 3 means 1 and 2 were skipped. 
This drives the + /// exact `select_resident` walk + `oldest_resident <= from_offset` gate that + /// `ResidentTailSnapshot::straddle_continuation` + `PollPlan::execute` apply. + #[compio::test] + async fn straddle_continuation_refuses_journal_when_oldest_resident_advanced_past_gap() { + let namespace = IggyNamespace::new(1, 1, 0); + let partition = build_partition(); + + // Journal offsets 0..=4 (op N carries message offset N). + for offset in 0..=4u64 { + partition + .log + .journal() + .inner + .append(build_send_messages_entry(namespace, offset + 1, offset)) + .await + .expect("append journal entry"); + } + + // Commit-time flush of the prefix: offsets 0,1,2 leave the journal, + // so the oldest resident offset advances to 3 (the gap edge). + let prefix = partition.log.journal().inner.committed_prefix(3); + assert_eq!(prefix.len(), 3, "ops for offsets 0,1,2 are the prefix"); + partition + .log + .journal() + .inner + .evict_prefix(prefix.len()) + .await; + + // Snapshot the resident tail the way `build_poll_plan` does. + let oldest_resident = partition.log.journal().inner.oldest_resident_offset(); + let entries = partition.log.journal().inner.resident_entries(); + assert_eq!( + oldest_resident, + Some(3), + "offsets 0,1,2 evicted; 3 is the oldest resident", + ); + + // D = 0 (last disk match), so a straddle would continue at offset 1. + // A raw snapshot walk from offset 1 falls through to the next resident + // op (offset 3): exactly the silent skip the gate prevents. + let (_, leaky_offset) = crate::journal::select_resident( + &entries, + MessageLookup::Offset { + offset: 1, + count: 1, + ceiling: u64::MAX, + }, + ) + .expect("raw snapshot walk still returns the post-gap batch"); + assert_eq!( + leaky_offset, + Some(3), + "raw select_resident skips the evicted 1,2 and serves offset 3 (the hazard)", + ); + + // The contiguity gate refuses that continuation: oldest_resident (3) > 1. 
+ let from_offset = 1u64; + let contiguous = oldest_resident.is_some_and(|oldest| oldest <= from_offset); + assert!( + !contiguous, + "contiguity gate must not serve offset 1 over the eviction gap", + ); + + // A contiguous continuation (start == oldest_resident) passes the gate + // and serves the resident tail. + let from_offset = 3u64; + let contiguous = oldest_resident.is_some_and(|oldest| oldest <= from_offset); + assert!(contiguous, "offset 3 is resident; gate must allow it"); + let (_, contiguous_offset) = crate::journal::select_resident( + &entries, + MessageLookup::Offset { + offset: from_offset, + count: 1, + ceiling: u64::MAX, + }, + ) + .expect("offset 3 is resident, continuation must serve it"); + assert_eq!( + contiguous_offset, + Some(3), + "contiguous continuation returns the resident tail starting at 3", + ); + } + + /// The resident journal holds replicated-but-uncommitted prepares ahead of + /// the commit frontier. A poll must clamp at `ceiling` (the commit offset) + /// so it never returns a dirty read of view-change-rollbackable data, even + /// when the requested count would otherwise reach into the uncommitted tail. + #[compio::test] + async fn poll_clamps_at_commit_ceiling() { + let namespace = IggyNamespace::new(1, 1, 0); + let partition = build_partition(); + + // Offsets 0..=5 all resident; only 0..=2 are committed. + for offset in 0..=5u64 { + partition + .log + .journal() + .inner + .append(build_send_messages_entry(namespace, offset + 1, offset)) + .await + .expect("append journal entry"); + } + + let entries = partition.log.journal().inner.resident_entries(); + let (_, last) = crate::journal::select_resident( + &entries, + MessageLookup::Offset { + offset: 0, + count: 10, + ceiling: 2, + }, + ) + .expect("offsets 0..=2 are within the ceiling and must be served"); + assert_eq!( + last, + Some(2), + "poll must stop at the commit ceiling, never serving 3..=5", + ); + + // A poll starting past the ceiling returns nothing. 
+ assert!( + crate::journal::select_resident( + &entries, + MessageLookup::Offset { + offset: 3, + count: 10, + ceiling: 2, + }, + ) + .is_none(), + "no committed message at or after offset 3, so the poll is empty", + ); + } +} diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index 3084cb8488..131831aaaf 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -26,10 +26,12 @@ use std::{ cell::UnsafeCell, collections::{BTreeMap, HashMap}, }; +use tracing::warn; use crate::{Fragment, PollFragments, PollQueryResult}; const ZERO_LEN: usize = 0; +const PREPARE_HEADER_SIZE: usize = std::mem::size_of::(); type JournalBuffer = Frozen<4096>; /// Decoded `SendMessages` header fields surfaced from a journal (re-)append so a @@ -46,12 +48,23 @@ pub struct RetainedBatchMeta { } /// Lookup key for querying messages from the journal. +/// +/// `ceiling` is the inclusive commit-frontier bound: the resident journal holds +/// replicated-but-uncommitted prepares (a pipeline ahead of the commit +/// frontier), so a poll must never return a message past `ceiling` or it leaks +/// a dirty read of view-change-rollbackable data. #[derive(Debug, Clone, Copy)] pub enum MessageLookup { - #[allow(dead_code)] - Offset { offset: u64, count: u32 }, - #[allow(dead_code)] - Timestamp { timestamp: u64, count: u32 }, + Offset { + offset: u64, + count: u32, + ceiling: u64, + }, + Timestamp { + timestamp: u64, + count: u32, + ceiling: u64, + }, } impl MessageLookup { @@ -60,6 +73,14 @@ impl MessageLookup { Self::Offset { count, .. } | Self::Timestamp { count, .. } => count, } } + + /// Inclusive commit-frontier upper bound: no message with a greater offset + /// may be served (uncommitted, rollbackable on a view change). + pub const fn ceiling(self) -> u64 { + match self { + Self::Offset { ceiling, .. } | Self::Timestamp { ceiling, .. 
} => ceiling, + } + } } #[derive(Debug, Clone, Copy)] @@ -70,16 +91,6 @@ pub struct SelectedBatchSlice { pub last_matching_offset: u64, } -#[allow(dead_code)] -pub trait QueryableJournal: Journal -where - S: Storage, -{ - type Query; - - fn get(&self, query: &Self::Query) -> impl Future>>; -} - /// In-memory only partition journal storage. Non-durable. /// /// # Warning — development storage only @@ -194,6 +205,22 @@ where } impl PartitionJournalMemStorage { + /// Synchronous mirror of [`Storage::read_at`] for the poll path. Mem + /// storage never hits the reactor (it copies from an in-memory `Vec`), so + /// the read can run under a partition borrow without crossing an `.await` + /// - the property that keeps poll-read sound. + fn read_at_sync(&self, offset: usize) -> JournalBuffer { + let offset_to_index = unsafe { &*self.offset_to_index.get() }; + let Some(&index) = offset_to_index.get(&offset) else { + return Owned::<4096>::zeroed(0).into(); + }; + let entries = unsafe { &*self.entries.get() }; + entries + .get(index) + .cloned() + .unwrap_or_else(|| Owned::<4096>::zeroed(0).into()) + } + fn entries(&self) -> Vec { let entries = unsafe { &*self.entries.get() }; entries.clone() @@ -222,6 +249,58 @@ impl PartitionJournalMemStorage { } impl PartitionJournal { + /// Synchronous resident-range poll read. Never awaits (mem storage reads + /// are pure memory copies), so a partition borrow held across it cannot span + /// a scheduler yield. The poll path uses this; the disk tier, which does + /// await file IO, runs off the borrow on owned descriptors. 
+ pub fn get_sync(&self, query: &MessageLookup) -> Option> { + let query = *query; + let start_op = self.candidate_start_op(&query)?; + let result = self.load_polled_batches_from_storage_sync(start_op, query); + (!result.0.is_empty()).then_some(result) + } + + fn load_polled_batches_from_storage_sync( + &self, + start_op: u64, + query: MessageLookup, + ) -> PollQueryResult<4096> { + let count = query.count(); + if count == 0 { + return (PollFragments::new(), None); + } + + // Disjoint `UnsafeCell`s: this borrows `op_to_storage_offset` while the + // loop borrows `inner.storage` (via `read_at_sync`); the loop mutates + // neither, so iterating the range in place avoids a per-poll Vec copy. + let op_to_storage_offset = unsafe { &*self.op_to_storage_offset.get() }; + + let mut fragments = PollFragments::new(); + let mut last_matching_offset = None; + let mut matched_messages = 0u32; + + for (_, &storage_offset) in op_to_storage_offset.range(start_op..) { + if matched_messages >= count { + break; + } + + let bytes = { + let inner = unsafe { &*self.inner.get() }; + inner.storage.read_at_sync(storage_offset) + }; + + try_push_resident_entry( + &bytes, + query, + &mut fragments, + &mut last_matching_offset, + &mut matched_messages, + ); + } + + (fragments, last_matching_offset) + } + /// Drain all accumulated batches, matching the legacy `PartitionJournal` API. pub fn commit(&self) -> Vec { let entries = { @@ -328,8 +407,7 @@ impl PartitionJournal { &self, entry: JournalBuffer, ) -> io::Result> { - let header_size = std::mem::size_of::(); - let header_bytes = &entry[..header_size]; + let header_bytes = &entry[..PREPARE_HEADER_SIZE]; let header = *bytemuck::checked::try_from_bytes::(header_bytes) .expect("partition journal append expects a valid prepare header"); let op = header.op; @@ -389,6 +467,15 @@ impl PartitionJournal { let inner = unsafe { &*self.inner.get() }; inner.storage.is_empty() } + + /// Owned, op-ascending clones of every resident journal entry. 
Each clone + /// is a `Frozen` refcount bump, not a deep copy. Used to snapshot the + /// resident tail at poll-plan time so a disk-tier straddle can be spliced + /// off the partition borrow on owned data ([`crate::iggy_partition`]). + pub fn resident_entries(&self) -> Vec { + let inner = unsafe { &*self.inner.get() }; + inner.storage.entries() + } } impl PartitionJournal @@ -441,7 +528,6 @@ where offset_to_op.keys().next().copied() } - #[allow(dead_code)] fn candidate_start_op(&self, query: &MessageLookup) -> Option { match query { MessageLookup::Offset { offset, .. } => { @@ -495,78 +581,6 @@ where Some(bytes) } - - #[allow(dead_code)] - async fn load_polled_batches_from_storage( - &self, - start_op: u64, - query: MessageLookup, - ) -> PollQueryResult<4096> { - let count = query.count(); - - if count == 0 { - return (PollFragments::new(), None); - } - - // Get (op, storage_offset) pairs directly from the mapping - // BTreeMap is already sorted by op - let op_offsets: Vec<(u64, usize)> = { - let op_to_storage_offset = unsafe { &*self.op_to_storage_offset.get() }; - op_to_storage_offset - .range(start_op..) 
- .map(|(op, offset)| (*op, *offset)) - .collect() - }; - - let mut fragments = PollFragments::new(); - let mut last_matching_offset = None; - let mut matched_messages = 0u32; - - for (_, storage_offset) in op_offsets { - if matched_messages >= count { - break; - } - - let bytes = { - let inner = unsafe { &*self.inner.get() }; - inner - .storage - .read_at(storage_offset, Owned::<4096>::zeroed(ZERO_LEN).into()) - .await - .unwrap_or_else(|_| Owned::<4096>::zeroed(ZERO_LEN).into()) - }; - - if bytes.is_empty() { - continue; - } - - let header_size = std::mem::size_of::(); - let header_bytes = &bytes[..header_size]; - let header = *bytemuck::checked::try_from_bytes::(header_bytes) - .expect("partition journal storage must contain a valid prepare header"); - if header.operation != Operation::SendMessages { - continue; - } - let Ok(batch) = decode_prepare_slice(bytes.as_slice()) else { - continue; - }; - - let Some(selection) = select_batch_slice(&batch, query, matched_messages) else { - continue; - }; - push_selected_batch_fragments( - &mut fragments, - &mut last_matching_offset, - &mut matched_messages, - &bytes, - &header, - &batch, - selection, - ); - } - - (fragments, last_matching_offset) - } } impl Journal for PartitionJournal { @@ -599,22 +613,6 @@ impl Journal for PartitionJournal for PartitionJournal { - type Query = MessageLookup; - - async fn get(&self, query: &Self::Query) -> Option> { - let query = *query; - let start_op = self.candidate_start_op(&query)?; - let result = self.load_polled_batches_from_storage(start_op, query).await; - - if result.0.is_empty() { - None - } else { - Some(result) - } - } -} - pub fn select_batch_slice( batch: &SendMessages2Ref<'_>, query: MessageLookup, @@ -631,9 +629,17 @@ pub fn select_batch_slice( let mut matched = 0u32; let mut last_matching_offset = None; + let ceiling = query.ceiling(); for record in batch.iter_with_offsets() { let offset = batch.header.base_offset + u64::from(record.message.header.offset_delta); + // 
Offsets within a batch ascend with the record index, so once we pass + // the commit frontier every later record is uncommitted too: stop here + // rather than skipping, which would punch a hole into the byte slice. + if offset > ceiling { + break; + } + let selected = match query { MessageLookup::Offset { offset: query_offset, @@ -666,24 +672,30 @@ pub fn select_batch_slice( }) } -fn push_selected_batch_fragments( +/// Push the fragments for one selected batch, shared by the resident-journal +/// walk and the disk-chunk walk. `source` holds a stamped +/// `[256B SendMessages2Header][blob]` batch starting at byte `batch_base` +/// (the disk walk passes the chunk cursor; the resident walk passes +/// `size_of::()`, the batch's offset past the prepare header). +/// A full-body selection forwards the original batch bytes by reference; a +/// partial selection emits a rewritten header (clamped length/count/checksum) +/// plus a body slice. +pub fn push_selected_batch_fragments( fragments: &mut PollFragments<4096>, last_matching_offset: &mut Option, matched_messages: &mut u32, - prepare: &Frozen<4096>, - prepare_header: &PrepareHeader, + source: &Frozen<4096>, + batch_base: usize, batch: &SendMessages2Ref<'_>, selection: SelectedBatchSlice, ) { - let prepare_header_size = std::mem::size_of::(); - let prepare_size = prepare_header.size as usize; let full_body_selected = selection.start == 0 && selection.end == batch.blob().len(); if full_body_selected { fragments.push(Fragment::slice( - prepare.clone(), - prepare_header_size, - prepare_size, + source.clone(), + batch_base, + batch_base + batch.header.total_size(), )); } else { let mut rewritten = batch.header; @@ -699,9 +711,9 @@ fn push_selected_batch_fragments( ); fragments.push(Fragment::whole(rewritten.into_frozen())); fragments.push(Fragment::slice( - prepare.clone(), - prepare_header_size + COMMAND_HEADER_SIZE + selection.start, - prepare_header_size + COMMAND_HEADER_SIZE + selection.end, + source.clone(), + 
batch_base + COMMAND_HEADER_SIZE + selection.start, + batch_base + COMMAND_HEADER_SIZE + selection.end, )); } @@ -709,6 +721,93 @@ fn push_selected_batch_fragments( *matched_messages += selection.matched_messages; } +/// Decode one resident `Frozen` entry and push its matching fragments. Shared by +/// the live storage walk and the owned-snapshot walk so the corrupt-header skip +/// and `SendMessages` filter live in one place. Skips (never panics) on a short +/// or undecodable entry: a poll must not crash the shard on bad storage. +fn try_push_resident_entry( + prepare: &Frozen<4096>, + query: MessageLookup, + fragments: &mut PollFragments<4096>, + last_matching_offset: &mut Option, + matched_messages: &mut u32, +) { + let Some(header_bytes) = prepare.as_slice().get(..PREPARE_HEADER_SIZE) else { + return; + }; + let Ok(header) = bytemuck::checked::try_from_bytes::(header_bytes) else { + warn!( + target: "iggy.partitions.diag", + "partition journal poll: skipping entry with undecodable prepare header" + ); + return; + }; + if header.operation != Operation::SendMessages { + return; + } + let Ok(batch) = decode_prepare_slice(prepare.as_slice()) else { + return; + }; + let Some(selection) = select_batch_slice(&batch, query, *matched_messages) else { + return; + }; + // The batch's 256B header sits right after the prepare header in a resident + // entry (see `decode_prepare_slice`), so the batch base is `PREPARE_HEADER_SIZE`. + push_selected_batch_fragments( + fragments, + last_matching_offset, + matched_messages, + prepare, + PREPARE_HEADER_SIZE, + &batch, + selection, + ); +} + +/// Poll an owned, point-in-time snapshot of the resident journal tail. +/// `entries` are op-ascending `Frozen` clones captured while the partition +/// borrow was held; this runs off the borrow on owned data, so no concurrent +/// commit/eviction can interleave. 
Mirrors [`PartitionJournal::get_sync`] but +/// over owned entries: a single forward walk where `select_batch_slice` filters +/// by `query`, which is equivalent to the live `candidate_start_op` seek (a +/// batch entirely before the query bound contributes no records). +/// +/// Used both for retention-recovery (disk walked clean, serve the journal with +/// the original query) and, after a contiguity check by the caller, for the +/// disk-tier straddle continuation. Returns `None` when nothing matched. +// +// Plain `pub` (not `pub(crate)`): the `journal` module is private, so this is +// not externally reachable, and `pub(crate)` here trips `redundant_pub_crate`. +// Matches `select_batch_slice` above. +pub fn select_resident( + entries: &[Frozen<4096>], + query: MessageLookup, +) -> Option> { + let count = query.count(); + if count == 0 { + return None; + } + + let mut fragments = PollFragments::new(); + let mut last_matching_offset = None; + let mut matched_messages = 0u32; + + for prepare in entries { + if matched_messages >= count { + break; + } + try_push_resident_entry( + prepare, + query, + &mut fragments, + &mut last_matching_offset, + &mut matched_messages, + ); + } + + (!fragments.is_empty()).then_some((fragments, last_matching_offset)) +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs index fa0ad1558a..3942286e7a 100644 --- a/core/partitions/src/lib.rs +++ b/core/partitions/src/lib.rs @@ -25,6 +25,7 @@ mod journal; mod log; mod messages_writer; mod offset_storage; +mod poll_plan; mod segment; mod types; @@ -34,6 +35,8 @@ pub use iggy_index_writer::IggyIndexWriter; pub use iggy_partition::IggyPartition; pub use iggy_partitions::IggyPartitions; pub use messages_writer::MessagesWriter; +pub use offset_storage::delete_persisted_offset; +pub use poll_plan::PollPlan; pub use segment::Segment; use server_common::Message; pub use server_common::send_messages2::{IggyMessage2, IggyMessage2Header, 
IggyMessages2}; @@ -52,15 +55,6 @@ pub trait Partition { message: Message, ) -> impl Future>; - fn poll_messages( - &self, - consumer: PollingConsumer, - args: PollingArgs, - ) -> impl Future, IggyError>> { - let _ = (consumer, args); - async { Err(IggyError::FeatureUnavailable) } - } - /// # Errors /// Returns `IggyError::FeatureUnavailable` by default. fn store_consumer_offset( diff --git a/core/partitions/src/offset_storage.rs b/core/partitions/src/offset_storage.rs index dd98266293..249e5d8b02 100644 --- a/core/partitions/src/offset_storage.rs +++ b/core/partitions/src/offset_storage.rs @@ -53,6 +53,10 @@ pub async fn persist_offset(path: &str, offset: u64, enforce_fsync: bool) -> Res Ok(()) } +/// Unlink a persisted consumer-offset file. A no-op if the file is absent. +/// +/// # Errors +/// Returns [`IggyError::CannotDeleteConsumerOffsetFile`] if the unlink fails. pub async fn delete_persisted_offset(path: &str) -> Result<(), IggyError> { if !Path::new(path).exists() { return Ok(()); diff --git a/core/partitions/src/poll_plan.rs b/core/partitions/src/poll_plan.rs new file mode 100644 index 0000000000..aa9b7c54d8 --- /dev/null +++ b/core/partitions/src/poll_plan.rs @@ -0,0 +1,798 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
+// See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Owned, borrow-free poll execution.
+//!
+//! A poll must not hold a partition reference across an `.await`: the shard pump
+//! can reallocate the partitions `Vec` (`ReconcileOp::InsertOwned`) or take a
+//! `&mut` to the same namespace while a poll is parked, dangling the reference.
+//! So `IggyPartition::build_poll_plan` captures everything a poll needs
+//! synchronously under the borrow into the owned types here, drops the borrow,
+//! then [`PollPlan::execute`] runs the disk read + offset persist on owned data
+//! alone: consumer offsets are already `Arc`, the journal tail is a
+//! point-in-time `Frozen` snapshot, and segment files are re-opened by path. No
+//! value in this module holds a partition reference, so executing a plan is
+//! sound on a detached task concurrently with the pump's own writes.
+
+use crate::PollFragments;
+use crate::journal::{MessageLookup, push_selected_batch_fragments, select_batch_slice};
+use crate::offset_storage::persist_offset;
+use compio::io::AsyncReadAtExt;
+use iggy_common::{
+    ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets, IggyError,
+};
+use server_common::iobuf::{Frozen, Owned};
+use server_common::send_messages2::{COMMAND_HEADER_SIZE, decode_batch_slice};
+use std::hash::Hash;
+use std::sync::Arc;
+use std::sync::atomic::Ordering;
+use tracing::warn;
+
+/// Owned, borrow-free inputs for the disk tier of a poll (see module docs).
+/// Segment files are re-opened by path because sealed segments drop their
+/// writer at rotation.
+pub struct DiskReadPlan {
+    /// Directory holding the segment `.log` files; `None` means there are no
+    /// files to resolve and the disk walk reports an empty result.
+    pub(crate) partition_dir: Option<String>,
+    /// Segments to walk, snapshotted from the poll's starting segment onward
+    /// (see `build_poll_plan`); `start_position` is the byte offset into the
+    /// first one.
+    pub(crate) segments: Vec<DiskSegment>,
+    pub(crate) start_position: u64,
+    /// Raw namespace id, carried only to tag diagnostics emitted by the walk.
+    pub(crate) namespace_raw: u64,
+}
+
+/// One segment of a [`DiskReadPlan`] walk. `start_offset` names the segment
+/// file (`{start_offset:0>20}.log` under the partition dir) and `persisted` is
+/// the byte bound the walk reads up to; a segment with `persisted == 0` is
+/// skipped.
+pub struct DiskSegment {
+    pub(crate) start_offset: u64,
+    pub(crate) persisted: u64,
+}
+
+/// Owned auto-commit inputs, applied off the partition borrow after a poll (see
+/// module docs). The committed offset is unknown until the poll completes, so
+/// the persist path + fsync flag are captured up front for the later disk write.
+pub struct AutoCommitCtx {
+    pub(crate) offset_path: Option<String>,
+    pub(crate) enforce_fsync: bool,
+    pub(crate) target: AutoCommitTarget,
+}
+
+/// The lock-free offset map this auto-commit updates, captured as an owned
+/// `Arc` so the apply needs no partition borrow. `create_path` builds the
+/// `ConsumerOffset` entry on first commit for a consumer that has none yet.
+pub enum AutoCommitTarget {
+    Consumer {
+        offsets: Arc<ConsumerOffsets>,
+        consumer_id: u32,
+        create_path: Option<String>,
+    },
+    ConsumerGroup {
+        offsets: Arc<ConsumerGroupOffsets>,
+        group_id: u32,
+        create_path: Option<String>,
+    },
+}
+
+/// Owned cooperative-rebalance input: a group's lock-free `last_polled` map
+/// (captured as an `Arc`) plus its id, so the highest offset served to the group
+/// is recorded off the partition borrow after the poll completes (the served
+/// offset is unknown until then). See [`PollPlan::execute`].
+pub struct LastPolledCtx {
+    pub(crate) offsets: Arc<ConsumerGroupOffsets>,
+    pub(crate) group_id: usize,
+}
+
+impl LastPolledCtx {
+    /// Bump the group's recorded high-water served offset (monotone via
+    /// `fetch_max`). Lock-free `papaya` on an owned `Arc`, so sound off the pump.
+    fn record(&self, last_offset: u64) {
+        let guard = self.offsets.pin();
+        let key = ConsumerGroupId(self.group_id);
+        if let Some(existing) = guard.get(&key) {
+            existing.offset.fetch_max(last_offset, Ordering::Relaxed);
+        } else {
+            // First served offset for this group: create the entry. A group id
+            // that does not fit in `u32` saturates instead of truncating.
+            let created = ConsumerOffset::new(
+                ConsumerKind::ConsumerGroup,
+                u32::try_from(self.group_id).unwrap_or(u32::MAX),
+                last_offset,
+                String::new(),
+            );
+            guard.insert(key, created);
+        }
+    }
+}
+
+/// Owned, point-in-time snapshot of the resident journal tail for the disk-tier
+/// straddle. `entries` are op-ascending `Frozen` clones (refcount bumps).
+pub struct ResidentTailSnapshot {
+    pub(crate) oldest_resident: Option<u64>,
+    pub(crate) entries: Vec<Frozen<4096>>,
+}
+
+impl ResidentTailSnapshot {
+    /// Offset query to continue a disk match into the resident tail, or `None`
+    /// when the tail cannot contiguously extend it. The snapshot is
+    /// point-in-time, so the gate (`oldest_resident <= last + 1`) is race-free:
+    /// a commit after the snapshot cannot have evicted the run. Without it,
+    /// splicing the next resident op over an evicted run silently skips offsets.
+    fn straddle_continuation(
+        &self,
+        last_offset: u64,
+        remaining: u32,
+        ceiling: u64,
+    ) -> Option<MessageLookup> {
+        // `checked_add`: a last offset of `u64::MAX` has no successor, so there
+        // is nothing to continue into. `then_some` evaluates its argument
+        // eagerly, so the plain `last_offset + 1` would overflow there.
+        let next_offset = last_offset.checked_add(1)?;
+        (remaining > 0
+            && self
+                .oldest_resident
+                .is_some_and(|oldest| oldest <= next_offset))
+        .then_some(MessageLookup::Offset {
+            offset: next_offset,
+            count: remaining,
+            ceiling,
+        })
+    }
+}
+
+/// Everything a poll needs, captured by `IggyPartition::build_poll_plan` (see
+/// module docs for the borrow contract).
+pub struct PollPlan {
+    /// Monotone high-water snapshot taken before the disk read, so it may lag a
+    /// concurrent producer by the poll duration and self-corrects next poll.
+    pub(crate) commit_offset: u64,
+    pub(crate) auto_commit: Option<AutoCommitCtx>,
+    pub(crate) last_polled: Option<LastPolledCtx>,
+    pub(crate) tier: PollTier,
+}
+
+impl PollPlan {
+    /// Whether executing this plan needs off-pump IO: a disk read (`Disk` tier)
+    /// or a consumer-offset persist (`auto_commit`). When `false`, the result is
+    /// fully resident and the caller can [`Self::execute_resident`] + reply on
+    /// the pump without spawning; when `true`, it must spawn [`Self::execute`]
+    /// so the pump is not blocked on file IO.
+    #[must_use]
+    pub const fn needs_off_pump_io(&self) -> bool {
+        if matches!(self.tier, PollTier::Disk { .. }) {
+            return true;
+        }
+        // A resident poll only needs a detached task when its auto-commit must
+        // hit disk. With no `offset_path` (sim/dev) the apply is a sync,
+        // in-memory store the pump runs inline via `execute_resident`.
+        match &self.auto_commit {
+            Some(auto_commit) => auto_commit.needs_persist(),
+            None => false,
+        }
+    }
+
+    /// Execute this plan off the partition borrow: disk read (if any), straddle
+    /// splice into the owned resident-tail snapshot, then persist + apply the
+    /// auto-commit on the owned `Arc` offset map. Holds no partition reference
+    /// (see module docs), so it is safe on a detached task.
+    ///
+    /// Returns the served fragments and the plan's `commit_offset` snapshot.
+    pub async fn execute(self) -> (PollFragments<4096>, u64) {
+        let commit_offset = self.commit_offset;
+        let (fragments, last_matching_offset) = match self.tier {
+            PollTier::Empty => (PollFragments::new(), None),
+            PollTier::Resident {
+                fragments,
+                last_matching_offset,
+            } => (fragments, last_matching_offset),
+            PollTier::Disk {
+                disk,
+                query,
+                resident_tail,
+            } => match disk.read_disk(query).await {
+                // Disk walked cleanly and matched nothing: the query offset is
+                // below disk retention too, so the match (if any) is journal-
+                // resident. Serve the journal forward (retention-recovery) from
+                // the resident-tail snapshot with the ORIGINAL query (offset or
+                // timestamp); no contiguity gate, this is not a straddle.
+                DiskReadOutcome::Empty => {
+                    crate::journal::select_resident(&resident_tail.entries, query)
+                        .unwrap_or_else(|| (PollFragments::new(), None))
+                }
+                // Disk read stopped on an IO fault. Fail-closed: return an empty
+                // poll WITHOUT the journal-forward fallback. Falling forward
+                // here would splice the next resident op over the unreadable run
+                // and silently skip live data; the fault instead surfaces as a
+                // visibly stuck consumer that recovers on a later poll once the
+                // segment reads again.
+                DiskReadOutcome::Faulted => (PollFragments::new(), None),
+                // Straddle: continue past the last disk match into the resident
+                // tail (gate + race argument live on `straddle_continuation`).
+                DiskReadOutcome::Matched {
+                    mut fragments,
+                    last_matching_offset,
+                    matched,
+                } => {
+                    let remaining = query.count().saturating_sub(matched);
+                    let continuation = last_matching_offset
+                        .and_then(|last_offset| {
+                            resident_tail.straddle_continuation(
+                                last_offset,
+                                remaining,
+                                query.ceiling(),
+                            )
+                        })
+                        .and_then(|query| {
+                            crate::journal::select_resident(&resident_tail.entries, query)
+                        });
+                    match continuation {
+                        Some((journal_fragments, journal_last)) => {
+                            fragments.extend(journal_fragments);
+                            (fragments, journal_last.or(last_matching_offset))
+                        }
+                        None => (fragments, last_matching_offset),
+                    }
+                }
+            },
+        };
+
+        if let (Some(last_polled), Some(last_offset)) = (&self.last_polled, last_matching_offset) {
+            last_polled.record(last_offset);
+        }
+
+        if let Some(auto_commit) = self.auto_commit
+            && !fragments.is_empty()
+            && let Some(last_offset) = last_matching_offset
+        {
+            match auto_commit.persist(last_offset).await {
+                // Apply to the in-memory map on the owned `Arc` (no borrow).
+                Ok(()) => auto_commit.apply(last_offset),
+                Err(err) => warn!(
+                    target: "iggy.partitions.diag",
+                    last_offset,
+                    %err,
+                    "poll_read: failed to persist consumer offset"
+                ),
+            }
+        }
+
+        (fragments, commit_offset)
+    }
+
+    /// Synchronous fast path for a fully-resident poll
+    /// ([`Self::needs_off_pump_io`] is `false`): no disk read, no
+    /// consumer-offset persist, so the pump can reply inline without spawning.
+    ///
+    /// # Panics
+    /// Panics if called on a `Disk`-tier plan; callers must gate on
+    /// [`Self::needs_off_pump_io`] first.
+    #[must_use]
+    pub fn execute_resident(self) -> (PollFragments<4096>, u64) {
+        let commit_offset = self.commit_offset;
+        let (fragments, last_matching_offset) = match self.tier {
+            PollTier::Empty => (PollFragments::new(), None),
+            PollTier::Resident {
+                fragments,
+                last_matching_offset,
+            } => (fragments, last_matching_offset),
+            // `needs_off_pump_io` is true for every Disk tier, so the dispatch
+            // gate never routes one here.
+            PollTier::Disk { .. } => {
+                unreachable!("execute_resident on Disk tier; needs_off_pump_io guards this")
+            }
+        };
+        if let (Some(last_polled), Some(last_offset)) = (&self.last_polled, last_matching_offset) {
+            last_polled.record(last_offset);
+        }
+        // An auto-commit reaches the resident fast path only when it needs no
+        // disk persist (`needs_off_pump_io` gate), so apply the offset to the
+        // in-memory map inline. `execute()` does the same after its persist;
+        // skipping it here would silently drop the committed offset.
+        if let Some(auto_commit) = self.auto_commit
+            && !fragments.is_empty()
+            && let Some(last_offset) = last_matching_offset
+        {
+            auto_commit.apply(last_offset);
+        }
+        (fragments, commit_offset)
+    }
+}
+
+/// Where a poll's data comes from: nothing (`Empty`), fragments already carried
+/// in the plan (`Resident`), or a disk walk run by [`PollPlan::execute`]
+/// (`Disk`).
+pub enum PollTier {
+    Empty,
+    Resident {
+        fragments: PollFragments<4096>,
+        last_matching_offset: Option<u64>,
+    },
+    Disk {
+        disk: DiskReadPlan,
+        query: MessageLookup,
+        /// Resident journal tail snapshot for the straddle continuation,
+        /// captured at plan time so the splice runs off the partition borrow.
+        resident_tail: ResidentTailSnapshot,
+    },
+}
+
+/// Outcome of [`DiskReadPlan::read_disk`], distinguishing a benign empty walk
+/// from an IO fault so the caller can fail-closed.
+///
+/// A faulted segment may hold data that is present-but-unreadable right now;
+/// the disk walk stops at the fault (never advancing to later segments) so a
+/// poll cannot return a gap. The caller must NOT fall the journal forward over
+/// a `Faulted` result, or it would splice the next resident op over the
+/// unreadable run and silently skip live messages.
+pub enum DiskReadOutcome {
+    /// Walk produced matches (possibly a partial prefix if a fault stopped it).
+    Matched {
+        fragments: PollFragments<4096>,
+        last_matching_offset: Option<u64>,
+        matched: u32,
+    },
+    /// Walk completed with no fault and matched nothing. The query offset is
+    /// below disk retention too, so the caller may serve the journal forward
+    /// (retention-recovery) without skipping anything.
+    Empty,
+    /// Walk stopped on an IO fault before matching anything. Fail-closed: the
+    /// caller returns an empty poll so the consumer cursor does not advance
+    /// past data that may still be present-but-unreadable.
+    Faulted,
+}
+
+impl DiskReadPlan {
+    /// Serve a poll from the on-disk segment files, off the partition borrow.
+    /// Reads from owned descriptors so no partition reference is held across
+    /// the file IO. Walks stamped `[256B SendMessages2Header][blob]` batches in
+    /// chunked reads, re-reading a batch split across a chunk boundary in the
+    /// next chunk.
+    #[allow(clippy::cast_possible_truncation)]
+    pub(crate) async fn read_disk(self, query: MessageLookup) -> DiskReadOutcome {
+        const DISK_POLL_CHUNK: u64 = 1 << 20;
+
+        let count = query.count();
+        if count == 0 || self.segments.is_empty() {
+            return DiskReadOutcome::Empty;
+        }
+        let Some(partition_dir) = self.partition_dir.as_deref() else {
+            // Simulated in-memory persistence, or no writer was resolvable
+            // (e.g. mid-rotation): no files to read. This is not an IO fault on
+            // present data, so it is `Empty`: the caller serves the resident
+            // journal tier (the sim's only tier) without skipping anything.
+            // TODO(hubcio): a live partition mid-rotation can also land here
+            // with disk-resident-but-unresolvable data; the journal-forward
+            // could then skip those offsets. Distinguish sim/no-files (Empty)
+            // from a transiently-unresolvable writer (Faulted, fail-closed).
+            warn!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                namespace_raw = self.namespace_raw,
+                segment_count = self.segments.len(),
+                "disk poll: no partition dir to resolve segment files; serving journal tier"
+            );
+            return DiskReadOutcome::Empty;
+        };
+
+        // `start_position` applies to the first snapshotted segment; each later
+        // segment is walked from byte 0 (reset at the end of every iteration).
+        let mut position = self.start_position;
+        let mut fragments = PollFragments::new();
+        let mut last_matching_offset = None;
+        let mut matched: u32 = 0;
+        // Set when an open/read retry exhausts. The walk breaks immediately so
+        // later segments are never read into the result (which would leave a
+        // gap at the faulted segment). Pre-fault matches are still served.
+        let mut faulted = false;
+
+        'walk: for segment in &self.segments {
+            if matched >= count {
+                break;
+            }
+            let persisted = segment.persisted;
+            if persisted == 0 || position >= persisted {
+                // Benign skip: nothing persisted for this segment yet, or the
+                // start position is already past it. Not a fault.
+                position = 0;
+                continue;
+            }
+            let path = format!("{partition_dir}/{:0>20}.log", segment.start_offset);
+            let Some(file) = self.open_segment_with_retry(&path).await else {
+                // Open exhausted retries: the segment may hold present-but-
+                // unreadable data. Stop here rather than walking past it.
+                faulted = true;
+                break 'walk;
+            };
+
+            let mut chunk_len = DISK_POLL_CHUNK;
+            while matched < count && position < persisted {
+                let len = (persisted - position).min(chunk_len) as usize;
+                let Some(chunk) = self.read_chunk_with_retry(&file, position, len).await else {
+                    // Chunk read exhausted retries: same fail-closed reason as
+                    // a failed open.
+                    faulted = true;
+                    break 'walk;
+                };
+                let consumed = walk_disk_chunk(
+                    &chunk,
+                    query,
+                    count,
+                    &mut matched,
+                    &mut fragments,
+                    &mut last_matching_offset,
+                );
+                if consumed == 0 {
+                    if (len as u64) >= persisted - position {
+                        // The whole remainder fit yet no complete batch
+                        // decoded: a corrupt batch in this segment. Fail-closed
+                        // like an IO fault (set `faulted`, stop the walk) so a
+                        // later segment is never served over the corrupt run,
+                        // which would punch a silent gap into the poll.
+                        faulted = true;
+                        break 'walk;
+                    }
+                    // A single batch larger than the chunk: grow and re-read.
+                    chunk_len = chunk_len.saturating_mul(4);
+                    continue;
+                }
+                chunk_len = DISK_POLL_CHUNK;
+                position += consumed as u64;
+            }
+            position = 0;
+        }
+
+        if matched > 0 {
+            // Pre-fault matches are always a contiguous prefix (the walk stops
+            // at the first fault), so a partial result carries no gap.
+            DiskReadOutcome::Matched {
+                fragments,
+                last_matching_offset,
+                matched,
+            }
+        } else if faulted {
+            DiskReadOutcome::Faulted
+        } else {
+            DiskReadOutcome::Empty
+        }
+    }
+
+    /// Open a segment file for a disk poll, retrying transient IO failures (fd
+    /// pressure under heavy parallel load) so one failed syscall does not
+    /// silently collapse the poll into an empty result.
+    async fn open_segment_with_retry(&self, path: &str) -> Option<compio::fs::File> {
+        const MAX_ATTEMPTS: u8 = 3;
+
+        for attempt in 0..MAX_ATTEMPTS {
+            match compio::fs::File::open(path).await {
+                Ok(file) => return Some(file),
+                Err(error) => {
+                    warn!(
+                        target: "iggy.partitions.diag",
+                        plane = "partitions",
+                        namespace_raw = self.namespace_raw,
+                        path,
+                        attempt,
+                        %error,
+                        "disk poll: failed to open segment file"
+                    );
+                    // Back off only when another attempt follows; sleeping
+                    // after the last failure just delays the fail-closed result.
+                    if attempt + 1 < MAX_ATTEMPTS {
+                        compio::time::sleep(std::time::Duration::from_millis(10)).await;
+                    }
+                }
+            }
+        }
+        None
+    }
+
+    /// Read one chunk for a disk poll, retrying transient IO failures.
+    /// Returns `None` once every attempt has failed.
+    async fn read_chunk_with_retry(
+        &self,
+        file: &compio::fs::File,
+        position: u64,
+        len: usize,
+    ) -> Option<Frozen<4096>> {
+        const MAX_ATTEMPTS: u8 = 3;
+
+        for attempt in 0..MAX_ATTEMPTS {
+            // `with_capacity` (len == 0, capacity == len) instead of `zeroed`:
+            // `read_exact_at` fills the whole capacity in place and advances the
+            // length via `SetLen`, so the `zeroed` memset of up to 1MiB per
+            // chunk was pure waste - every byte is overwritten by the read.
+            let buffer = Owned::<4096>::with_capacity(len);
+            let compio::BufResult(read, buffer) = file.read_exact_at(buffer, position).await;
+            match read {
+                Ok(()) => return Some(Frozen::from(buffer)),
+                Err(error) => {
+                    warn!(
+                        target: "iggy.partitions.diag",
+                        plane = "partitions",
+                        namespace_raw = self.namespace_raw,
+                        position,
+                        attempt,
+                        %error,
+                        "disk poll: segment read failed"
+                    );
+                    // Back off only when another attempt follows.
+                    if attempt + 1 < MAX_ATTEMPTS {
+                        compio::time::sleep(std::time::Duration::from_millis(10)).await;
+                    }
+                }
+            }
+        }
+        None
+    }
+}
+
+impl AutoCommitCtx {
+    /// Whether applying this auto-commit needs a real disk persist. `false` for
+    /// sim/dev partitions with no `offset_path`, where the apply is a sync,
+    /// in-memory `papaya` store that the pump can run inline.
+    pub(crate) const fn needs_persist(&self) -> bool {
+        self.offset_path.is_some()
+    }
+
+    /// Persist the committed offset to disk, off the partition borrow.
+    /// A no-op returning `Ok(())` when there is no `offset_path`.
+    ///
+    /// # Errors
+    /// Propagates the error returned by `persist_offset`.
+    pub(crate) async fn persist(&self, offset: u64) -> Result<(), IggyError> {
+        match &self.offset_path {
+            Some(path) => persist_offset(path, offset, self.enforce_fsync).await,
+            None => Ok(()),
+        }
+    }
+
+    /// Apply the committed offset to the in-memory map on the owned `Arc`
+    /// handle, with NO partition reference. Uses the monotone
+    /// [`upsert_offset_max`] so a stale off-pump auto-commit cannot rewind a
+    /// newer explicit store; the maps are lock-free (`papaya`), so this is
+    /// sound off the pump task.
+    #[allow(clippy::cast_possible_truncation)]
+    pub(crate) fn apply(&self, offset: u64) {
+        match &self.target {
+            AutoCommitTarget::Consumer {
+                offsets,
+                consumer_id,
+                create_path,
+            } => {
+                let consumer_id = *consumer_id;
+                let map: &ConsumerOffsets = offsets;
+                upsert_offset_max(map, consumer_id as usize, offset, || {
+                    create_path.as_deref().map_or_else(
+                        || {
+                            ConsumerOffset::new(
+                                ConsumerKind::Consumer,
+                                consumer_id,
+                                0,
+                                String::new(),
+                            )
+                        },
+                        |path| ConsumerOffset::default_for_consumer(consumer_id, path),
+                    )
+                });
+            }
+            AutoCommitTarget::ConsumerGroup {
+                offsets,
+                group_id,
+                create_path,
+            } => {
+                let group_id = *group_id;
+                let key = ConsumerGroupId(group_id as usize);
+                let map: &ConsumerGroupOffsets = offsets;
+                upsert_offset_max(map, key, offset, || {
+                    create_path.as_deref().map_or_else(
+                        || {
+                            ConsumerOffset::new(
+                                ConsumerKind::ConsumerGroup,
+                                group_id,
+                                0,
+                                String::new(),
+                            )
+                        },
+                        |path| ConsumerOffset::default_for_consumer_group(key, path),
+                    )
+                });
+            }
+        }
+    }
+}
+
+/// Upsert a committed offset into a lock-free `papaya` offset map: overwrite an
+/// existing entry in place (`store`, so the offset may move backward), or build
+/// one via `create_on_miss` on first commit for a consumer/group that has none
+/// yet. Used by the pump's [`IggyPartition::apply_consumer_offset_commit`]; the
+/// off-pump [`AutoCommitCtx::apply`] uses the monotone [`upsert_offset_max`]
+/// variant instead.
+pub fn upsert_offset( + map: &papaya::HashMap, + key: K, + offset: u64, + create_on_miss: impl FnOnce() -> ConsumerOffset, +) where + K: Hash + Eq + Clone + Send + Sync, +{ + let guard = map.pin(); + if let Some(existing) = guard.get(&key) { + existing.offset.store(offset, Ordering::Relaxed); + } else { + let created = create_on_miss(); + created.offset.store(offset, Ordering::Relaxed); + guard.insert(key, created); + } +} + +/// Monotone variant of [`upsert_offset`] for the off-pump auto-commit: an +/// existing entry is bumped via `fetch_max` so a stale auto-commit racing a +/// newer explicit `StoreConsumerOffset` cannot rewind it backward. The +/// on-miss create branch is identical. The explicit pump path keeps +/// [`upsert_offset`] (`store`), since an explicit store may legitimately rewind. +fn upsert_offset_max( + map: &papaya::HashMap, + key: K, + offset: u64, + create_on_miss: impl FnOnce() -> ConsumerOffset, +) where + K: Hash + Eq + Clone + Send + Sync, +{ + let guard = map.pin(); + if let Some(existing) = guard.get(&key) { + existing.offset.fetch_max(offset, Ordering::Relaxed); + } else { + let created = create_on_miss(); + created.offset.store(offset, Ordering::Relaxed); + guard.insert(key, created); + } +} + +/// Walk stamped `[256B SendMessages2Header][blob]` batches in one disk +/// chunk, pushing matching fragments. Returns bytes consumed: the start +/// of the first batch that did not fully fit in the chunk (the caller +/// re-reads from there), or the chunk end when everything decoded. 
+fn walk_disk_chunk( + chunk: &Frozen<4096>, + query: MessageLookup, + count: u32, + matched: &mut u32, + fragments: &mut PollFragments<4096>, + last_matching_offset: &mut Option, +) -> usize { + let bytes: &[u8] = chunk; + let mut cursor = 0usize; + + while *matched < count && cursor + COMMAND_HEADER_SIZE <= bytes.len() { + let Ok(batch) = decode_batch_slice(&bytes[cursor..]) else { + // Incomplete tail batch (or corrupt data): hand the position + // back so the caller can re-read or bail. + break; + }; + let total_size = batch.header.total_size(); + + if let Some(selection) = select_batch_slice(&batch, query, *matched) { + // On disk a batch is the bare `[256B header][blob]`, so the batch + // base is the chunk cursor (no preceding prepare header). + push_selected_batch_fragments( + fragments, + last_matching_offset, + matched, + chunk, + cursor, + &batch, + selection, + ); + } + + cursor += total_size; + } + + cursor.min(bytes.len()) +} + +#[cfg(test)] +mod tests { + use super::*; + use server_common::iobuf::Owned; + + fn non_empty_fragments() -> PollFragments<4096> { + let mut fragments = PollFragments::new(); + fragments.push(crate::types::Fragment::whole( + Owned::<4096>::zeroed(8).into(), + )); + fragments + } + + fn consumer_auto_commit( + offsets: Arc, + consumer_id: u32, + offset_path: Option, + ) -> AutoCommitCtx { + AutoCommitCtx { + offset_path, + enforce_fsync: false, + target: AutoCommitTarget::Consumer { + offsets, + consumer_id, + create_path: None, + }, + } + } + + #[test] + fn resident_auto_commit_without_offset_path_applies_inline() { + // sim/dev partition: auto_commit is requested but there is no + // `offset_path`, so the apply is a sync in-memory store. The plan must + // stay on the resident fast path (no detached task) AND still record + // the committed offset, which the resident path used to drop. 
+ let offsets = Arc::new(ConsumerOffsets::with_capacity(1)); + let plan = PollPlan { + commit_offset: 42, + auto_commit: Some(consumer_auto_commit(offsets.clone(), 7, None)), + last_polled: None, + tier: PollTier::Resident { + fragments: non_empty_fragments(), + last_matching_offset: Some(5), + }, + }; + + assert!( + !plan.needs_off_pump_io(), + "no offset_path means the apply is sync; the pump must not spawn", + ); + + let (fragments, commit_offset) = plan.execute_resident(); + assert!(!fragments.is_empty(), "resident fragments must be returned"); + assert_eq!(commit_offset, 42, "commit offset is forwarded verbatim"); + + let stored = offsets + .pin() + .get(&7usize) + .map(|entry| entry.offset.load(Ordering::Relaxed)); + assert_eq!( + stored, + Some(5), + "the in-memory auto-commit must be applied on the resident path", + ); + } + + #[test] + fn resident_auto_commit_with_offset_path_needs_off_pump_io() { + let offsets = Arc::new(ConsumerOffsets::with_capacity(1)); + let plan = PollPlan { + commit_offset: 0, + auto_commit: Some(consumer_auto_commit( + offsets, + 7, + Some("some/path".to_owned()), + )), + last_polled: None, + tier: PollTier::Resident { + fragments: non_empty_fragments(), + last_matching_offset: Some(5), + }, + }; + assert!( + plan.needs_off_pump_io(), + "a real offset_path persist must be spawned off the pump", + ); + } + + #[test] + fn auto_commit_apply_is_monotone_but_explicit_store_rewinds() { + // Auto-commit must never rewind a newer offset (anti-rewind via + // fetch_max); an explicit StoreConsumerOffset may legitimately rewind. + let offsets = Arc::new(ConsumerOffsets::with_capacity(1)); + let auto_commit = consumer_auto_commit(offsets.clone(), 7, None); + + auto_commit.apply(10); + let after_high = offsets + .pin() + .get(&7usize) + .map(|entry| entry.offset.load(Ordering::Relaxed)); + assert_eq!(after_high, Some(10)); + + // A stale auto-commit with a smaller offset must not rewind. 
+ auto_commit.apply(4); + let after_stale = offsets + .pin() + .get(&7usize) + .map(|entry| entry.offset.load(Ordering::Relaxed)); + assert_eq!(after_stale, Some(10), "auto-commit fetch_max must hold"); + + // The explicit pump path (store-semantics) still rewinds to 4. + upsert_offset(&offsets, 7usize, 4, || { + ConsumerOffset::new(ConsumerKind::Consumer, 7, 0, String::new()) + }); + let after_explicit = offsets + .pin() + .get(&7usize) + .map(|entry| entry.offset.load(Ordering::Relaxed)); + assert_eq!( + after_explicit, + Some(4), + "explicit store may rewind below the auto-committed offset", + ); + } +} diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs index 1e93ce8041..388d1df37c 100644 --- a/core/server-ng/src/bootstrap.rs +++ b/core/server-ng/src/bootstrap.rs @@ -325,9 +325,10 @@ impl Drop for ShutdownOnDrop { /// `ReadHandleFactory`s) and pushes one clone per peer onto `bundle_tx`. /// Every other shard receives the bundle and rebuilds a reader-mode /// `MuxStateMachine` on its own runtime - no WAL access, no replay, no -/// `RecoverySync` two-phase fence. Phase 2 of the old handshake was -/// only there to keep peer scans away from shard 0's torn-tail repair; -/// with no peer scan that race is structurally gone. +/// `RecoverySync` two-phase fence. The old phase-2 WAL fence is gone +/// because peers no longer scan the WAL. They do still scan live shared +/// metadata to load their on-disk partitions, so a separate listener +/// fence is still required - see [`BootstrapBarrier`]. /// /// The channel is bounded to the peer count so shard 0's `send` never /// blocks beyond a peer drain. A peer that dies before recv drops its @@ -343,6 +344,33 @@ enum MetadataHandoff { }, } +/// Reverse handshake to [`MetadataHandoff`]: gates shard 0's client +/// listeners until every peer has loaded its on-disk partitions. 
+/// +/// Peers build their owned-partition set from live shared metadata and +/// load each segment from disk in `build_shard_for_thread`. If shard 0 +/// opened listeners the instant `broadcast_metadata_bundle` returned +/// (peers have only *received* the bundle, not *loaded* partitions), a +/// client could create a partition before a peer's load scan finished. +/// That freshly committed partition would surface in the peer's scan +/// with no segment dir on disk yet, and `load_partition`'s `walk_dir` +/// would fail with `CannotReadPartitions`, aborting the whole node. A +/// partition created after boot must take the runtime reconciler path +/// (which creates its dir), never the bootstrap load path. +/// +/// Shard 0 (`Owner`) drains one signal per peer before binding +/// listeners; each peer (`Waiter`) sends one once its load completes. +/// The cross-thread shutdown flag drives both sides out of their poll +/// loop if any shard dies mid-boot. +enum BootstrapBarrier { + Owner { + ready_rx: crossfire::MAsyncRx>, + }, + Waiter { + ready_tx: crossfire::MAsyncTx>, + }, +} + struct TcpTopology { /// Domain-separation cluster id derived from `cluster.name`; threaded to /// every consensus instance and the replica handshake so frames agree. @@ -527,6 +555,12 @@ pub fn bootstrap( let (metadata_bundle_tx, metadata_bundle_rx) = crossfire::mpmc::bounded_async::(metadata_peers); + // Reverse barrier (see `BootstrapBarrier`): every peer sends one + // signal once it finishes loading its on-disk partitions; shard 0 + // drains them all before binding listeners. Bounded to the peer + // count so a sender never blocks (each peer sends exactly once). 
+ let (ready_tx, ready_rx) = crossfire::mpmc::bounded_async::(metadata_peers); + let mut shard_threads: Vec<(u16, thread::JoinHandle>)> = Vec::with_capacity(shards_count); for (idx, assignment) in assignments.into_iter().enumerate() { @@ -548,6 +582,15 @@ pub fn bootstrap( bundle_rx: metadata_bundle_rx.clone(), } }; + let barrier_for_shard = if shard_id == 0 { + BootstrapBarrier::Owner { + ready_rx: ready_rx.clone(), + } + } else { + BootstrapBarrier::Waiter { + ready_tx: ready_tx.clone(), + } + }; let handle = match thread::Builder::new() .name(format!("shard-{shard_id}")) @@ -562,6 +605,7 @@ pub fn bootstrap( config_for_shard, shutdown_flag_for_shard, metadata_handoff_for_shard, + barrier_for_shard, owner_table_for_shard, ) }) { @@ -577,6 +621,8 @@ pub fn bootstrap( // would hang until the shutdown watchdog kicks the bus. drop(metadata_bundle_tx); drop(metadata_bundle_rx); + drop(ready_tx); + drop(ready_rx); join_partial_shard_survivors(shard_threads); return Err(ServerNgError::ShardSpawnFailed { shard_id, source }); } @@ -590,6 +636,8 @@ pub fn bootstrap( // disconnects. 
drop(metadata_bundle_tx); drop(metadata_bundle_rx); + drop(ready_tx); + drop(ready_rx); info!( shards_count, @@ -615,6 +663,7 @@ fn run_shard_thread( config: Arc, shutdown_flag: Arc, metadata_handoff: MetadataHandoff, + barrier: BootstrapBarrier, owner_table: Arc, ) -> Result<(), ServerNgError> { // Armed for the whole thread body: a post-spawn error `?` or a panic @@ -650,6 +699,7 @@ fn run_shard_thread( &config, shutdown_flag, metadata_handoff, + barrier, owner_table, )) .await @@ -675,6 +725,7 @@ async fn shard_main( config: &ServerNgConfig, shutdown_flag: Arc, metadata_handoff: MetadataHandoff, + barrier: BootstrapBarrier, owner_table: Arc, ) -> Result<(), ServerNgError> { let topology = resolve_tcp_topology(config, replica_id)?; @@ -844,6 +895,9 @@ async fn shard_main( drop(metrics_for_notifier); } + // The pump task also drives the consensus timer tick (heartbeats, prepare + // retransmit, view-change timeouts) as a select! arm, serialized with frame + // processing - see `run_message_pump`. let (stop_tx, stop_rx) = channel(1); let pump_shard = Rc::clone(&shard); let pump_handle = compio::runtime::spawn(async move { @@ -878,39 +932,6 @@ async fn shard_main( }); bus.track_background(reconciler_handle); - // Consensus timer driver: heartbeats, prepare retransmit, and - // view-change timeouts only advance when `VsrConsensus::tick` runs - // ("call this periodically, e.g. every 10ms"). The simulator steps it - // explicitly; production drives it here. Without this, a prepare lost - // to a transient replica-link blip is never retransmitted and its - // client request hangs until the SDK read timeout. 
- let (consensus_tick_stop_tx, consensus_tick_stop_rx) = channel::<()>(1); - let tick_shard = Rc::clone(&shard); - let consensus_tick_handle = compio::runtime::spawn(async move { - const CONSENSUS_TICK_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10); - loop { - if consensus_tick_stop_rx.try_recv().is_ok() { - break; - } - tick_shard.tick_metadata().await; - tick_shard.tick_partitions().await; - // While a cooperative revocation is pending, wake the reconciler each - // tick so it completes the handoff within ~one tick of the source - // draining the partition, instead of waiting for the 1s periodic pass. - if tick_shard - .plane - .metadata() - .mux_stm - .streams() - .has_pending_revocations() - { - tick_shard.dispatch_metadata_commit_tick(); - } - compio::time::sleep(CONSENSUS_TICK_INTERVAL).await; - } - }); - bus.track_background(consensus_tick_handle); - // Per-shard heartbeat verifier: evicts connections that stop pinging, // releasing their consumer-group membership. Gated on config so a // deployment without heartbeats never reaps live sessions. @@ -928,7 +949,6 @@ async fn shard_main( } else { None }; - // Expired-PAT cleaner: shard 0 only (it owns the metadata consensus // group) and only when enabled. Each pass no-ops unless this node is // the caught-up metadata primary, so the delete is proposed once and @@ -951,17 +971,42 @@ async fn shard_main( None }; + // Listener fence (see `BootstrapBarrier`). Peers still scan live + // shared metadata and load their on-disk partitions in + // `build_shard_for_thread`; the factory-bundle handoff only proves + // they *received* the bundle, not that they finished loading. Shard + // 0 must not accept client traffic until every peer's load scan is + // done, otherwise a partition created by the first client surfaces + // in a still-running scan with no segment dir on disk and aborts the + // node with `CannotReadPartitions`. 
By this point every shard has + // also spawned its pump + reconciler, so a partition created after + // the fence takes the runtime reconciler path on its owning shard. + match barrier { + BootstrapBarrier::Owner { ready_rx } => { + await_bootstrap_complete( + &ready_rx, + usize::from(total_shards.saturating_sub(1)), + &shutdown_flag_for_handoff, + poll_interval, + ) + .await?; + } + BootstrapBarrier::Waiter { ready_tx } => { + signal_bootstrap_complete( + shard_id, + &ready_tx, + &shutdown_flag_for_handoff, + poll_interval, + ) + .await?; + } + } + // Listeners (replica + every client transport) bind on shard 0 only. // Shard 0's coordinator round-robins inbound TCP/WS connections to // peer shards via fd-transfer. QUIC and TCP-TLS clients terminate // locally on shard 0 (their per-connection state is non-portable - // see `LifecycleFrame::ClientWsConnectionSetup` rustdoc). - // - // No phase-2 listener fence is needed: peer shards no longer scan - // the WAL, so a shard-0 append accepted mid-boot cannot race a - // peer's `truncate_or_fail`. The factory-bundle handoff has already - // installed reader-mode `MuxStateMachine`s on every peer by the - // time shard 0 returns from `broadcast_metadata_bundle`. if shard_id == 0 { let coord = shard .coordinator() @@ -983,7 +1028,6 @@ async fn shard_main( { let _ = stop_tx.try_send(()); let _ = reconcile_stop_tx.try_send(()); - let _ = consensus_tick_stop_tx.try_send(()); if let Some(tx) = &heartbeat_stop_tx { let _ = tx.try_send(()); } @@ -997,7 +1041,6 @@ async fn shard_main( bus.token().wait().await; let _ = stop_tx.try_send(()); let _ = reconcile_stop_tx.try_send(()); - let _ = consensus_tick_stop_tx.try_send(()); if let Some(tx) = &heartbeat_stop_tx { let _ = tx.try_send(()); } @@ -1092,6 +1135,70 @@ async fn broadcast_metadata_bundle( Ok(()) } +/// Peer side of [`BootstrapBarrier`]: tell shard 0 this shard finished +/// loading its on-disk partitions. 
Mirrors [`broadcast_metadata_bundle`]'s +/// `try_send`-or-shutdown poll loop so a sibling failure (which flips the +/// shutdown flag) drives this out instead of stranding it on a full +/// channel. The channel is sized to the peer count and each peer sends +/// exactly once, so `Full` is not expected; the branch only keeps the +/// loop interruptible. +async fn signal_bootstrap_complete( + shard_id: u16, + ready_tx: &crossfire::MAsyncTx>, + shutdown_flag: &Arc, + poll_interval: Duration, +) -> Result<(), ServerNgError> { + let mut pending = shard_id; + loop { + match ready_tx.try_send(pending) { + Ok(()) => return Ok(()), + Err(crossfire::TrySendError::Disconnected(_)) => { + // Shard 0 dropped its `ready_rx` before draining (it + // aborted before binding listeners). Propagate so this + // shard short-circuits; the shutdown flag flips via the + // normal teardown path. + return Err(ServerNgError::MetadataHandoffAborted { shard_id }); + } + Err(crossfire::TrySendError::Full(returned)) => { + if shutdown_flag.load(Ordering::Relaxed) { + return Err(ServerNgError::MetadataHandoffAborted { shard_id }); + } + pending = returned; + compio::time::sleep(poll_interval).await; + } + } + } +} + +/// Owner side of [`BootstrapBarrier`]: drain one ready signal per peer +/// before shard 0 binds listeners. Polls the shutdown flag so a peer that +/// dies mid-load (flipping the flag) aborts the wait instead of hanging on +/// a signal that will never arrive. A single shard (`peers == 0`) returns +/// immediately. 
+async fn await_bootstrap_complete( + ready_rx: &crossfire::MAsyncRx>, + peers: usize, + shutdown_flag: &Arc, + poll_interval: Duration, +) -> Result<(), ServerNgError> { + let mut remaining = peers; + while remaining > 0 { + match ready_rx.try_recv() { + Ok(_shard_id) => remaining -= 1, + Err(crossfire::TryRecvError::Disconnected) => { + return Err(ServerNgError::ShardBootstrapBarrierAborted { remaining }); + } + Err(crossfire::TryRecvError::Empty) => { + if shutdown_flag.load(Ordering::Relaxed) { + return Err(ServerNgError::ShardBootstrapBarrierAborted { remaining }); + } + compio::time::sleep(poll_interval).await; + } + } + } + Ok(()) +} + /// Spawn a per-shard polling task that watches the cross-thread shutdown /// flag and triggers this shard's bus shutdown on transition. The flag /// is the only Send signal we have; the bus' shutdown machinery is @@ -2648,4 +2755,82 @@ mod tests { "expected MetadataHandoffAborted on shutdown, got {err:?}" ); } + + #[compio::test] + async fn await_bootstrap_complete_returns_immediately_for_single_shard() { + // A single-shard server has no peers to wait on; the owner barrier + // must not block when `peers == 0`. + let (_ready_tx, ready_rx) = crossfire::mpmc::bounded_async::(1); + let flag = Arc::new(AtomicBool::new(false)); + await_bootstrap_complete(&ready_rx, 0, &flag, TEST_POLL_INTERVAL) + .await + .expect("single-shard server must not block on the barrier"); + } + + #[compio::test] + async fn await_bootstrap_complete_drains_every_peer_signal() { + // Two peers report load-complete; shard 0 drains both, then proceeds + // to bind listeners. 
+ let (ready_tx, ready_rx) = crossfire::mpmc::bounded_async::(2); + let flag = Arc::new(AtomicBool::new(false)); + signal_bootstrap_complete(1, &ready_tx, &flag, TEST_POLL_INTERVAL) + .await + .expect("peer 1 must signal load-complete"); + signal_bootstrap_complete(2, &ready_tx, &flag, TEST_POLL_INTERVAL) + .await + .expect("peer 2 must signal load-complete"); + await_bootstrap_complete(&ready_rx, 2, &flag, TEST_POLL_INTERVAL) + .await + .expect("owner must drain both peer signals"); + } + + #[compio::test] + async fn await_bootstrap_complete_aborts_on_shutdown_flag() { + use compio::runtime::ResumeUnwind; + + // `_ready_tx` is held so the channel is not disconnected: the owner + // must exit via the shutdown flag, not a dropped sender. + let (_ready_tx, ready_rx) = crossfire::mpmc::bounded_async::(1); + let flag = Arc::new(AtomicBool::new(false)); + + let owner = compio::runtime::spawn({ + let flag = Arc::clone(&flag); + async move { await_bootstrap_complete(&ready_rx, 1, &flag, TEST_POLL_INTERVAL).await } + }); + + // The peer never signals, but a sibling failure flips the flag; the + // owner must abort instead of hanging before listeners. + compio::time::sleep(TEST_POLL_INTERVAL / 2).await; + flag.store(true, Ordering::Relaxed); + + let err = owner + .await + .resume_unwind() + .expect("owner task was cancelled") + .expect_err("shutdown flag must abort the barrier wait"); + assert!( + matches!( + err, + ServerNgError::ShardBootstrapBarrierAborted { remaining: 1 } + ), + "expected ShardBootstrapBarrierAborted, got {err:?}" + ); + } + + #[compio::test] + async fn signal_bootstrap_complete_aborts_when_owner_drops_rx() { + // Shard 0 aborted before draining and dropped its receiver; a peer's + // signal must surface the disconnect instead of stranding. 
+ let (ready_tx, ready_rx) = crossfire::mpmc::bounded_async::(1); + let flag = Arc::new(AtomicBool::new(false)); + drop(ready_rx); + + let err = signal_bootstrap_complete(2, &ready_tx, &flag, TEST_POLL_INTERVAL) + .await + .expect_err("dropped rx must surface as an abort"); + assert!( + matches!(err, ServerNgError::MetadataHandoffAborted { shard_id: 2 }), + "expected MetadataHandoffAborted, got {err:?}" + ); + } } diff --git a/core/server-ng/src/dispatch.rs b/core/server-ng/src/dispatch.rs index 24edd63bc5..245663b496 100644 --- a/core/server-ng/src/dispatch.rs +++ b/core/server-ng/src/dispatch.rs @@ -66,7 +66,7 @@ use message_bus::client_listener::RequestHandler; use message_bus::replica::listener::MessageHandler; use message_bus::{IggyMessageBus, MessageBus}; use metadata::impls::metadata::{MetadataSubmitError, StreamsFrontend}; -use partitions::{Partition, PollingArgs, PollingConsumer}; +use partitions::{PollPlan, PollingArgs, PollingConsumer}; use secrecy::ExposeSecret; use server_common::Message; use server_common::sharding::IggyNamespace; @@ -141,83 +141,93 @@ pub(crate) fn make_partition_read_handler( shard_handle: &ServerNgShardHandle, ) -> PartitionReadHandler { let shard_handle = Rc::clone(shard_handle); + // Runs synchronously on the shard pump (see `process_lifecycle` -> + // `on_partition_read`). `build_poll_snapshot` takes the partition borrow via + // `with_partition` (closure-scoped, debug `BorrowGuard`) and returns an owned + // `PollPlan`; only owned data crosses into `spawn_poll_io`. A fully-resident + // poll replies here without spawning. See the `poll_plan` module docs. Rc::new(move |namespace, read, reply| { - let shard_handle = Rc::clone(&shard_handle); - // The poll awaits journal reads; run it as a task so the shard pump - // is not blocked. 
Same single-threaded-runtime discipline as the - // pump's own `on_request` path: the partition reference is resolved - // after the tombstone check and the read races only reconciler - // removal, which tombstones the namespace first. - compio::runtime::spawn(async move { - let Some(shard) = upgrade_shard_handle(&shard_handle) else { - return; - }; - let partitions = shard.plane.partitions(); - if partitions.is_tombstoned(&namespace) { - let _ = reply.try_send(PartitionReadReply::NotFound); - return; - } - let Some(partition) = partitions.get_by_ns(&namespace) else { - let _ = reply.try_send(PartitionReadReply::NotFound); - return; - }; - let result = match read { - PartitionRead::Poll { consumer, args } => { - let poll_started = std::time::Instant::now(); - let poll_result = partition.poll_messages(consumer, args).await; - let elapsed = poll_started.elapsed(); - if elapsed > std::time::Duration::from_secs(1) { - warn!( - namespace_raw = namespace.inner(), - elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX), - "slow partition poll; gather side may have timed out" - ); + let Some(shard) = upgrade_shard_handle(&shard_handle) else { + return; + }; + let partitions = shard.plane.partitions(); + match read { + PartitionRead::Poll { consumer, args } => { + match partitions.build_poll_snapshot(&namespace, consumer, &args) { + None => { + let _ = reply.try_send(PartitionReadReply::NotFound); } - match poll_result { - Ok((fragments, _last_matching_offset)) => PartitionReadReply::Poll { + Some(plan) if plan.needs_off_pump_io() => { + spawn_poll_io(namespace, plan, reply); + } + Some(plan) => { + let (fragments, current_offset) = plan.execute_resident(); + let _ = reply.try_send(PartitionReadReply::Poll { fragments, - current_offset: partition.offsets().commit_offset, - }, - Err(error) => { - warn!( - namespace_raw = namespace.inner(), - error = %error, - "partition poll failed" - ); - PartitionReadReply::NotFound - } + current_offset, + }); } } - 
PartitionRead::ConsumerOffset { consumer } => PartitionReadReply::ConsumerOffset { - stored: partition.get_consumer_offset(consumer), - current_offset: partition.offsets().commit_offset, - }, - PartitionRead::GroupOffsetState { group_id } => { - #[allow(clippy::cast_possible_truncation)] - let key = iggy_common::ConsumerGroupId(group_id as usize); - let load = |offset: &iggy_common::ConsumerOffset| { - offset.offset.load(std::sync::atomic::Ordering::Relaxed) - }; - let committed = partition.consumer_group_offsets.pin().get(&key).map(load); - let last_polled = partition.last_polled_offsets.pin().get(&key).map(load); - PartitionReadReply::GroupOffsetState { + } + PartitionRead::ConsumerOffset { consumer } => { + let result = match partitions.consumer_offset_read(&namespace, consumer) { + Some((stored, current_offset)) => PartitionReadReply::ConsumerOffset { + stored, + current_offset, + }, + None => PartitionReadReply::NotFound, + }; + let _ = reply.try_send(result); + } + PartitionRead::GroupOffsetState { group_id } => { + let result = match partitions.group_offset_state(&namespace, group_id) { + Some((last_polled, committed)) => PartitionReadReply::GroupOffsetState { last_polled, committed, - } - } - PartitionRead::ClearGroupLastPolled { group_id } => { - #[allow(clippy::cast_possible_truncation)] - let key = iggy_common::ConsumerGroupId(group_id as usize); - partition.last_polled_offsets.pin().remove(&key); - PartitionReadReply::Ack - } - }; - let _ = reply.try_send(result); - }) - .detach(); + }, + None => PartitionReadReply::NotFound, + }; + let _ = reply.try_send(result); + } + PartitionRead::ClearGroupLastPolled { group_id } => { + let result = match partitions.clear_group_last_polled(&namespace, group_id) { + Some(()) => PartitionReadReply::Ack, + None => PartitionReadReply::NotFound, + }; + let _ = reply.try_send(result); + } + } }) } +/// Spawn the off-pump leg of a partition poll: disk read + auto-commit +/// persist/apply on the OWNED plan (disk 
descriptors, resident-tail `Frozen` +/// clones, `Arc` offset map), then send the reply. Holds no partition +/// reference, so it is sound concurrently with the pump's `&mut` writes. +fn spawn_poll_io( + namespace: IggyNamespace, + plan: PollPlan, + reply: shard::Sender, +) { + compio::runtime::spawn(async move { + let poll_started = std::time::Instant::now(); + let (fragments, current_offset) = plan.execute().await; + let elapsed = poll_started.elapsed(); + if elapsed > std::time::Duration::from_secs(1) { + warn!( + namespace_raw = namespace.inner(), + elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX), + "slow partition poll; gather side may have timed out" + ); + } + let _ = reply.try_send(PartitionReadReply::Poll { + fragments, + current_offset, + }); + }) + .detach(); +} + pub(crate) fn make_deferred_replica_message_handler( shard_handle: &ServerNgShardHandle, ) -> MessageHandler { diff --git a/core/server-ng/src/partition_reconciler.rs b/core/server-ng/src/partition_reconciler.rs index 10152bb96c..89956e41f9 100644 --- a/core/server-ng/src/partition_reconciler.rs +++ b/core/server-ng/src/partition_reconciler.rs @@ -58,6 +58,7 @@ use consensus::{MetadataHandle, PartitionsHandle}; use futures::FutureExt; use iggy_common::{ConsumerGroupId, IggyTimestamp}; use metadata::impls::metadata::StreamsFrontend; +use partitions::delete_persisted_offset; use server_common::sharding::{IggyNamespace, ShardId}; use shard::MetadataSubmit; use shard::ReconcileOp; @@ -589,24 +590,24 @@ async fn reconcile_consumer_group_offsets(ctx: &ReconcilerCtx, counters: &mut Pa let partitions = ctx.shard.plane.partitions(); let owned: Vec = partitions.namespaces().copied().collect(); for ns in owned { - let Some(partition) = partitions.get_by_ns(&ns) else { + let live = live_groups.get(&(ns.stream_id(), ns.topic_id())); + // Take the in-memory removes + owned unlink paths under a closure-scoped + // borrow that cannot escape into the await below. 
Holding a raw + // `&IggyPartition` across `delete_persisted_offset().await` would let the + // pump task realloc the partitions vec underneath us (a UAF). + let paths = partitions.with_partition(&ns, |partition| { + partition.reclaim_dead_group_offsets(|group_id| { + live.is_some_and(|set| set.contains(&group_id)) + }) + }); + let Some(paths) = paths else { continue; }; - let stored = partition.consumer_group_offset_ids(); - if stored.is_empty() { - continue; - } - let live = live_groups.get(&(ns.stream_id(), ns.topic_id())); - for group_id in stored { - let still_live = live.is_some_and(|set| set.contains(&group_id)); - if still_live { - continue; - } - if let Err(err) = partition.delete_consumer_group_offset(group_id).await { + for path in paths { + if let Err(err) = delete_persisted_offset(&path).await { warn!( shard = ctx.shard.id, ns_raw = ns.inner(), - group_id, error = %err, "reconciler failed to reclaim deleted consumer-group offset" ); diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs index 35e93090e7..a3070dedda 100644 --- a/core/server-ng/src/server_error.rs +++ b/core/server-ng/src/server_error.rs @@ -104,6 +104,11 @@ pub enum ServerNgError { factory bundle; shard 0 dropped its sender (most likely it failed to recover)" )] MetadataHandoffAborted { shard_id: u16 }, + #[error( + "shard 0 aborted before binding listeners with {remaining} peer shard(s) still loading \ + their on-disk partitions; a peer most likely failed during bootstrap (shutdown flag set)" + )] + ShardBootstrapBarrierAborted { remaining: usize }, #[error("failed to parse {context} socket address '{address}'")] SocketAddressParse { context: &'static str, diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs index c8d1230dce..b15236fcd5 100644 --- a/core/shard/src/lib.rs +++ b/core/shard/src/lib.rs @@ -1475,25 +1475,46 @@ where partitions.insert(namespace, partition); } - /// Handle incoming view-change/control message. 
Metadata use metadata - /// consensus. Partitions loop all partitions, use partition consensus. - /// - // TODO(hubcio): every VSR callback below - // (`on_start_view_change`, `on_do_view_change`, `on_start_view`, - // `on_commit`, `tick_partitions`) materialises - // `planes.1.0.namespaces().copied().collect::>()` per call to - // avoid borrowing the partitions plane across the partition-consensus - // `.await` inside the loop. Allocs scale with VSR traffic, not with - // useful work: a quiet cluster still pays one Vec per heartbeat tick. - // Convert to the `namespace_scratch: RefCell>` - // pattern already used by `process_loopback` (see :646-684) so the - // scratch is reused across calls. Asserts on entry/exit keep the - // "empty on entry, drained on exit" invariant explicit. - // - // Reproducible in `core/simulator`: sim drives these callbacks via - // `tick()` against `IggyShard`, so an allocation-counting variant of - // `MemStorage`/test harness can pin the per-VSR-cb alloc count and - // fail on regression after the scratch refactor lands. + /// Resolve the single partition a VSR control frame addresses, keyed by + /// `header.namespace`. Warns and returns `None` when the namespace matches + /// neither metadata nor a live partition consensus. Returns `&mut` because + /// `on_do_view_change` / `on_commit` need it for `commit_journal`; the read- + /// only callers reborrow `&`. Pump-only (sole mutator), so the `&mut` formed + /// here via interior mutability cannot alias a concurrent reconcile. 
+ #[allow(clippy::mut_from_ref)] + fn resolve_partition_target<'a>( + &self, + partitions: &'a IggyPartitions, + namespace: u64, + view: u32, + replica: u8, + frame: &'static str, + ) -> Option<&'a mut IggyPartition> + where + B: MessageBus, + { + let Some(partition) = partitions.get_mut_by_ns(&IggyNamespace::from_raw(namespace)) else { + tracing::warn!( + shard = self.id, + namespace, + view, + replica, + frame, + "dropping VSR control frame: namespace matches neither metadata nor partition consensus" + ); + return None; + }; + debug_assert_eq!( + partition.consensus().namespace(), + namespace, + "keyed partition lookup must match the frame namespace" + ); + Some(partition) + } + + /// Handle an incoming VSR control frame. A metadata frame uses the metadata + /// consensus; a partition frame addresses exactly one partition, resolved by + /// [`Self::resolve_partition_target`]. #[allow(clippy::future_not_send)] async fn on_start_view_change(&self, msg: Message) where @@ -1516,29 +1537,19 @@ where return; } - let namespaces: Vec<_> = planes.1.0.namespaces().copied().collect(); - for namespace in namespaces { - let Some(partition) = planes.1.0.get_by_ns(&namespace) else { - continue; - }; - let consensus = partition.consensus(); - if consensus.namespace() != header.namespace { - continue; - } - - let actions = consensus.handle_start_view_change(PlaneKind::Partitions, &header); - dispatch_vsr_actions::(consensus, None, &actions).await; - dispatch_partition_journal_actions(consensus, partition, &actions).await; + let Some(partition) = self.resolve_partition_target( + &planes.1.0, + header.namespace, + header.view, + header.replica, + "StartViewChange", + ) else { return; - } - - tracing::warn!( - shard = self.id, - namespace = header.namespace, - view = header.view, - replica = header.replica, - "dropping StartViewChange: namespace matches neither metadata nor partition consensus" - ); + }; + let consensus = partition.consensus(); + let actions = 
consensus.handle_start_view_change(PlaneKind::Partitions, &header); + dispatch_vsr_actions::(consensus, None, &actions).await; + dispatch_partition_journal_actions(consensus, partition, &actions).await; } #[allow(clippy::future_not_send)] @@ -1576,35 +1587,25 @@ where } let config = planes.1.0.config(); - let namespaces: Vec<_> = planes.1.0.namespaces().copied().collect(); - for namespace in namespaces { - let Some(partition) = planes.1.0.get_mut_by_ns(&namespace) else { - continue; - }; - let consensus = partition.consensus(); - if consensus.namespace() != header.namespace { - continue; - } - - let actions = consensus.handle_do_view_change(PlaneKind::Partitions, &header); - dispatch_vsr_actions::(consensus, None, &actions).await; - dispatch_partition_journal_actions(consensus, partition, &actions).await; - if actions - .iter() - .any(|action| matches!(action, VsrAction::CommitJournal)) - { - partition.commit_journal(config).await; - } + let Some(partition) = self.resolve_partition_target( + &planes.1.0, + header.namespace, + header.view, + header.replica, + "DoViewChange", + ) else { return; + }; + let consensus = partition.consensus(); + let actions = consensus.handle_do_view_change(PlaneKind::Partitions, &header); + dispatch_vsr_actions::(consensus, None, &actions).await; + dispatch_partition_journal_actions(consensus, partition, &actions).await; + if actions + .iter() + .any(|action| matches!(action, VsrAction::CommitJournal)) + { + partition.commit_journal(config).await; } - - tracing::warn!( - shard = self.id, - namespace = header.namespace, - view = header.view, - replica = header.replica, - "dropping DoViewChange: namespace matches neither metadata nor partition consensus" - ); } #[allow(clippy::future_not_send)] @@ -1629,29 +1630,19 @@ where return; } - let namespaces: Vec<_> = planes.1.0.namespaces().copied().collect(); - for namespace in namespaces { - let Some(partition) = planes.1.0.get_by_ns(&namespace) else { - continue; - }; - let consensus = 
partition.consensus(); - if consensus.namespace() != header.namespace { - continue; - } - - let actions = consensus.handle_start_view(PlaneKind::Partitions, &header); - dispatch_vsr_actions::(consensus, None, &actions).await; - dispatch_partition_journal_actions(consensus, partition, &actions).await; + let Some(partition) = self.resolve_partition_target( + &planes.1.0, + header.namespace, + header.view, + header.replica, + "StartView", + ) else { return; - } - - tracing::warn!( - shard = self.id, - namespace = header.namespace, - view = header.view, - replica = header.replica, - "dropping StartView: namespace matches neither metadata nor partition consensus" - ); + }; + let consensus = partition.consensus(); + let actions = consensus.handle_start_view(PlaneKind::Partitions, &header); + dispatch_vsr_actions::(consensus, None, &actions).await; + dispatch_partition_journal_actions(consensus, partition, &actions).await; } #[allow(clippy::future_not_send)] @@ -1684,29 +1675,19 @@ where } let config = planes.1.0.config(); - let namespaces: Vec<_> = planes.1.0.namespaces().copied().collect(); - for namespace in namespaces { - let Some(partition) = planes.1.0.get_mut_by_ns(&namespace) else { - continue; - }; - let consensus = partition.consensus(); - if consensus.namespace() != header.namespace { - continue; - } - - if consensus.handle_commit(&header) { - partition.commit_journal(config).await; - } + let Some(partition) = self.resolve_partition_target( + &planes.1.0, + header.namespace, + header.view, + header.replica, + "Commit", + ) else { return; + }; + let consensus = partition.consensus(); + if consensus.handle_commit(&header) { + partition.commit_journal(config).await; } - - tracing::warn!( - shard = self.id, - namespace = header.namespace, - view = header.view, - replica = header.replica, - "dropping Commit: namespace matches neither metadata nor partition consensus" - ); } /// Tick partition consensuses. Loop partitions. No partitions-plane journal. 
@@ -1722,6 +1703,14 @@ where >, { let partitions = self.plane.partitions(); + // Fan out over every group (each partition's heartbeat/retransmit timer + // must advance), so the keyed single-namespace lookup the control-frame + // handlers use does not apply here. The namespaces are snapshotted into + // an owned Vec so no partitions-plane borrow is held across the tick + // `.await`. + // TODO(hubcio): reuse the pump's `namespace_scratch` (as + // `process_loopback` does) to drop this per-tick alloc; a quiet cluster + // still pays one Vec per heartbeat. let namespaces: Vec<_> = partitions.namespaces().copied().collect(); for namespace in namespaces { diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs index 8a41d3c5ee..658a549b5d 100644 --- a/core/shard/src/router.rs +++ b/core/shard/src/router.rs @@ -20,6 +20,7 @@ use crate::shards_table::{ ShardsTable, calculate_shard_assignment, calculate_shard_from_consensus_ns, }; use crate::{IggyShard, LifecycleFrame, Receiver, ShardFrame}; +use consensus::MetadataHandle; use crossfire::TrySendError; use futures::FutureExt; use iggy_binary_protocol::{ConsensusHeader, GenericHeader, Operation, PrepareHeader}; @@ -30,6 +31,11 @@ use metadata::stm::StateMachine; use server_common::sharding::{IggyNamespace, METADATA_CONSENSUS_NAMESPACE}; use server_common::{Message, MessageBag}; +/// How often the shard pump drives `VsrConsensus::tick`: heartbeats, prepare +/// retransmit, and view-change timeouts only advance when the tick runs +/// ("call this periodically, e.g. every 10ms"). +const CONSENSUS_TICK_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10); + /// Decompose a [`MessageBag`] into the routing-relevant tuple /// `(operation, namespace, generic_message)`. /// @@ -259,9 +265,45 @@ where // first-drain reallocation. 
let mut loopback_buf = Vec::with_capacity(64); let mut namespace_scratch: Vec = Vec::with_capacity(64); + // Consensus timer driver, folded into the pump: running the tick as a + // select! arm (not a sibling task) serializes it with frame processing, + // so `tick_partitions` can no longer hold a partition reference across + // an `.await` while `apply_reconcile_ops` reallocates the partitions + // vec on this same task. The timer is created once and pinned, then + // re-armed only after it fires, so a busy inbox cannot drop-and-reset + // it (which would stall heartbeats / prepare retransmit). + // Single source for the timer, so the re-arm below cannot drift from the + // initial interval. + let rearm_tick = || compio::time::sleep(CONSENSUS_TICK_INTERVAL).fuse(); + let mut consensus_tick = std::pin::pin!(rearm_tick()); loop { futures::select! { _ = stop.recv().fuse() => break, + () = consensus_tick.as_mut() => { + // Sharing the pump task is what keeps `tick_partitions` + // borrow-safe, but it bounds the tick's worst-case delay to + // one frame body's longest `.await` (replication append + + // commit_journal fsync/rotate + reply). + // TODO(hubcio): if a load test shows tick starvation, + // make `tick_partitions` borrow-free so the tick can be + // decoupled from the pump again without reintroducing the + // partition-ref-across-`.await` UB this fold closed. + self.tick_metadata().await; + self.tick_partitions().await; + // While a cooperative revocation is pending, wake the + // reconciler each tick so the handoff completes within ~one + // tick of the partition draining, not the periodic pass. 
+ if self + .plane + .metadata() + .mux_stm + .streams() + .has_pending_revocations() + { + self.dispatch_metadata_commit_tick(); + } + consensus_tick.set(rearm_tick()); + } frame = self.inbox.recv().fuse() => { match frame { Ok(frame) => { diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs index 132e79bc79..d1d4115bda 100644 --- a/core/simulator/src/lib.rs +++ b/core/simulator/src/lib.rs @@ -31,7 +31,7 @@ use iggy_binary_protocol::{GenericHeader, ReplyHeader}; use iggy_common::IggyError; use network::Network; use packet::{PacketSimulatorOptions, ProcessId}; -use partitions::{Partition, PartitionOffsets, PollQueryResult, PollingArgs, PollingConsumer}; +use partitions::{Partition, PartitionOffsets, PollFragments, PollingArgs, PollingConsumer}; use replica::{Replica, new_replica}; use server_common::Message; use server_common::sharding::IggyNamespace; @@ -309,18 +309,23 @@ impl Simulator { replica_idx: usize, namespace: IggyNamespace, consumer: PollingConsumer, - args: PollingArgs, - ) -> Result, IggyError> { + args: &PollingArgs, + ) -> Result, IggyError> { let replica = &self.replicas[replica_idx]; - let partition = - replica - .plane - .partitions() - .get_by_ns(&namespace) - .ok_or(IggyError::ResourceNotFound(format!( - "partition not found for namespace {namespace:?} on replica {replica_idx}" - )))?; - futures::executor::block_on(partition.poll_messages(consumer, args)) + // Build the owned poll plan synchronously, then execute off the borrow. + // The sim's partitions are in-memory (no `partition_dir`), so the plan + // serves only the resident journal tier; `execute` performs no disk IO. 
+ let Some(plan) = replica + .plane + .partitions() + .build_poll_snapshot(&namespace, consumer, args) + else { + return Err(IggyError::ResourceNotFound(format!( + "partition not found for namespace {namespace:?} on replica {replica_idx}" + ))); + }; + let (fragments, _commit_offset) = futures::executor::block_on(plan.execute()); + Ok(fragments) } /// Partition offsets from a replica. diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs index 4112c2aa42..3de3100b65 100644 --- a/core/simulator/src/main.rs +++ b/core/simulator/src/main.rs @@ -118,8 +118,8 @@ fn main() { // 4. Poll messages and check offsets on the leader let consumer = PollingConsumer::Consumer(1, 0); let args = PollingArgs::new(PollingStrategy::first(), 10, false); - match sim.poll_messages(leader as usize, test_namespace, consumer, args) { - Ok((fragments, _last_matching_offset)) => { + match sim.poll_messages(leader as usize, test_namespace, consumer, &args) { + Ok(fragments) => { println!( "[sim] Poll returned {} fragments (expected 4)", fragments.len()