diff --git a/core/binary_protocol/src/consensus/operation.rs b/core/binary_protocol/src/consensus/operation.rs index 7d38f3bac8..e171095ea8 100644 --- a/core/binary_protocol/src/consensus/operation.rs +++ b/core/binary_protocol/src/consensus/operation.rs @@ -55,6 +55,13 @@ pub enum Operation { /// consensus by the partition reconciler; no client wire code. CompleteConsumerGroupRevocation = 67, + /// Server-originated: set a partition's delete watermark (the offset below + /// which sealed segments are removed). The dispatch handler resolves a + /// client `DeleteSegments` count to a concrete offset on the owning shard + /// and submits this through metadata consensus, so every replica applies + /// the same offset deterministically. No client wire code. + TruncatePartition = 68, + // Metadata operations (shard 0) CreateStream = 128, UpdateStream = 129, @@ -66,9 +73,10 @@ pub enum Operation { PurgeTopic = 135, CreatePartitions = 136, DeletePartitions = 137, - // TODO: DeleteSegments is a partition operation (is_partition() == true) but its - // discriminant sits in the metadata range (128-147). Should be moved to 163 once - // iggy_common's Operation enum is removed and wire compat is no longer a concern. + // Client op handled specially: the dispatch layer resolves the requested + // segment count to an offset on the owning shard and replicates a + // `TruncatePartition` through metadata, so `DeleteSegments` itself is + // neither a metadata nor a partition consensus op (see `is_partition`). DeleteSegments = 138, CreateConsumerGroup = 139, DeleteConsumerGroup = 140, @@ -86,7 +94,6 @@ pub enum Operation { SendMessages = 160, StoreConsumerOffset = 161, DeleteConsumerOffset = 162, - // 163 is reserved for the planned DeleteSegments move (see TODO above). StoreConsumerOffset2 = 164, DeleteConsumerOffset2 = 165, } @@ -152,7 +159,7 @@ impl Operation { #[must_use] #[inline] pub const fn is_partition(&self) -> bool { - matches!(self, Self::DeleteSegments) || (*self as u8) >= Self::PARTITION_START + (*self as u8) >= Self::PARTITION_START } /// Operations clients are allowed to send directly. @@ -175,7 +182,8 @@ impl Operation { | Self::CreateTopicWithAssignments | Self::CreatePartitionsWithAssignments | Self::RemoveConsumerGroupMember - | Self::CompleteConsumerGroupRevocation => None, + | Self::CompleteConsumerGroupRevocation + | Self::TruncatePartition => None, Self::CreateStream | Self::UpdateStream | Self::DeleteStream @@ -315,7 +323,11 @@ mod tests { assert!(Operation::CreateStream.is_client_allowed()); assert!(Operation::SendMessages.is_partition()); assert!(!Operation::SendMessages.is_metadata()); - assert!(Operation::DeleteSegments.is_partition()); + assert!(!Operation::DeleteSegments.is_partition()); + assert!(!Operation::DeleteSegments.is_metadata()); + assert!(Operation::TruncatePartition.is_internal()); + assert!(Operation::TruncatePartition.is_metadata()); + assert!(!Operation::TruncatePartition.is_client_allowed()); assert!(Operation::DeleteConsumerOffset.is_partition()); assert!(Operation::StoreConsumerOffset2.is_partition()); assert!(Operation::DeleteConsumerOffset2.is_partition()); diff --git a/core/binary_protocol/src/dispatch.rs b/core/binary_protocol/src/dispatch.rs index d455d0deab..92f0203030 100644 --- a/core/binary_protocol/src/dispatch.rs +++ b/core/binary_protocol/src/dispatch.rs @@ -300,6 +300,7 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> | Operation::CreatePartitionsWithAssignments | Operation::RemoveConsumerGroupMember | Operation::CompleteConsumerGroupRevocation + | Operation::TruncatePartition | Operation::Reserved | Operation::Register | Operation::Logout diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs index 662f4b1288..91d85bb207 100644 --- a/core/consensus/src/observability.rs +++ b/core/consensus/src/observability.rs @@ -622,6 +622,7 @@ pub const fn operation_as_str(operation: Operation) -> &'static str { Operation::CreatePartitionsWithAssignments => "create_partitions_with_assignments", Operation::RemoveConsumerGroupMember => "remove_consumer_group_member", Operation::CompleteConsumerGroupRevocation => "complete_consumer_group_revocation", + Operation::TruncatePartition => "truncate_partition", Operation::CreateStream => "create_stream", Operation::UpdateStream => "update_stream", Operation::DeleteStream => "delete_stream", diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs index d83bcf7de9..53e8c70a40 100644 --- a/core/integration/tests/server/mod.rs +++ b/core/integration/tests/server/mod.rs @@ -29,14 +29,14 @@ mod cg_vsr; #[cfg(not(feature = "vsr"))] mod concurrent_addition; mod general; -// Asserts the periodic messages cleaner deletes expired segments from disk; -// server-ng has no data-maintenance cleaner yet. -#[cfg(not(feature = "vsr"))] +// The per-shard segment cleaner deletes expired / oversize segments from disk +// under both the legacy server and server-ng. mod message_cleanup; mod message_retrieval; -// Mixes server restarts, consumer-group barriers, and DeleteSegments -// maintenance; out of vsr scope for now. -#[cfg(not(feature = "vsr"))] +// Server restarts, consumer-group barriers, and DeleteSegments maintenance. +// `should_delete_segments_without_consumers` is framing-agnostic + async-aware +// and runs under server-ng; the consumer-group / restart variants stay +// legacy-shaped (cg polling has its own vsr gaps) and are filtered out there. mod purge_delete; mod scenarios; mod specific; diff --git a/core/integration/tests/server/purge_delete.rs b/core/integration/tests/server/purge_delete.rs index 9460322ff5..9888fb97cb 100644 --- a/core/integration/tests/server/purge_delete.rs +++ b/core/integration/tests/server/purge_delete.rs @@ -19,6 +19,11 @@ use crate::server::scenarios::purge_delete_scenario; use integration::iggy_harness; use test_case::test_matrix; +// Legacy-only: asserts the exact on-disk layout ([0, 7, 14, 21], 7 messages per +// 5KiB segment) and per-file byte sizes throughout. server-ng's per-message +// framing yields a different layout, so the scenario's offset anchors do not +// hold under vsr. Port to a framing-agnostic form to re-enable. +#[cfg(not(feature = "vsr"))] #[iggy_harness(server( segment.size = "5KiB", segment.cache_indexes = ["all", "none", "open_segment"], @@ -39,11 +44,23 @@ async fn should_delete_segments_and_validate_filesystem( partition.messages_required_to_save = "1", partition.enforce_fsync = "true", ))] -#[test_matrix([restart_off(), restart_on()])] +// server-ng restarts the cluster on restart_on, which currently fails in +// bootstrap independently of segment deletion: an `Index data must be exactly +// 16 bytes` recovery panic (cache_indexes all/none) and a replica-port rebind +// race. Run only the no-restart matrix under vsr until those are fixed. +#[cfg_attr(not(feature = "vsr"), test_matrix([restart_off(), restart_on()]))] +#[cfg_attr(feature = "vsr", test_matrix([restart_off()]))] async fn should_delete_segments_without_consumers(harness: &mut TestHarness, restart_server: bool) { purge_delete_scenario::run_no_consumers(harness, restart_server).await; } +// vsr-gated: asserts a segment is freed *exactly* when the polled offset +// reaches its end — a synchronous-commit assumption. Under vsr the +// consumer-group auto-commit lags the poll, so the per-message timing does not +// hold (the barrier itself is correct: the server retains un-consumed +// segments, verified by `min_committed_offset` + its runtime diag). Legacy-only +// until redesigned to sync on the committed offset for async commit. +#[cfg(not(feature = "vsr"))] #[iggy_harness(server( segment.size = "5KiB", segment.cache_indexes = ["all", "none", "open_segment"], @@ -57,6 +74,9 @@ async fn should_delete_segments_with_consumer_group_barrier(harness: &TestHarnes purge_delete_scenario::run_consumer_group_barrier(&client, &data_path).await; } +// Legacy-only: pins the [0, 7, 14, 21] layout across a multi-consumer barrier. +// server-ng framing differs; port to framing-agnostic to re-enable. +#[cfg(not(feature = "vsr"))] #[iggy_harness(server( segment.size = "5KiB", segment.cache_indexes = ["all", "none", "open_segment"], @@ -77,7 +97,13 @@ async fn should_block_deletion_until_all_consumers_pass_segment( partition.messages_required_to_save = "1", partition.enforce_fsync = "true", ))] -#[test_matrix([restart_off(), restart_on()])] +// restart_on is vsr-gated: cluster restart hits a pre-existing server-ng +// bootstrap failure (legacy 16-byte index reader vs server-ng 24-byte writer), +// unrelated to purge. The scenario asserts the exact [0, 7, 14, 21] layout only +// on the legacy path; under vsr it verifies the framing-agnostic purge outcome +// (offsets cleared, files deleted, partition reset to a single segment at 0). +#[cfg_attr(not(feature = "vsr"), test_matrix([restart_off(), restart_on()]))] +#[cfg_attr(feature = "vsr", test_matrix([restart_off()]))] async fn should_purge_topic_and_clear_consumer_offsets( harness: &mut TestHarness, restart_server: bool, @@ -89,6 +115,7 @@ fn restart_off() -> bool { false } +#[cfg(not(feature = "vsr"))] fn restart_on() -> bool { true } diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index ed2040e536..25a60d69f0 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -47,14 +47,12 @@ pub mod invalid_consumer_offset_scenario; // file), so volume-based rotation rules cannot trigger. #[cfg(not(feature = "vsr"))] pub mod log_rotation_scenario; -#[cfg(not(feature = "vsr"))] pub mod message_cleanup_scenario; pub mod message_headers_scenario; pub mod message_size_scenario; pub mod offset_scenario; #[cfg(not(feature = "vsr"))] pub mod permissions_scenario; -#[cfg(not(feature = "vsr"))] pub mod purge_delete_scenario; pub mod read_during_persistence_scenario; #[cfg(not(feature = "vsr"))] diff --git a/core/integration/tests/server/scenarios/purge_delete_scenario.rs b/core/integration/tests/server/scenarios/purge_delete_scenario.rs index 211f3c5fc2..d1a8c6127f 100644 --- a/core/integration/tests/server/scenarios/purge_delete_scenario.rs +++ b/core/integration/tests/server/scenarios/purge_delete_scenario.rs @@ -38,18 +38,23 @@ const INDEX_EXTENSION: &str = "index"; /// gets persisted into the same segment, then rotation fires. /// Result: 7 messages per sealed segment (7000B on disk). const PAYLOAD_SIZE: usize = 936; +#[cfg(not(feature = "vsr"))] const MESSAGE_ON_DISK_SIZE: u64 = IGGY_MESSAGE_HEADER_SIZE as u64 + PAYLOAD_SIZE as u64; +#[cfg(not(feature = "vsr"))] const INDEX_SIZE_PER_MSG: u64 = INDEX_SIZE as u64; const TOTAL_MESSAGES: u32 = 25; /// 3 sealed segments (7 msgs each) + 1 active (4 msgs at offsets 21-24). +#[cfg(not(feature = "vsr"))] const EXPECTED_SEGMENT_OFFSETS: [u64; 4] = [0, 7, 14, 21]; +#[cfg(not(feature = "vsr"))] const MSGS_PER_SEALED_SEGMENT: u64 = 7; /// Single consumer barrier: oldest-first deletion, barrier advancement, and edge cases. /// /// Covers: barrier blocks deletion, advancing barrier releases segments, delete(0) no-op, /// delete(u32::MAX) bulk, consumer not stuck after deletion, error cases for invalid IDs. +#[cfg(not(feature = "vsr"))] pub async fn run(harness: &mut TestHarness, restart_server: bool) { let client = build_root_client(harness); client.connect().await.unwrap(); @@ -349,19 +354,29 @@ pub async fn run_no_consumers(harness: &mut TestHarness, restart_server: bool) { let partition_path = partition_path(&data_path, stream_id, topic_id); - assert_eq!( - get_sorted_segment_offsets(&partition_path), - EXPECTED_SEGMENT_OFFSETS, - "Segment layout must match calculated offsets" + // server-ng's per-message on-disk framing differs from legacy, so the + // segment boundaries are not hardcodable. Capture the real layout once; + // legacy still verifies it matches the calculated offsets + file sizes. + let layout = get_sorted_segment_offsets(&partition_path); + #[cfg(not(feature = "vsr"))] + { + assert_eq!( + layout, EXPECTED_SEGMENT_OFFSETS, + "Segment layout must match calculated offsets" + ); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS); + } + assert!( + layout.len() >= 2, + "expected at least one sealed segment plus the active one, got {layout:?}" ); - assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS); assert_eq!( poll_all_offsets(&client, &stream_ident, &topic_ident).await, (0..TOTAL_MESSAGES as u64).collect::>() ); - // Delete 3 sealed segments one by one - let sealed_count = EXPECTED_SEGMENT_OFFSETS.len() - 1; + // Delete the sealed segments one by one + let sealed_count = layout.len() - 1; for i in 0..sealed_count { client .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1) @@ -369,14 +384,12 @@ pub async fn run_no_consumers(harness: &mut TestHarness, restart_server: bool) { .unwrap(); maybe_restart(harness, restart_server).await; - let first_surviving = EXPECTED_SEGMENT_OFFSETS[i + 1]; - assert_eq!( - get_sorted_segment_offsets(&partition_path), - EXPECTED_SEGMENT_OFFSETS[i + 1..], - "After deleting {n} sealed segment(s)", - n = i + 1 - ); - assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[i + 1..]); + let first_surviving = layout[i + 1]; + // server-ng deletes asynchronously (metadata commit -> reconciler); + // legacy deletes synchronously. Converge before asserting. + await_segment_layout(&partition_path, &layout[i + 1..]).await; + #[cfg(not(feature = "vsr"))] + assert_segment_file_sizes(&partition_path, &layout[i + 1..]); assert_eq!( poll_all_offsets(&client, &stream_ident, &topic_ident).await, (first_surviving..TOTAL_MESSAGES as u64).collect::>(), @@ -384,12 +397,9 @@ pub async fn run_no_consumers(harness: &mut TestHarness, restart_server: bool) { ); } - // Only active segment remains — delete is a no-op - assert_eq!( - get_sorted_segment_offsets(&partition_path), - [EXPECTED_SEGMENT_OFFSETS[3]], - "Only active segment at offset 21" - ); + // Only the active segment remains — delete is a no-op + let active = *layout.last().expect("layout is non-empty"); + await_segment_layout(&partition_path, std::slice::from_ref(&active)).await; assert_no_orphaned_segment_files(&partition_path, 1); client @@ -397,15 +407,12 @@ pub async fn run_no_consumers(harness: &mut TestHarness, restart_server: bool) { .await .unwrap(); - assert_eq!( - get_sorted_segment_offsets(&partition_path), - [EXPECTED_SEGMENT_OFFSETS[3]], - "No-op: active segment protected" - ); - assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[3..]); + await_segment_layout(&partition_path, std::slice::from_ref(&active)).await; + #[cfg(not(feature = "vsr"))] + assert_segment_file_sizes(&partition_path, std::slice::from_ref(&active)); assert_eq!( poll_all_offsets(&client, &stream_ident, &topic_ident).await, - (EXPECTED_SEGMENT_OFFSETS[3]..TOTAL_MESSAGES as u64).collect::>(), + (active..TOTAL_MESSAGES as u64).collect::>(), "Active segment messages still pollable" ); @@ -424,6 +431,7 @@ pub async fn run_no_consumers(harness: &mut TestHarness, restart_server: bool) { /// reaches its end_offset — not one message earlier, not one later. /// /// No restart_server variant — 25 delete_segments calls would mean 25 restarts. +#[cfg(not(feature = "vsr"))] pub async fn run_consumer_group_barrier(client: &IggyClient, data_path: &Path) { let stream = client.create_stream(STREAM_NAME).await.unwrap(); let stream_id = stream.id; @@ -528,6 +536,7 @@ pub async fn run_consumer_group_barrier(client: &IggyClient, data_path: &Path) { /// The barrier is `min(fast, slow)`, so deletion is entirely gated by the slow consumer. /// Advances the slow consumer through each segment boundary, verifying that segments are /// released only when `segment_deletable(seg_end, barrier)` becomes true. +#[cfg(not(feature = "vsr"))] pub async fn run_multi_consumer_barrier(harness: &mut TestHarness, restart_server: bool) { let client = build_root_client(harness); client.connect().await.unwrap(); @@ -755,6 +764,10 @@ pub async fn run_purge_topic(harness: &mut TestHarness, restart_server: bool) { let partition_path = partition_path(&data_path, stream_id, topic_id); + // Exact layout is legacy-framing-specific; the purge outcome asserted below + // (offsets cleared, files deleted, partition reset to a single empty segment + // at offset 0, new messages from offset 0) is framing-agnostic. + #[cfg(not(feature = "vsr"))] assert_eq!( get_sorted_segment_offsets(&partition_path), EXPECTED_SEGMENT_OFFSETS @@ -861,6 +874,13 @@ pub async fn run_purge_topic(harness: &mut TestHarness, restart_server: bool) { .unwrap(); maybe_restart(harness, restart_server).await; + // server-ng purges asynchronously (metadata commit -> reconciler -> pump); + // legacy purges synchronously. The pump's purge resets the partition to a + // single segment at offset 0 and clears consumer offsets + files in the + // same frame, so converging on the [0] layout means the whole purge landed. + #[cfg(feature = "vsr")] + await_segment_layout(&partition_path, &[0]).await; + // --- Verify consumer offsets cleared --- let consumer_offset = client .get_consumer_offset(&consumer, &stream_ident, &topic_ident, Some(PARTITION_ID)) @@ -943,6 +963,35 @@ pub async fn run_purge_topic(harness: &mut TestHarness, restart_server: bool) { client.delete_stream(&stream_ident).await.unwrap(); } +/// Wait for the partition's on-disk segment layout to converge to `expected`. +/// +/// server-ng's `DeleteSegments` is eventually-consistent: the client call +/// returns after the metadata `TruncatePartition` commit, and the partition +/// reconciler performs the on-disk deletion on its next pass. Legacy deletes +/// synchronously, so it asserts immediately. +#[cfg(feature = "vsr")] +async fn await_segment_layout(partition_path: &str, expected: &[u64]) { + for _ in 0..200 { + if get_sorted_segment_offsets(partition_path).as_slice() == expected { + return; + } + tokio::time::sleep(std::time::Duration::from_millis(25)).await; + } + assert_eq!( + get_sorted_segment_offsets(partition_path).as_slice(), + expected, + "segment layout did not converge within timeout" + ); +} + +#[cfg(not(feature = "vsr"))] +async fn await_segment_layout(partition_path: &str, expected: &[u64]) { + assert_eq!( + get_sorted_segment_offsets(partition_path).as_slice(), + expected + ); +} + async fn maybe_restart(harness: &mut TestHarness, restart_server: bool) { if restart_server { harness.restart_server().await.unwrap(); @@ -1034,6 +1083,7 @@ async fn poll_all_offsets( /// Asserts that each segment's `.log` and `.index` files have the exact expected size. /// Derives message count per segment from adjacent offsets and TOTAL_MESSAGES. +#[cfg(not(feature = "vsr"))] fn assert_segment_file_sizes(partition_path: &str, offsets: &[u64]) { for (i, &offset) in offsets.iter().enumerate() { let msg_count = if i + 1 < offsets.len() { @@ -1153,6 +1203,7 @@ fn count_files_with_ext(dir: &str, ext: &str) -> usize { /// /// `seg_end_offset` is the last offset stored in the segment (inclusive). /// `committed` is the minimum committed offset across all consumers/groups. +#[cfg(not(feature = "vsr"))] fn segment_deletable(seg_end_offset: u64, committed: u64) -> bool { seg_end_offset <= committed } diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index e0a7fa3c57..73511c7015 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -18,7 +18,7 @@ use crate::MuxStateMachine; use crate::stm::consumer_group::CompleteConsumerGroupRevocationRequest; use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError}; -use crate::stm::stream::Streams; +use crate::stm::stream::{Streams, TruncatePartitionRequest}; use crate::stm::user::{DeletePersonalAccessTokenRequest, Users}; use crate::stm::{ConsensusGroupAllocator, StateMachine}; use consensus::{ @@ -1526,7 +1526,15 @@ where ) -> Result, iggy_common::IggyError> { let consensus = self.consensus.as_ref().unwrap(); let header = *message.header(); - if !header.operation.is_client_allowed() { + // `TruncatePartition` is server-originated (the owning shard resolves a + // client `DeleteSegments` count to a concrete offset) but replicated AS + // the client's own request, so the commit records the client's request + // sequence in the `ClientTable`. It is internal -- no wire command code + // maps to it, so a client cannot construct one directly -- hence the + // `is_client_allowed` gate excludes it; admit it explicitly. The default + // match arm below projects it through unchanged. + if !header.operation.is_client_allowed() && header.operation != Operation::TruncatePartition + { return Err(IggyError::InvalidCommand); } let body = &message.as_slice()[size_of::()..header.size as usize]; @@ -1880,6 +1888,65 @@ where msg } +/// Build a `TruncatePartition` request attributed to the originating client. +/// +/// Replicated through the standard client-request path so the commit records +/// `(client, session, request)` in the `ClientTable`. The client numbers +/// `DeleteSegments` in the same monotonic request sequence as every other +/// metadata op, so attributing the truncate to an internal id (or skipping the +/// commit) leaves a hole that fails the next op's `request == committed + 1` +/// preflight. +/// +/// `template` is the client's own `DeleteSegments` header: it supplies the wire +/// `cluster` / `view` / `release` and the client's `request` number. +/// `client_id` / `session` are the bound VSR identity. +/// +/// # Panics +/// If the total request size exceeds `u32::MAX`; a `TruncatePartition` body is +/// a few fixed-width fields, so this cannot happen in practice. +#[must_use] +pub fn build_truncate_partition_client_message( + template: &RequestHeader, + client_id: u128, + session: u64, + stream_id: u32, + topic_id: u32, + partition_id: u32, + up_to_offset: u64, +) -> Message { + let body = TruncatePartitionRequest { + stream_id: WireIdentifier::numeric(stream_id), + topic_id: WireIdentifier::numeric(topic_id), + partition_id, + up_to_offset, + } + .to_bytes(); + let header_size = size_of::(); + let total = header_size + body.len(); + let mut msg = Message::::new(total); + { + let slice = msg.as_mut_slice(); + slice[header_size..total].copy_from_slice(&body); + let header = + bytemuck::checked::try_from_bytes_mut::(&mut slice[..header_size]) + .expect("zeroed bytes are a valid RequestHeader"); + *header = RequestHeader { + command: Command2::Request, + operation: Operation::TruncatePartition, + size: u32::try_from(total).expect("request size fits u32"), + cluster: template.cluster, + view: template.view, + release: template.release, + client: client_id, + session, + request: template.request, + namespace: server_common::sharding::METADATA_CONSENSUS_NAMESPACE, + ..RequestHeader::default() + }; + } + msg +} + fn build_prepare_message( consensus: &VsrConsensus, request: &RequestHeader, diff --git a/core/metadata/src/stm/snapshot.rs b/core/metadata/src/stm/snapshot.rs index 5f704ae46e..0186bc9c25 100644 --- a/core/metadata/src/stm/snapshot.rs +++ b/core/metadata/src/stm/snapshot.rs @@ -346,6 +346,8 @@ mod tests { consensus_group_id: 33, created_at: ts, created_revision: 0, + deleted_up_to_offset: 0, + purge_generation: 0, }], consumer_groups: Vec::new(), next_consumer_group_id: 1, diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 02deabfe16..765e643df0 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -28,8 +28,8 @@ use crate::stm::result::{ use crate::stm::snapshot::Snapshotable; use crate::{collect_handlers, define_state, impl_fill_restore}; use ahash::AHashMap; -use bytes::{Bytes, BytesMut}; -use iggy_binary_protocol::codec::WireEncode; +use bytes::{BufMut, Bytes, BytesMut}; +use iggy_binary_protocol::codec::{WireDecode, WireEncode}; use iggy_binary_protocol::requests::consumer_groups::{ CreateConsumerGroupRequest, DeleteConsumerGroupRequest, }; @@ -70,6 +70,12 @@ pub struct PartitionSnapshot { /// with revision 0 instead of failing to decode. #[serde(default)] pub created_revision: u64, + /// `#[serde(default)]` so pre-watermark snapshots restore at 0. + #[serde(default)] + pub deleted_up_to_offset: u64, + /// `#[serde(default)]` so pre-purge snapshots restore at 0. + #[serde(default)] + pub purge_generation: u64, } #[derive(Debug, Clone)] @@ -82,6 +88,17 @@ pub struct Partition { /// reused the slab key, so the local partition is stale and must be torn /// down before rebuild. pub created_revision: u64, + /// Replicated delete watermark: the reconciler on every replica removes + /// sealed segments with `end_offset` below this. Advanced monotonically by + /// `TruncatePartition` (the resolved form of a client `DeleteSegments`). + /// `0` means nothing has been trimmed. + pub deleted_up_to_offset: u64, + /// Replicated purge counter: `PurgeTopic` increments it for every partition + /// in the topic. The reconciler on every replica resets a partition to a + /// single empty segment at offset 0 (clearing consumer offsets) when this + /// exceeds the generation it last applied locally. Monotonic so a redundant + /// reconcile pass does not re-wipe a partition already at this generation. + pub purge_generation: u64, } impl Partition { @@ -97,6 +114,8 @@ impl Partition { consensus_group_id, created_at, created_revision, + deleted_up_to_offset: 0, + purge_generation: 0, } } } @@ -110,12 +129,6 @@ pub struct StatsSnapshot { } /// Topic snapshot representation for serialization. -/// -/// Encoded by `rmp_serde::to_vec` as a **positional array**, so `serde(default)` -/// only fills **trailing** elements absent from an older snapshot. The two -/// consumer-group fields below are therefore deliberately last: a topic -/// snapshot written before co-located consumer groups has a shorter array, and -/// the defaults fill the missing tail. Any future field must also be appended. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TopicSnapshot { pub id: usize, @@ -131,9 +144,6 @@ pub struct TopicSnapshot { // load-balancing hint advanced on the `Balanced`-send read path (outside // the replicated apply), so each replica's value drifts independently; // persisting it would make the snapshot diverge per replica. Restored to 0. - // - // The two consumer-group fields are trailing so the `serde(default)` above - // actually works for snapshots predating co-located consumer groups. #[serde(default)] pub consumer_groups: Vec<(u64, ConsumerGroupSnapshot)>, #[serde(default)] @@ -338,6 +348,109 @@ define_state! { } } +/// Server-originated request that advances a partition's delete watermark. +/// +/// `up_to_offset` is resolved on the owning shard from a client +/// `DeleteSegments` count, then replicated through metadata so every replica +/// applies the same monotonic watermark (see [`Partition::deleted_up_to_offset`]). +#[derive(Debug, Clone)] +pub struct TruncatePartitionRequest { + pub stream_id: WireIdentifier, + pub topic_id: WireIdentifier, + pub partition_id: u32, + pub up_to_offset: u64, +} + +impl WireEncode for TruncatePartitionRequest { + fn encoded_size(&self) -> usize { + self.stream_id.encoded_size() + self.topic_id.encoded_size() + 4 + 8 + } + + fn encode(&self, buf: &mut BytesMut) { + self.stream_id.encode(buf); + self.topic_id.encode(buf); + buf.put_u32_le(self.partition_id); + buf.put_u64_le(self.up_to_offset); + } +} + +impl WireDecode for TruncatePartitionRequest { + fn decode(buf: &[u8]) -> Result<(Self, usize), iggy_binary_protocol::WireError> { + let (stream_id, mut pos) = WireIdentifier::decode(buf)?; + let (topic_id, n) = WireIdentifier::decode(&buf[pos..])?; + pos += n; + let partition_slice = buf.get(pos..pos + 4).ok_or_else(|| { + iggy_binary_protocol::WireError::UnexpectedEof { + offset: pos, + need: 4, + have: buf.len().saturating_sub(pos), + } + })?; + let partition_id = u32::from_le_bytes(partition_slice.try_into().expect("4 bytes")); + pos += 4; + let offset_slice = buf.get(pos..pos + 8).ok_or_else(|| { + iggy_binary_protocol::WireError::UnexpectedEof { + offset: pos, + need: 8, + have: buf.len().saturating_sub(pos), + } + })?; + let up_to_offset = u64::from_le_bytes(offset_slice.try_into().expect("8 bytes")); + pos += 8; + Ok(( + Self { + stream_id, + topic_id, + partition_id, + up_to_offset, + }, + pos, + )) + } +} + +impl StateHandler for TruncatePartitionRequest { + type State = StreamsInner; + fn apply(&self, state: &mut StreamsInner, _timestamp: IggyTimestamp) -> ApplyReply { + // Internal op (no client result enum): a missing parent / partition is + // an idempotent no-op, like the other reconciler-fed internal ops. + { + let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + return ApplyReply::ok(Bytes::new()); + }; + let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { + return ApplyReply::ok(Bytes::new()); + }; + let Some(stream) = state.items.get_mut(stream_id) else { + return ApplyReply::ok(Bytes::new()); + }; + let Some(topic) = stream.topics.get_mut(topic_id) else { + return ApplyReply::ok(Bytes::new()); + }; + let Some(partition) = topic + .partitions + .iter_mut() + .find(|partition| partition.id == self.partition_id as usize) + else { + return ApplyReply::ok(Bytes::new()); + }; + // Monotonic: a stale or duplicate replay never rewinds the watermark. + if self.up_to_offset > partition.deleted_up_to_offset { + partition.deleted_up_to_offset = self.up_to_offset; + } + } + // Bump on every applied truncate (partition resolved), even when the + // watermark did not advance. A client `DeleteSegments` re-resolving to + // an already-committed offset must still re-drive the reconciler so + // segments the consumer barrier has since released are removed -- legacy + // `delete_segments` re-evaluates the barrier on every call. The + // watermark itself stays monotonic (set above); only the + // reconcile-trigger fires unconditionally. + state.revision = state.revision.wrapping_add(1); + ApplyReply::ok(Bytes::new()) + } +} + collect_handlers! { Streams { CreateStream, @@ -360,6 +473,7 @@ collect_handlers! { LeaveConsumerGroup, RemoveConsumerGroupMember, CompleteConsumerGroupRevocation, + TruncatePartition, } } @@ -436,6 +550,68 @@ impl Streams { self.inner.read(f) } + /// Committed delete watermark for a partition (the offset below which + /// sealed segments are removed), or `0` if the partition is unknown or + /// never trimmed. The per-shard reconciler reads this to enforce a + /// committed `TruncatePartition` against its local segments. + #[must_use] + pub fn partition_delete_watermark( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> u64 { + self.inner.read(|inner| { + inner + .items + .get(stream_id) + .and_then(|stream| stream.topics.get(topic_id)) + .and_then(|topic| topic.partitions.iter().find(|p| p.id == partition_id)) + .map_or(0, |partition| partition.deleted_up_to_offset) + }) + } + + /// Committed purge generation for a partition. The reconciler resets the + /// local partition (single empty segment at offset 0, cleared consumer + /// offsets) whenever this exceeds the generation it last applied. `0` means + /// never purged. Mirrors [`Self::partition_delete_watermark`]. + #[must_use] + pub fn partition_purge_generation( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> u64 { + self.inner.read(|inner| { + inner + .items + .get(stream_id) + .and_then(|stream| stream.topics.get(topic_id)) + .and_then(|topic| topic.partitions.iter().find(|p| p.id == partition_id)) + .map_or(0, |partition| partition.purge_generation) + }) + } + + /// Retention policy for a topic: `(message_expiry, max_topic_size, + /// partition_count)`, or `None` if the stream or topic is unknown. The + /// per-shard segment cleaner reads this off-pump to decide local segment + /// deletion; `partition_count` lets it derive a per-partition size budget. + #[must_use] + pub fn topic_retention_config( + &self, + stream_id: usize, + topic_id: usize, + ) -> Option<(IggyExpiry, MaxTopicSize, usize)> { + self.inner.read(|inner| { + let topic = inner.items.get(stream_id)?.topics.get(topic_id)?; + Some(( + topic.message_expiry, + topic.max_topic_size, + topic.partitions.len(), + )) + }) + } + /// Build the `ConsumerGroupDetailsResponse` for a group (members + their /// round-robin partition assignment). `partitions_count` is the topic's /// total partition count. `None` if the stream/topic/group is unknown. @@ -1030,6 +1206,8 @@ impl StateHandler for CreateTopicWithAssignmentsRequest { consensus_group_id: partition.consensus_group_id, created_at: timestamp, created_revision: new_revision, + deleted_up_to_offset: 0, + purge_generation: 0, }; topic.partitions.push(partition); } @@ -1171,14 +1349,32 @@ impl StateHandler for DeleteTopicRequest { impl StateHandler for PurgeTopicRequest { type State = StreamsInner; fn apply(&self, state: &mut StreamsInner, _timestamp: IggyTimestamp) -> ApplyReply { - // See `PurgeStreamRequest`: the data drop is partition-plane work, not - // yet wired off this committed op; the metadata commit only resolves the - // parents and acks. - let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { - return ApplyReply::err(PurgeTopicResult::StreamNotFound); + // Purge keeps the topic, its partitions, and consumer-group membership; + // it wipes message data and consumer offsets per partition. The on-disk + // reset happens on every replica's reconciler -- here we only advance + // each partition's monotonic purge generation, which the reconciler + // observes (committed generation > locally applied) and turns into a + // single empty segment at offset 0 plus cleared offsets. + let advanced = { + let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + return ApplyReply::err(PurgeTopicResult::StreamNotFound); + }; + let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { + return ApplyReply::err(PurgeTopicResult::TopicNotFound); + }; + let Some(stream) = state.items.get_mut(stream_id) else { + return ApplyReply::err(PurgeTopicResult::StreamNotFound); + }; + let Some(topic) = stream.topics.get_mut(topic_id) else { + return ApplyReply::err(PurgeTopicResult::TopicNotFound); + }; + for partition in &mut topic.partitions { + partition.purge_generation = partition.purge_generation.wrapping_add(1); + } + !topic.partitions.is_empty() }; - if state.resolve_topic_id(stream_id, &self.topic_id).is_none() { - return ApplyReply::err(PurgeTopicResult::TopicNotFound); + if advanced { + state.revision = state.revision.wrapping_add(1); } ApplyReply::ok(Bytes::new()) } @@ -1246,6 +1442,8 @@ impl StateHandler for CreatePartitionsWithAssignmentsRequest { consensus_group_id: partition.consensus_group_id, created_at: timestamp, created_revision: new_revision, + deleted_up_to_offset: 0, + purge_generation: 0, }); } // Added partitions are unassigned until the groups rebalance. @@ -1339,6 +1537,8 @@ impl Snapshotable for Streams { consensus_group_id: p.consensus_group_id, created_at: p.created_at, created_revision: p.created_revision, + deleted_up_to_offset: p.deleted_up_to_offset, + purge_generation: p.purge_generation, }) .collect(), consumer_groups: topic @@ -1418,6 +1618,8 @@ impl Snapshotable for Streams { consensus_group_id: p.consensus_group_id, created_at: p.created_at, created_revision: p.created_revision, + deleted_up_to_offset: p.deleted_up_to_offset, + purge_generation: p.purge_generation, }) .collect(), // Not snapshotted (see `TopicSnapshot`): start fresh. @@ -1495,6 +1697,23 @@ mod tests { }; use iggy_binary_protocol::responses::topics::get_topic::GetTopicResponse; + #[test] + fn truncate_partition_request_round_trips() { + let request = TruncatePartitionRequest { + stream_id: WireIdentifier::numeric(7), + topic_id: WireIdentifier::numeric(3), + partition_id: 5, + up_to_offset: 1234, + }; + let bytes = request.to_bytes(); + let (decoded, consumed) = TruncatePartitionRequest::decode(&bytes).expect("decode"); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded.stream_id, request.stream_id); + assert_eq!(decoded.topic_id, request.topic_id); + assert_eq!(decoded.partition_id, request.partition_id); + assert_eq!(decoded.up_to_offset, request.up_to_offset); + } + fn create_stream(inner: &mut StreamsInner, name: &str) { let request = CreateStreamRequest { name: WireName::new(name).unwrap(), diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 3250f83110..15227e831e 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -47,7 +47,7 @@ use iggy_binary_protocol::{AckLevel, Operation, PrepareHeader, WireDecode, WireI use iggy_binary_protocol::{PrepareOkHeader, RequestHeader}; use iggy_common::{ ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets, - IggyByteSize, IggyError, IggyTimestamp, PartitionStats, PollingKind, + IggyByteSize, IggyError, IggyExpiry, IggyTimestamp, PartitionStats, PollingKind, }; use journal::Journal as _; use message_bus::{IggyMessageBus, MessageBus}; @@ -108,6 +108,11 @@ where consumer_offset_enforce_fsync: bool, pending_consumer_offset_commits: HashMap, observed_view: u32, + /// Highest `PurgeTopic` generation this replica has locally applied (reset + /// the partition to empty). The reconciler compares the committed metadata + /// generation against this and resets only when it advances, so a redundant + /// reconcile pass never re-wipes a partition already at this generation. + applied_purge_generation: u64, } /// Post-preflight dispatch in `on_request`: replicate via VSR or take the @@ -196,9 +201,15 @@ where consumer_offset_enforce_fsync: false, pending_consumer_offset_commits: HashMap::new(), observed_view, + applied_purge_generation: 0, } } + #[must_use] + pub const fn applied_purge_generation(&self) -> u64 { + self.applied_purge_generation + } + #[must_use] pub const fn consensus(&self) -> &VsrConsensus { &self.consensus @@ -2344,6 +2355,348 @@ where Ok(()) } + /// Minimum committed offset across all consumers and consumer groups, with + /// the holder's identity. `None` when nothing has been committed, in which + /// case there is no deletion barrier. + fn min_committed_offset(&self) -> Option<(u64, ConsumerKind, u32)> { + let consumer_guard = self.consumer_offsets.pin(); + let group_guard = self.consumer_group_offsets.pin(); + let consumers = consumer_guard.iter().map(|(_, offset)| { + ( + offset.offset.load(Ordering::Relaxed), + offset.kind, + offset.consumer_id, + ) + }); + let groups = group_guard.iter().map(|(_, offset)| { + ( + offset.offset.load(Ordering::Relaxed), + offset.kind, + offset.consumer_id, + ) + }); + consumers.chain(groups).min_by_key(|(offset, _, _)| *offset) + } + + /// Time-expiry plus size-retention in one pass: remove the leading sealed + /// segments that have expired or that push the partition past `max_bytes`. + /// Returns the `(segments, messages)` removed. + pub async fn clean_expired_segments( + &mut self, + now: IggyTimestamp, + message_expiry: IggyExpiry, + max_bytes: Option, + ) -> (u64, u64) { + let expired = leading_expired_end(self.log.segments(), now, message_expiry); + let oversized = + max_bytes.and_then(|max_bytes| leading_oversized_end(self.log.segments(), max_bytes)); + let Some(up_to) = expired.into_iter().chain(oversized).max() else { + return (0, 0); + }; + self.remove_sealed_segments_up_to(up_to).await + } + + /// Remove the oldest sealed segments whose `end_offset <= up_to_offset`, + /// never the active segment and never past the consumer barrier (the + /// minimum committed consumer/group offset). Unlinks the messages and + /// index files and decrements partition stats. Idempotent: an offset below + /// the oldest sealed segment removes nothing. Returns the + /// `(segments, messages)` removed. + /// + /// Holds `write_lock` to serialize against the commit/rotate path, which + /// runs on the separate consensus-tick loop. + pub async fn remove_sealed_segments_up_to(&mut self, up_to_offset: u64) -> (u64, u64) { + let write_lock = self.write_lock.clone(); + let _guard = write_lock.lock().await; + + let barrier = self.min_committed_offset(); + let namespace = self.namespace(); + let removable = { + let segments = self.log.segments(); + let last_idx = segments.len().saturating_sub(1); + let mut removable = 0usize; + for (idx, segment) in segments.iter().enumerate() { + if idx == last_idx || !segment.sealed || segment.end_offset > up_to_offset { + break; + } + if let Some((barrier_offset, kind, consumer_id)) = barrier + && segment.end_offset > barrier_offset + { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = namespace.inner(), + start_offset = segment.start_offset, + end_offset = segment.end_offset, + barrier = barrier_offset, + %kind, + consumer_id, + "segment retained: blocked by committed consumer offset" + ); + break; + } + removable += 1; + } + removable + }; + + let mut deleted_segments = 0u64; + let mut deleted_messages = 0u64; + for _ in 0..removable { + // The removable run is always a prefix (oldest first), so the next + // victim is index 0 once the previous one is gone. + let segment = self.log.segments_mut().remove(0); + let mut storage = self.log.storages_mut().remove(0); + self.log.indexes_mut().remove(0); + self.log.messages_writers_mut().remove(0); + self.log.index_writers_mut().remove(0); + + let (messages_path, index_path) = storage.segment_and_index_paths(); + let _ = storage.shutdown(); + drop(storage); + + for path in messages_path.into_iter().chain(index_path) { + match compio::fs::remove_file(&path).await { + Ok(()) => {} + Err(error) if error.kind() == std::io::ErrorKind::NotFound => {} + Err(error) => { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = namespace.inner(), + path = %path, + %error, + "failed to unlink segment file during cleanup" + ); + } + } + } + + let segment_size = segment.size.as_bytes_u64(); + // The removal loop above only reaches sealed segments, which always + // hold at least one message, so the count is inclusive end..=start. + // A one-message sealed segment has `start_offset == end_offset`, so + // the `+ 1` is required (a `start == end -> 0` special case would + // undercount it). + let messages_in_segment = segment.end_offset - segment.start_offset + 1; + self.stats.decrement_size_bytes(segment_size); + self.stats.decrement_segments_count(1); + self.stats.decrement_messages_count(messages_in_segment); + + deleted_segments += 1; + deleted_messages += messages_in_segment; + + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = namespace.inner(), + start_offset = segment.start_offset, + end_offset = segment.end_offset, + "deleted sealed segment during cleanup" + ); + } + + (deleted_segments, deleted_messages) + } + + /// Build and install a fresh empty segment starting at `start_offset` with + /// real on-disk writers. Paths are derived from the partition directory + /// (see `rotate_segment`); falls back to the config-derived path for + /// in-memory partitions with no directory. + /// + /// # Errors + /// If the segment's log / index file cannot be created. + async fn install_empty_segment( + &mut self, + config: &PartitionsConfig, + start_offset: u64, + ) -> Result<(), IggyError> { + let namespace = self.namespace(); + let (messages_path, index_path) = self.partition_dir().map_or_else( + || { + ( + config.get_messages_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + start_offset, + ), + config.get_index_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + start_offset, + ), + ) + }, + |dir| { + ( + format!("{dir}/{start_offset:0>20}.log"), + format!("{dir}/{start_offset:0>20}.index"), + ) + }, + ); + let segment = Segment::new(start_offset, config.segment_size); + let storage = SegmentStorage::new( + &messages_path, + &index_path, + 0, + 0, + config.enforce_fsync, + config.enforce_fsync, + false, + ) + .await + .map_err(|_| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?; + let messages_size_bytes = storage + .messages_writer + .as_ref() + .ok_or_else(|| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))? + .size_counter(); + let messages_writer = Rc::new( + MessagesWriter::new( + &messages_path, + messages_size_bytes, + config.enforce_fsync, + false, + ) + .await + .map_err(|_| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?, + ); + let index_writer = Rc::new( + IggyIndexWriter::new( + &index_path, + Rc::new(std::sync::atomic::AtomicU64::new(0)), + config.enforce_fsync, + false, + ) + .await + .map_err(|_| IggyError::CannotCreateSegmentIndexFile(index_path.clone()))?, + ); + self.log + .add_persisted_segment(segment, storage, Some(messages_writer), Some(index_writer)); + Ok(()) + } + + /// Reset the partition to a single empty segment at offset 0 and clear all + /// consumer / consumer-group offsets (memory + disk). This is the local + /// effect of a committed `PurgeTopic`: it wipes message data and offsets but + /// preserves the partition and its consumer-group membership. Mirrors the + /// legacy server's `purge_all_segments` + offset-file deletion. + /// + /// Records `generation` as the applied purge generation so the reconciler + /// does not re-wipe a partition already purged at this generation (a later + /// `PurgeTopic` advances the committed generation and triggers a fresh pass). + /// + /// # Errors + /// If the replacement segment's log / index file cannot be created. + pub async fn purge( + &mut self, + config: &PartitionsConfig, + generation: u64, + ) -> Result<(), IggyError> { + let write_lock = self.write_lock.clone(); + let _guard = write_lock.lock().await; + + let namespace = self.namespace(); + + // Drain every segment (including the active one) and unlink its files. + let segment_count = self.log.segments().len(); + for _ in 0..segment_count { + self.log.segments_mut().remove(0); + let mut storage = self.log.storages_mut().remove(0); + self.log.indexes_mut().remove(0); + self.log.messages_writers_mut().remove(0); + self.log.index_writers_mut().remove(0); + + let (messages_path, index_path) = storage.segment_and_index_paths(); + let _ = storage.shutdown(); + drop(storage); + + for path in messages_path.into_iter().chain(index_path) { + match compio::fs::remove_file(&path).await { + Ok(()) => {} + Err(error) if error.kind() == std::io::ErrorKind::NotFound => {} + Err(error) => { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = namespace.inner(), + path = %path, + %error, + "failed to unlink segment file during purge" + ); + } + } + } + } + + // Recreate a fresh empty segment at offset 0 with real writers. + let start_offset = 0u64; + self.install_empty_segment(config, start_offset).await?; + + // Reset the offset counters so new messages start at offset 0. + self.offset.store(start_offset, Ordering::Release); + self.dirty_offset.store(start_offset, Ordering::Relaxed); + self.should_increment_offset = false; + + // Clear consumer + consumer-group offsets (memory + disk). Collect the + // file paths before deleting so the map guard is not held across an + // await. + let consumer_paths: Vec = { + let guard = self.consumer_offsets.pin(); + let paths = guard + .iter() + .filter_map(|(key, _)| { + u32::try_from(*key) + .ok() + .and_then(|id| self.persisted_offset_path(ConsumerKind::Consumer, id)) + }) + .collect(); + guard.clear(); + paths + }; + let group_paths: Vec = { + let guard = self.consumer_group_offsets.pin(); + let paths = guard + .iter() + .filter_map(|(key, _)| { + u32::try_from(key.0) + .ok() + .and_then(|id| self.persisted_offset_path(ConsumerKind::ConsumerGroup, id)) + }) + .collect(); + guard.clear(); + paths + }; + for path in consumer_paths.into_iter().chain(group_paths) { + let _ = delete_persisted_offset(&path).await; + } + + // Clear the ephemeral cooperative-rebalance tracking too: after the + // reset to offset 0 a stale `last_polled` (a high pre-purge offset) + // would make the reconciler's completion check `committed >= last_polled` + // unsatisfiable, stalling a pending revocation until its timeout. + self.last_polled_offsets.pin().clear(); + + // Reset stats to a single empty segment. + self.stats.zero_out_all(); + self.stats.increment_segments_count(1); + + self.applied_purge_generation = generation; + Ok(()) + } + + /// `end_offset` of the `count`-th oldest sealed (non-active) segment, used + /// to resolve a client `DeleteSegments` count into a concrete truncation + /// offset on the owning shard. `None` when there are no deletable sealed + /// segments; clamps to the last sealed segment when fewer than `count` + /// exist. + #[must_use] + pub fn nth_oldest_sealed_end_offset(&self, count: u32) -> Option { + nth_oldest_sealed_end(self.log.segments(), count) + } + async fn send_prepare_ok(&self, header: &PrepareHeader) { // `VsrAction::RetransmitPrepares` reads from `self.log.journal`. // Both `SendMessages` (via `append_send_messages_to_journal`) and @@ -2389,6 +2742,66 @@ fn accumulate_committed_info( info.max_timestamp = info.max_timestamp.max(base_timestamp); } +/// Highest `end_offset` among the leading run of expired sealed segments, or +/// `None` when none are expired. The last element is the active segment and is +/// never considered. `expiry` must be resolved; a `ServerDefault` expires +/// nothing (see [`Segment::is_expired`]). +fn leading_expired_end( + segments: &[Segment], + now: IggyTimestamp, + expiry: IggyExpiry, +) -> Option { + let last_idx = segments.len().saturating_sub(1); + let mut up_to = None; + for (idx, segment) in segments.iter().enumerate() { + if idx == last_idx || !segment.is_expired(now, expiry) { + break; + } + up_to = Some(segment.end_offset); + } + up_to +} + +/// Highest `end_offset` to drop so the resident size falls to `max_bytes`, or +/// `None` when already under budget. The active segment (last element) is +/// never dropped. The budget is per-partition: the cluster has no single owner +/// of a topic-wide total, so each replica trims its own log. +fn leading_oversized_end(segments: &[Segment], max_bytes: u64) -> Option { + let last_idx = segments.len().saturating_sub(1); + let mut resident: u64 = segments + .iter() + .map(|segment| segment.size.as_bytes_u64()) + .sum(); + let mut up_to = None; + for (idx, segment) in segments.iter().enumerate() { + if idx == last_idx || !segment.sealed || resident <= max_bytes { + break; + } + resident -= segment.size.as_bytes_u64(); + up_to = Some(segment.end_offset); + } + up_to +} + +/// `end_offset` of the `count`-th oldest sealed (non-active) segment of +/// `segments`, or `None` when there is no deletable sealed segment. Clamps to +/// the last sealed segment when fewer than `count` exist. +fn nth_oldest_sealed_end(segments: &[Segment], count: u32) -> Option { + if count == 0 { + return None; + } + // Exclude the active (last) segment, take the leading sealed run, then the + // `count`-th of those (or the last available when fewer exist). + let last_idx = segments.len().saturating_sub(1); + segments + .iter() + .take(last_idx) + .take_while(|segment| segment.sealed) + .take(count as usize) + .map(|segment| segment.end_offset) + .last() +} + /// Walk stamped `[256B SendMessages2Header][blob]` batches in one disk /// chunk, pushing matching fragments. Returns bytes consumed: the start /// of the first batch that did not fully fit in the chunk (the caller @@ -2444,3 +2857,125 @@ fn walk_disk_chunk( cursor.min(bytes.len()) } + +#[cfg(test)] +mod retention_tests { + use super::*; + use iggy_common::IggyDuration; + use std::time::Duration; + + fn segment(end_offset: u64, max_timestamp: u64, size: u64, sealed: bool) -> Segment { + let mut segment = Segment::new(0, IggyByteSize::from(0u64)); + segment.end_offset = end_offset; + segment.max_timestamp = max_timestamp; + segment.size = IggyByteSize::from(size); + segment.sealed = sealed; + segment + } + + fn one_second() -> IggyExpiry { + IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_secs(1))) + } + + #[test] + fn leading_expired_end_skips_active_and_returns_last_expired() { + let segments = vec![ + segment(9, 1, 100, true), + segment(19, 2, 100, true), + segment(29, 3, 100, true), + segment(39, 0, 100, false), // active: never considered + ]; + assert_eq!( + leading_expired_end(&segments, IggyTimestamp::now(), one_second()), + Some(29) + ); + } + + #[test] + fn leading_expired_end_stops_at_first_unexpired() { + let now = IggyTimestamp::now(); + let expiry = IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_hours(1))); + let segments = vec![ + segment(9, 1, 100, true), // expired + segment(19, now.as_micros(), 100, true), // recent: not expired, stops run + segment(29, 1, 100, true), + segment(39, 0, 100, false), + ]; + assert_eq!(leading_expired_end(&segments, now, expiry), Some(9)); + } + + #[test] + fn leading_expired_end_none_for_never_expire() { + let segments = vec![segment(9, 1, 100, true), segment(19, 0, 100, false)]; + assert_eq!( + leading_expired_end(&segments, IggyTimestamp::now(), IggyExpiry::NeverExpire), + None + ); + } + + #[test] + fn leading_expired_end_none_for_lone_active_segment() { + let segments = vec![segment(9, 1, 100, false)]; + assert_eq!( + leading_expired_end(&segments, IggyTimestamp::now(), one_second()), + None + ); + } + + #[test] + fn leading_oversized_end_trims_oldest_until_under_budget() { + // 4 x 100 = 400 resident, active excluded. Budget 250: drop seg0 (300 + // left) then seg1 (200 <= 250, stop). up_to = seg1.end_offset. + let segments = vec![ + segment(9, 1, 100, true), + segment(19, 2, 100, true), + segment(29, 3, 100, true), + segment(39, 0, 100, false), + ]; + assert_eq!(leading_oversized_end(&segments, 250), Some(19)); + } + + #[test] + fn leading_oversized_end_none_when_under_budget() { + let segments = vec![segment(9, 1, 100, true), segment(19, 0, 100, false)]; + assert_eq!(leading_oversized_end(&segments, 10_000), None); + } + + #[test] + fn leading_oversized_end_never_drops_active_segment() { + let segments = vec![segment(9, 1, 1_000, false)]; + assert_eq!(leading_oversized_end(&segments, 10), None); + } + + #[test] + fn nth_oldest_sealed_end_resolves_count_to_offset() { + let segments = vec![ + segment(9, 1, 100, true), + segment(19, 2, 100, true), + segment(29, 3, 100, true), + segment(39, 0, 100, false), // active: excluded + ]; + assert_eq!(nth_oldest_sealed_end(&segments, 1), Some(9)); + assert_eq!(nth_oldest_sealed_end(&segments, 2), Some(19)); + // More than available sealed: clamps to the last sealed segment. + assert_eq!(nth_oldest_sealed_end(&segments, 10), Some(29)); + assert_eq!(nth_oldest_sealed_end(&segments, 0), None); + } + + #[test] + fn nth_oldest_sealed_end_stops_at_first_unsealed() { + let segments = vec![ + segment(9, 1, 100, true), + segment(19, 2, 100, false), // unsealed mid-run stops the count + segment(29, 3, 100, true), + segment(39, 0, 100, false), + ]; + assert_eq!(nth_oldest_sealed_end(&segments, 5), Some(9)); + } + + #[test] + fn nth_oldest_sealed_end_none_for_lone_active_segment() { + let segments = vec![segment(9, 1, 100, false)]; + assert_eq!(nth_oldest_sealed_end(&segments, 1), None); + } +} diff --git a/core/partitions/src/segment.rs b/core/partitions/src/segment.rs index 69293f2863..538ccd8404 100644 --- a/core/partitions/src/segment.rs +++ b/core/partitions/src/segment.rs @@ -71,7 +71,11 @@ impl Segment { #[must_use] pub fn is_expired(&self, now: IggyTimestamp, expiry: IggyExpiry) -> bool { - if !self.sealed { + // A sealed segment whose newest-message timestamp is unknown (0, e.g. + // sealed before any index was written) must not count as expired: + // `0 + duration <= now` is otherwise always true and would delete it + // instantly. Mirrors the #2924 fix on the common Segment type. + if !self.sealed || self.max_timestamp == 0 { return false; } @@ -83,3 +87,54 @@ impl Segment { } } } + +#[cfg(test)] +mod tests { + use super::*; + use iggy_common::IggyDuration; + use std::time::Duration; + + fn sealed_segment(max_timestamp: u64) -> Segment { + let mut segment = Segment::new(0, IggyByteSize::from(1024u64)); + segment.sealed = true; + segment.max_timestamp = max_timestamp; + segment + } + + #[test] + fn unsealed_segment_is_never_expired() { + let mut segment = sealed_segment(1); + segment.sealed = false; + let expiry = IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_secs(1))); + assert!(!segment.is_expired(IggyTimestamp::now(), expiry)); + } + + #[test] + fn zero_max_timestamp_segment_is_not_expired() { + let segment = sealed_segment(0); + let expiry = IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_secs(1))); + assert!(!segment.is_expired(IggyTimestamp::now(), expiry)); + } + + #[test] + fn old_sealed_segment_is_expired() { + let segment = sealed_segment(1); + let expiry = IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_secs(1))); + assert!(segment.is_expired(IggyTimestamp::now(), expiry)); + } + + #[test] + fn recent_sealed_segment_is_not_expired() { + let now = IggyTimestamp::now(); + let segment = sealed_segment(now.as_micros()); + let expiry = IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_hours(1))); + assert!(!segment.is_expired(now, expiry)); + } + + #[test] + fn never_expire_and_server_default_never_expire() { + let segment = sealed_segment(1); + assert!(!segment.is_expired(IggyTimestamp::now(), IggyExpiry::NeverExpire)); + assert!(!segment.is_expired(IggyTimestamp::now(), IggyExpiry::ServerDefault)); + } +} diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs index 1e93ce8041..a07bb99ab2 100644 --- a/core/server-ng/src/bootstrap.rs +++ b/core/server-ng/src/bootstrap.rs @@ -951,6 +951,22 @@ async fn shard_main( None }; + // Segment cleaner: runs on every shard (each replica trims its own log, + // primary and backup alike). Local and unreplicated; gated by the shared + // data-maintenance config. + let segment_cleaner_stop = if config.data_maintenance.messages.cleaner_enabled { + let (stop_tx, stop_rx) = channel(1); + let cleaner_shard = Rc::clone(&shard); + let interval = config.data_maintenance.messages.interval.get_duration(); + let cleaner_handle = compio::runtime::spawn(async move { + crate::segment_cleaner::run_segment_cleaner(cleaner_shard, stop_rx, interval).await; + }); + bus.track_background(cleaner_handle); + Some(stop_tx) + } else { + None + }; + // Listeners (replica + every client transport) bind on shard 0 only. // Shard 0's coordinator round-robins inbound TCP/WS connections to // peer shards via fd-transfer. QUIC and TCP-TLS clients terminate @@ -990,6 +1006,9 @@ async fn shard_main( if let Some(cleaner_stop_tx) = &pat_cleaner_stop { let _ = cleaner_stop_tx.try_send(()); } + if let Some(tx) = &segment_cleaner_stop { + let _ = tx.try_send(()); + } return Err(error); } } @@ -1004,6 +1023,9 @@ async fn shard_main( if let Some(cleaner_stop_tx) = &pat_cleaner_stop { let _ = cleaner_stop_tx.try_send(()); } + if let Some(tx) = &segment_cleaner_stop { + let _ = tx.try_send(()); + } info!(shard = shard_id, "server-ng shard exited cleanly"); Ok(()) diff --git a/core/server-ng/src/dispatch.rs b/core/server-ng/src/dispatch.rs index 24edd63bc5..1a02dcc994 100644 --- a/core/server-ng/src/dispatch.rs +++ b/core/server-ng/src/dispatch.rs @@ -52,6 +52,7 @@ use iggy_binary_protocol::primitives::polling_strategy::WirePollingStrategy; use iggy_binary_protocol::requests::consumer_groups::SyncConsumerGroupRequest; use iggy_binary_protocol::requests::consumer_offsets::GetConsumerOffsetRequest; use iggy_binary_protocol::requests::messages::PollMessagesRequest; +use iggy_binary_protocol::requests::segments::DeleteSegmentsRequest; use iggy_binary_protocol::requests::system::get_client::GetClientRequest; use iggy_binary_protocol::requests::users::{LoginRegisterRequest, LoginRegisterWithPatRequest}; use iggy_binary_protocol::responses::clients::client_response::ConsumerGroupInfoResponse; @@ -65,7 +66,9 @@ use iggy_common::{IggyError, PollingStrategy}; use message_bus::client_listener::RequestHandler; use message_bus::replica::listener::MessageHandler; use message_bus::{IggyMessageBus, MessageBus}; -use metadata::impls::metadata::{MetadataSubmitError, StreamsFrontend}; +use metadata::impls::metadata::{ + MetadataSubmitError, StreamsFrontend, build_truncate_partition_client_message, +}; use partitions::{Partition, PollingArgs, PollingConsumer}; use secrecy::ExposeSecret; use server_common::Message; @@ -211,6 +214,11 @@ pub(crate) fn make_partition_read_handler( partition.last_polled_offsets.pin().remove(&key); PartitionReadReply::Ack } + PartitionRead::ResolveSegmentDeleteOffset { count } => { + PartitionReadReply::SegmentDeleteOffset { + up_to_offset: partition.nth_oldest_sealed_end_offset(count), + } + } }; let _ = reply.try_send(result); }) @@ -512,6 +520,16 @@ async fn handle_client_request( return; } + // DeleteSegments is neither a partition nor a metadata consensus op: the + // owning shard resolves the requested count to a concrete offset, then a + // `TruncatePartition` is replicated through metadata (Option A). Each + // replica's reconciler trims to the committed watermark. Handle it here, + // ahead of the partition/metadata routing below. + if header.operation == Operation::DeleteSegments { + handle_delete_segments_request(shard, transport_client_id, bound, &request).await; + return; + } + // Partition data-plane op (SendMessages / consumer-offset writes): the // op belongs to the partition's own consensus group, not the metadata // group. Route it through the shard mesh by namespace; the owning @@ -1433,6 +1451,117 @@ async fn submit_logout_on_owner( } } +/// Handle a client `DeleteSegments`: resolve the requested count to an offset +/// on the owning shard, replicate a `TruncatePartition` through metadata so +/// every replica trims to the same watermark, then ack the client. The local +/// deletion happens later, when each replica's reconciler observes the commit. +/// +/// Best-effort space management: any failure (bad request, unknown namespace, +/// nothing sealed to delete, transient submit error) is logged and still acks, +/// so the client never blocks on it. +#[allow(clippy::future_not_send)] +async fn handle_delete_segments_request( + shard: &Rc, + transport_client_id: u128, + bound: Option<(u128, u64)>, + request: &Message, +) { + let header = *request.header(); + let body = request_body(request); + + // An unbound transport cannot be attributed a VSR request sequence; the + // outer handler already short-circuits these, so this is defensive. + let Some((vsr_client_id, session)) = bound else { + return; + }; + + // The client numbers DeleteSegments in the same monotonic request sequence + // as every other metadata op. So resolve the requested count to a concrete + // offset on the owning shard, then replicate a `TruncatePartition(offset)` + // AS the client's own request through the standard owner path: the commit + // records (client, session, request) in the `ClientTable` on every replica, + // keeping the sequence contiguous. Skipping the commit (or attributing it + // to an internal id) leaves a hole that fails the next metadata op's + // `request == committed + 1` preflight -> RequestGap -> silent drop -> the + // SDK blocks until timeout. A no-op delete still commits `up_to_offset = 0` + // (monotonic apply) for the same reason. + #[allow(clippy::cast_possible_truncation)] + let truncate = match DeleteSegmentsRequest::decode_from(body) { + Ok(parsed) => match resolve_partition_request_namespace( + shard, + Operation::DeleteSegments, + body, + transport_client_id, + ) { + Ok(namespace_raw) => { + let namespace = IggyNamespace::from_raw(namespace_raw); + let up_to_offset = match shard + .partition_read( + namespace, + PartitionRead::ResolveSegmentDeleteOffset { + count: parsed.segments_count, + }, + ) + .await + { + Some(PartitionReadReply::SegmentDeleteOffset { + up_to_offset: Some(offset), + }) => offset, + _ => 0, + }; + Some(build_truncate_partition_client_message( + &header, + vsr_client_id, + session, + namespace.stream_id() as u32, + namespace.topic_id() as u32, + namespace.partition_id() as u32, + up_to_offset, + )) + } + Err(error) => { + warn!( + transport_client_id, + error = %error, + "delete_segments: unresolved namespace" + ); + None + } + }, + Err(_) => None, + }; + + if let Some(truncate) = truncate + && submit_client_request_on_owner(shard, truncate) + .await + .is_none() + { + // Transient submit failure (not primary / view change). Stay silent; + // the SDK read-timeout replays the same request id, which re-resolves + // and commits. Acking here would advance the client past an unrecorded + // request and gap the next metadata op. + warn!( + transport_client_id, + "delete_segments: transient submit; client will replay" + ); + return; + } + + let commit = current_metadata_commit(shard); + let reply = build_empty_reply(&header, transport_client_id, session, commit); + if let Err(error) = shard + .bus + .send_to_client(transport_client_id, reply.into_generic().into_frozen()) + .await + { + warn!( + transport_client_id, + error = %error, + "delete_segments: failed to send reply" + ); + } +} + /// Disconnect cleanup: the local `SessionManager` connection is already /// dropped by the caller; this submits a session-matched `Logout` so the /// committed apply releases the `ClientTable` slot on every replica (shard 0 diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/lib.rs index 090b8d8ad6..78d9847b85 100644 --- a/core/server-ng/src/lib.rs +++ b/core/server-ng/src/lib.rs @@ -28,6 +28,7 @@ pub mod partition_reconciler; pub mod pat; pub(crate) mod personal_access_token_cleaner; pub mod responses; +pub(crate) mod segment_cleaner; pub mod server_error; pub mod session_manager; pub mod users; diff --git a/core/server-ng/src/partition_reconciler.rs b/core/server-ng/src/partition_reconciler.rs index 10152bb96c..2e2591f519 100644 --- a/core/server-ng/src/partition_reconciler.rs +++ b/core/server-ng/src/partition_reconciler.rs @@ -293,6 +293,8 @@ async fn reconcile_once(ctx: &ReconcilerCtx) -> bool { reconcile_additions(ctx, target, &mut counters).await; reconcile_removals(ctx, &target_set, &mut counters).await; reconcile_consumer_group_offsets(ctx, &mut counters).await; + reconcile_segment_truncations(ctx); + reconcile_partition_purges(ctx); let local_set: AHashSet = ctx.shard.plane.partitions().namespaces().copied().collect(); @@ -757,6 +759,48 @@ fn shards_table_contains(ctx: &ReconcilerCtx, ns: IggyNamespace) -> bool { ctx.shard.shards_table().shard_for(ns).is_some() } +/// Enforce committed `TruncatePartition` watermarks: for each owned partition +/// carrying a non-zero delete watermark, stage a pump-side trim to that offset. +/// Idempotent — the pump no-ops once a partition is trimmed past the watermark, +/// so a redundant pass triggered by an unrelated revision bump is harmless. +fn reconcile_segment_truncations(ctx: &ReconcilerCtx) { + let namespaces: Vec<_> = ctx.shard.plane.partitions().namespaces().copied().collect(); + let streams = ctx.shard.plane.metadata().mux_stm.streams(); + for namespace in namespaces { + let watermark = streams.partition_delete_watermark( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + ); + if watermark > 0 { + ctx.shard.request_truncate_partition(namespace, watermark); + } + } +} + +/// Stage a `PurgePartition` reset for every owned partition whose committed +/// `PurgeTopic` generation is newer than the one the local partition last +/// applied. The pump re-checks the generation before wiping, so a redundant +/// pass (e.g. from an unrelated revision bump) is a no-op. +fn reconcile_partition_purges(ctx: &ReconcilerCtx) { + let partitions = ctx.shard.plane.partitions(); + let namespaces: Vec<_> = partitions.namespaces().copied().collect(); + let streams = ctx.shard.plane.metadata().mux_stm.streams(); + for namespace in namespaces { + let committed = streams.partition_purge_generation( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + ); + let applied = partitions + .get_by_ns(&namespace) + .map_or(0, partitions::IggyPartition::applied_purge_generation); + if committed > applied { + ctx.shard.request_purge_partition(namespace, committed); + } + } +} + pub fn install_tick_handler(shard: &Rc, wake_tx: WakeTx) { let shard_id = shard.id; let handler = Rc::new(move || { diff --git a/core/server-ng/src/segment_cleaner.rs b/core/server-ng/src/segment_cleaner.rs new file mode 100644 index 0000000000..9e0e104a77 --- /dev/null +++ b/core/server-ng/src/segment_cleaner.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Per-shard periodic segment cleaner. +//! +//! Local and unreplicated: every replica (primary and backup) trims its own +//! expired or over-budget sealed segments. Divergence in physical log start is +//! invisible to clients because reads are served by the partition primary. The +//! timer resolves each owned partition's retention policy from metadata and +//! stamps `now`, then hands a `CleanPartition` request to the shard pump, the +//! single writer of partition state, which performs the deletion serialized +//! with reads. This mirrors the legacy server's `MessagesCleaner` -> +//! message-pump `CleanTopicMessages` path. + +use crate::bootstrap::ServerNgShard; +use consensus::{MetadataHandle, PartitionsHandle}; +use iggy_common::{IggyExpiry, IggyTimestamp, MaxTopicSize}; +use metadata::impls::metadata::StreamsFrontend; +use shard::Receiver; +use std::rc::Rc; +use std::time::Duration; +use tracing::trace; + +/// Run the cleaner until `stop` fires. Wakes every `interval`; expiry and size +/// are evaluated against wall-clock and resident bytes, so no metadata-commit +/// wake is needed. +pub async fn run_segment_cleaner(shard: Rc, stop: Receiver<()>, interval: Duration) { + trace!( + shard = shard.id, + interval_ms = interval.as_millis(), + "segment cleaner started" + ); + loop { + // `Ok(_)`: stop signalled -> exit. `Err(_)`: interval elapsed -> pass. + match compio::time::timeout(interval, stop.recv()).await { + Ok(_) => break, + Err(_) => stage_owned_partitions(&shard), + } + } + trace!(shard = shard.id, "segment cleaner exited"); +} + +/// Stage a cleaner pass for every partition this shard owns whose topic has a +/// retention policy. Reads config off-pump and hands the resolved decision to +/// the pump; partitions with no policy are skipped without a frame. +fn stage_owned_partitions(shard: &Rc) { + let now = IggyTimestamp::now(); + let namespaces: Vec<_> = shard.plane.partitions().namespaces().copied().collect(); + let streams = shard.plane.metadata().mux_stm.streams(); + for namespace in namespaces { + let Some((message_expiry, max_topic_size, partition_count)) = + streams.topic_retention_config(namespace.stream_id(), namespace.topic_id()) + else { + continue; + }; + + // `ServerDefault` resolves to "never expire" here, matching the legacy + // cleaner: a topic created with the server default never expires + // segments unless an explicit duration was stored. + let has_expiry = !matches!( + message_expiry, + IggyExpiry::NeverExpire | IggyExpiry::ServerDefault + ); + let max_bytes = match max_topic_size { + // Per-partition budget: the cluster has no single owner of a + // topic-wide total, so each partition keeps an equal share. + MaxTopicSize::Custom(size) => { + let divisor = u64::try_from(partition_count).unwrap_or(1).max(1); + Some(size.as_bytes_u64() / divisor) + } + // No per-partition cap. `ServerDefault` must NOT fall through to a + // sized branch: its `as_bytes_u64()` is 0, which would trim every + // sealed segment. The server-ng default topic size is unlimited. + MaxTopicSize::Unlimited | MaxTopicSize::ServerDefault => None, + }; + + if !has_expiry && max_bytes.is_none() { + continue; + } + shard.request_clean_partition(namespace, now, message_expiry, max_bytes); + } +} diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs index c8d1230dce..7c4b496d40 100644 --- a/core/shard/src/lib.rs +++ b/core/shard/src/lib.rs @@ -37,6 +37,7 @@ use iggy_binary_protocol::{ #[cfg(any(test, feature = "simulator"))] use iggy_common::PartitionStats; use iggy_common::variadic; +use iggy_common::{IggyExpiry, IggyTimestamp}; use journal::{Journal, JournalHandle}; use message_bus::MessageBus; use message_bus::client_listener::RequestHandler; @@ -240,6 +241,12 @@ pub enum PartitionRead { ClearGroupLastPolled { group_id: u64, }, + /// Resolve a client `DeleteSegments` count into a concrete truncation + /// offset: the `end_offset` of the `count`-th oldest sealed segment. Run on + /// the owning shard, which alone holds the partition's segment state. + ResolveSegmentDeleteOffset { + count: u32, + }, } /// Reply to a [`PartitionRead`]. @@ -261,6 +268,10 @@ pub enum PartitionReadReply { }, /// Acknowledges a [`PartitionRead::ClearGroupLastPolled`]. Ack, + /// Reply to [`PartitionRead::ResolveSegmentDeleteOffset`]: the resolved + /// truncation offset, or `None` when the partition has no sealed segments + /// to delete. + SegmentDeleteOffset { up_to_offset: Option }, /// The owning shard has no materialised partition for the namespace /// (unknown, tombstoned, or mid-reconcile). Callers surface an error /// instead of an empty result. @@ -524,6 +535,34 @@ pub enum LifecycleFrame { /// shard's `reconcile_queue` on receipt; tail drain on every frame /// catches dropped markers. ReconcileApply, + /// Per-shard segment-cleaner request: delete expired / over-budget sealed + /// segments of `namespace` on the pump, serialized with reads. The timer + /// task resolves `message_expiry` / `max_bytes` from metadata and stamps + /// `now`; the pump only mutates. Local and unreplicated — each replica + /// trims its own log (divergence is invisible: reads hit the primary). + CleanPartition { + namespace: IggyNamespace, + now: IggyTimestamp, + message_expiry: IggyExpiry, + max_bytes: Option, + }, + /// Reconciler-staged enforcement of a committed `TruncatePartition` + /// watermark: delete sealed segments up to `up_to_offset` on the pump, + /// serialized with reads. Each replica applies the committed offset + /// locally and idempotently. + TruncatePartition { + namespace: IggyNamespace, + up_to_offset: u64, + }, + /// Reconciler-staged enforcement of a committed `PurgeTopic`: reset the + /// partition to a single empty segment at offset 0 and clear consumer + /// offsets on the pump, serialized with reads. `generation` is the + /// committed purge generation; the pump no-ops if the partition already + /// applied it, so a redundant reconcile pass never re-wipes live data. + PurgePartition { + namespace: IggyNamespace, + generation: u64, + }, } /// Reconciler-staged partition mutation. @@ -1034,6 +1073,55 @@ where let _ = sender.try_send(ShardFrame::lifecycle(LifecycleFrame::ReconcileApply)); } + /// Stage a segment-cleaner pass for `namespace` on this shard's pump. The + /// timer task resolves retention config off-pump and stamps `now`; the pump + /// is the single writer of partition state, so the deletion runs there, + /// serialized with reads. + pub fn request_clean_partition( + &self, + namespace: IggyNamespace, + now: IggyTimestamp, + message_expiry: IggyExpiry, + max_bytes: Option, + ) { + let Some(sender) = self.senders.get(self.id as usize) else { + return; + }; + let _ = sender.try_send(ShardFrame::lifecycle(LifecycleFrame::CleanPartition { + namespace, + now, + message_expiry, + max_bytes, + })); + } + + /// Stage a `TruncatePartition` enforcement for `namespace` on this shard's + /// pump: delete sealed segments up to `up_to_offset`. The reconciler calls + /// this after observing a committed delete watermark for an owned partition. + pub fn request_truncate_partition(&self, namespace: IggyNamespace, up_to_offset: u64) { + let Some(sender) = self.senders.get(self.id as usize) else { + return; + }; + let _ = sender.try_send(ShardFrame::lifecycle(LifecycleFrame::TruncatePartition { + namespace, + up_to_offset, + })); + } + + /// Stage a `PurgePartition` enforcement for `namespace` on this shard's + /// pump: reset the partition to empty at offset 0 and clear consumer + /// offsets. The reconciler calls this after observing a committed purge + /// generation newer than the partition's locally applied one. + pub fn request_purge_partition(&self, namespace: IggyNamespace, generation: u64) { + let Some(sender) = self.senders.get(self.id as usize) else { + return; + }; + let _ = sender.try_send(ShardFrame::lifecycle(LifecycleFrame::PurgePartition { + namespace, + generation, + })); + } + /// Drain and apply staged [`ReconcileOp`]s on the pump task. /// Synchronous: every arm is in-memory only. `ConfirmRemove`'s fsync + /// blocking close is offloaded to a detached task so the pump doesn't diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs index 8a41d3c5ee..d097badfb1 100644 --- a/core/shard/src/router.rs +++ b/core/shard/src/router.rs @@ -20,6 +20,7 @@ use crate::shards_table::{ ShardsTable, calculate_shard_assignment, calculate_shard_from_consensus_ns, }; use crate::{IggyShard, LifecycleFrame, Receiver, ShardFrame}; +use consensus::PartitionsHandle; use crossfire::TrySendError; use futures::FutureExt; use iggy_binary_protocol::{ConsensusHeader, GenericHeader, Operation, PrepareHeader}; @@ -500,6 +501,81 @@ where LifecycleFrame::ReconcileApply => { self.apply_reconcile_ops(); } + LifecycleFrame::CleanPartition { + namespace, + now, + message_expiry, + max_bytes, + } => { + // Pump-side: the single writer of partition state. The timer + // task already resolved the retention decision off-pump, so + // this only mutates, serialized with reads on the same loop. + if let Some(partition) = self.plane.partitions().get_mut_by_ns(&namespace) { + let (segments, messages) = partition + .clean_expired_segments(now, message_expiry, max_bytes) + .await; + if segments > 0 { + tracing::debug!( + shard = self.id, + namespace_raw = namespace.inner(), + segments, + messages, + "segment cleaner removed sealed segments" + ); + } + } + } + LifecycleFrame::TruncatePartition { + namespace, + up_to_offset, + } => { + // Pump-side enforcement of a committed delete watermark. The + // committed offset is identical on every replica, so the local + // deletion converges; idempotent if already trimmed past it. + if let Some(partition) = self.plane.partitions().get_mut_by_ns(&namespace) { + let (segments, messages) = + partition.remove_sealed_segments_up_to(up_to_offset).await; + if segments > 0 { + tracing::debug!( + shard = self.id, + namespace_raw = namespace.inner(), + segments, + messages, + up_to_offset, + "truncate-partition removed sealed segments" + ); + } + } + } + LifecycleFrame::PurgePartition { + namespace, + generation, + } => { + // Pump-side enforcement of a committed `PurgeTopic`: reset the + // partition to empty at offset 0 and clear consumer offsets. + // Idempotent per generation -- skip if already applied so a + // redundant reconcile pass does not wipe messages sent since. + let config = self.plane.partitions().config().clone(); + if let Some(partition) = self.plane.partitions().get_mut_by_ns(&namespace) + && partition.applied_purge_generation() < generation + { + match partition.purge(&config, generation).await { + Ok(()) => tracing::debug!( + shard = self.id, + namespace_raw = namespace.inner(), + generation, + "purge-partition reset partition to empty" + ), + Err(error) => tracing::error!( + shard = self.id, + namespace_raw = namespace.inner(), + generation, + %error, + "purge-partition failed to reset partition" + ), + } + } + } } }