Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions core/binary_protocol/src/consensus/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ pub enum Operation {
/// consensus by the partition reconciler; no client wire code.
CompleteConsumerGroupRevocation = 67,

/// Server-originated: set a partition's delete watermark (the offset below
/// which sealed segments are removed). The dispatch handler resolves a
/// client `DeleteSegments` count to a concrete offset on the owning shard
/// and submits this through metadata consensus, so every replica applies
/// the same offset deterministically. No client wire code.
TruncatePartition = 68,

// Metadata operations (shard 0)
CreateStream = 128,
UpdateStream = 129,
Expand All @@ -66,9 +73,10 @@ pub enum Operation {
PurgeTopic = 135,
CreatePartitions = 136,
DeletePartitions = 137,
// TODO: DeleteSegments is a partition operation (is_partition() == true) but its
// discriminant sits in the metadata range (128-147). Should be moved to 163 once
// iggy_common's Operation enum is removed and wire compat is no longer a concern.
// Client op handled specially: the dispatch layer resolves the requested
// segment count to an offset on the owning shard and replicates a
// `TruncatePartition` through metadata, so `DeleteSegments` itself is
// neither a metadata nor a partition consensus op (see `is_partition`).
DeleteSegments = 138,
CreateConsumerGroup = 139,
DeleteConsumerGroup = 140,
Expand All @@ -86,7 +94,6 @@ pub enum Operation {
SendMessages = 160,
StoreConsumerOffset = 161,
DeleteConsumerOffset = 162,
// 163 is reserved for the planned DeleteSegments move (see TODO above).
StoreConsumerOffset2 = 164,
DeleteConsumerOffset2 = 165,
}
Expand Down Expand Up @@ -152,7 +159,7 @@ impl Operation {
#[must_use]
#[inline]
pub const fn is_partition(&self) -> bool {
matches!(self, Self::DeleteSegments) || (*self as u8) >= Self::PARTITION_START
(*self as u8) >= Self::PARTITION_START
}

/// Operations clients are allowed to send directly.
Expand All @@ -175,7 +182,8 @@ impl Operation {
| Self::CreateTopicWithAssignments
| Self::CreatePartitionsWithAssignments
| Self::RemoveConsumerGroupMember
| Self::CompleteConsumerGroupRevocation => None,
| Self::CompleteConsumerGroupRevocation
| Self::TruncatePartition => None,
Self::CreateStream
| Self::UpdateStream
| Self::DeleteStream
Expand Down Expand Up @@ -315,7 +323,11 @@ mod tests {
assert!(Operation::CreateStream.is_client_allowed());
assert!(Operation::SendMessages.is_partition());
assert!(!Operation::SendMessages.is_metadata());
assert!(Operation::DeleteSegments.is_partition());
assert!(!Operation::DeleteSegments.is_partition());
assert!(!Operation::DeleteSegments.is_metadata());
assert!(Operation::TruncatePartition.is_internal());
assert!(Operation::TruncatePartition.is_metadata());
assert!(!Operation::TruncatePartition.is_client_allowed());
assert!(Operation::DeleteConsumerOffset.is_partition());
assert!(Operation::StoreConsumerOffset2.is_partition());
assert!(Operation::DeleteConsumerOffset2.is_partition());
Expand Down
1 change: 1 addition & 0 deletions core/binary_protocol/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta>
| Operation::CreatePartitionsWithAssignments
| Operation::RemoveConsumerGroupMember
| Operation::CompleteConsumerGroupRevocation
| Operation::TruncatePartition
| Operation::Reserved
| Operation::Register
| Operation::Logout
Expand Down
1 change: 1 addition & 0 deletions core/consensus/src/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ pub const fn operation_as_str(operation: Operation) -> &'static str {
Operation::CreatePartitionsWithAssignments => "create_partitions_with_assignments",
Operation::RemoveConsumerGroupMember => "remove_consumer_group_member",
Operation::CompleteConsumerGroupRevocation => "complete_consumer_group_revocation",
Operation::TruncatePartition => "truncate_partition",
Operation::CreateStream => "create_stream",
Operation::UpdateStream => "update_stream",
Operation::DeleteStream => "delete_stream",
Expand Down
12 changes: 6 additions & 6 deletions core/integration/tests/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ mod cg_vsr;
#[cfg(not(feature = "vsr"))]
mod concurrent_addition;
mod general;
// Asserts the periodic messages cleaner deletes expired segments from disk;
// server-ng has no data-maintenance cleaner yet.
#[cfg(not(feature = "vsr"))]
// The per-shard segment cleaner deletes expired / oversize segments from disk
// under both the legacy server and server-ng.
mod message_cleanup;
mod message_retrieval;
// Mixes server restarts, consumer-group barriers, and DeleteSegments
// maintenance; out of vsr scope for now.
#[cfg(not(feature = "vsr"))]
// Server restarts, consumer-group barriers, and DeleteSegments maintenance.
// `should_delete_segments_without_consumers` is framing-agnostic + async-aware
// and runs under server-ng; the consumer-group / restart variants stay
// legacy-shaped (cg polling has its own vsr gaps) and are filtered out there.
mod purge_delete;
mod scenarios;
mod specific;
31 changes: 29 additions & 2 deletions core/integration/tests/server/purge_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ use crate::server::scenarios::purge_delete_scenario;
use integration::iggy_harness;
use test_case::test_matrix;

// Legacy-only: asserts the exact on-disk layout ([0, 7, 14, 21], 7 messages per
// 5KiB segment) and per-file byte sizes throughout. server-ng's per-message
// framing yields a different layout, so the scenario's offset anchors do not
// hold under vsr. Port to a framing-agnostic form to re-enable.
#[cfg(not(feature = "vsr"))]
#[iggy_harness(server(
segment.size = "5KiB",
segment.cache_indexes = ["all", "none", "open_segment"],
Expand All @@ -39,11 +44,23 @@ async fn should_delete_segments_and_validate_filesystem(
partition.messages_required_to_save = "1",
partition.enforce_fsync = "true",
))]
#[test_matrix([restart_off(), restart_on()])]
// server-ng restarts the cluster on restart_on, which currently fails in
// bootstrap independently of segment deletion: an `Index data must be exactly
// 16 bytes` recovery panic (cache_indexes all/none) and a replica-port rebind
// race. Run only the no-restart matrix under vsr until those are fixed.
#[cfg_attr(not(feature = "vsr"), test_matrix([restart_off(), restart_on()]))]
#[cfg_attr(feature = "vsr", test_matrix([restart_off()]))]
async fn should_delete_segments_without_consumers(harness: &mut TestHarness, restart_server: bool) {
purge_delete_scenario::run_no_consumers(harness, restart_server).await;
}

// vsr-gated: asserts a segment is freed *exactly* when the polled offset
// reaches its end — a synchronous-commit assumption. Under vsr the
// consumer-group auto-commit lags the poll, so the per-message timing does not
// hold (the barrier itself is correct: the server retains un-consumed
// segments, verified by `min_committed_offset` + its runtime diag). Legacy-only
// until redesigned to sync on the committed offset for async commit.
#[cfg(not(feature = "vsr"))]
#[iggy_harness(server(
segment.size = "5KiB",
segment.cache_indexes = ["all", "none", "open_segment"],
Expand All @@ -57,6 +74,9 @@ async fn should_delete_segments_with_consumer_group_barrier(harness: &TestHarnes
purge_delete_scenario::run_consumer_group_barrier(&client, &data_path).await;
}

// Legacy-only: pins the [0, 7, 14, 21] layout across a multi-consumer barrier.
// server-ng framing differs; port to framing-agnostic to re-enable.
#[cfg(not(feature = "vsr"))]
#[iggy_harness(server(
segment.size = "5KiB",
segment.cache_indexes = ["all", "none", "open_segment"],
Expand All @@ -77,7 +97,13 @@ async fn should_block_deletion_until_all_consumers_pass_segment(
partition.messages_required_to_save = "1",
partition.enforce_fsync = "true",
))]
#[test_matrix([restart_off(), restart_on()])]
// restart_on is vsr-gated: cluster restart hits a pre-existing server-ng
// bootstrap failure (legacy 16-byte index reader vs server-ng 24-byte writer),
// unrelated to purge. The scenario asserts the exact [0, 7, 14, 21] layout only
// on the legacy path; under vsr it verifies the framing-agnostic purge outcome
// (offsets cleared, files deleted, partition reset to a single segment at 0).
#[cfg_attr(not(feature = "vsr"), test_matrix([restart_off(), restart_on()]))]
#[cfg_attr(feature = "vsr", test_matrix([restart_off()]))]
async fn should_purge_topic_and_clear_consumer_offsets(
harness: &mut TestHarness,
restart_server: bool,
Expand All @@ -89,6 +115,7 @@ fn restart_off() -> bool {
false
}

#[cfg(not(feature = "vsr"))]
fn restart_on() -> bool {
true
}
2 changes: 0 additions & 2 deletions core/integration/tests/server/scenarios/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,12 @@ pub mod invalid_consumer_offset_scenario;
// file), so volume-based rotation rules cannot trigger.
#[cfg(not(feature = "vsr"))]
pub mod log_rotation_scenario;
#[cfg(not(feature = "vsr"))]
pub mod message_cleanup_scenario;
pub mod message_headers_scenario;
pub mod message_size_scenario;
pub mod offset_scenario;
#[cfg(not(feature = "vsr"))]
pub mod permissions_scenario;
#[cfg(not(feature = "vsr"))]
pub mod purge_delete_scenario;
pub mod read_during_persistence_scenario;
#[cfg(not(feature = "vsr"))]
Expand Down
Loading
Loading