diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index e4c6995d639..efe91ba3059 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -41,6 +41,7 @@ use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt, IndexMet use quickwit_proto::control_plane::{ AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest, + SwapIndexingPipelinesRequest, SwapIndexingPipelinesResponse, }; use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::metastore::{ @@ -353,7 +354,7 @@ impl ControlPlane { let physical_indexing_plan: Vec = self .indexing_scheduler .observable_state() - .last_applied_physical_plan + .current_targeted_physical_plan .map(|plan| { plan.indexing_tasks_per_indexer() .iter() @@ -953,6 +954,20 @@ impl Handler for ControlPlane { } } +#[async_trait] +impl Handler for ControlPlane { + type Reply = ControlPlaneResult; + + async fn handle( + &mut self, + request: SwapIndexingPipelinesRequest, + _ctx: &ActorContext, + ) -> Result { + let response = self.indexing_scheduler.swap_pipelines(request); + Ok(response) + } +} + #[async_trait] impl Handler for ControlPlane { type Reply = ControlPlaneResult<()>; @@ -1906,7 +1921,7 @@ mod tests { control_plane_mailbox.ask(Observe).await.unwrap(); let last_applied_physical_plan = control_plane_obs .indexing_scheduler - .last_applied_physical_plan + .current_targeted_physical_plan .unwrap(); let indexing_tasks = last_applied_physical_plan .indexing_tasks_per_indexer() @@ -1937,7 +1952,7 @@ mod tests { control_plane_mailbox.ask(Observe).await.unwrap(); let last_applied_physical_plan = control_plane_obs .indexing_scheduler - .last_applied_physical_plan + .current_targeted_physical_plan .unwrap(); let indexing_tasks = last_applied_physical_plan .indexing_tasks_per_indexer() diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 16d8bbcd737..c52558fe50b 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -28,11 +28,15 @@ use quickwit_common::pretty::PrettySample; use quickwit_config::{ FileSourceParams, SourceParams, disable_ingest_v1, indexing_pipeline_params_fingerprint, }; +use quickwit_proto::control_plane::{ + ControlPlaneResult, SwapIndexingPipelinesEntry, SwapIndexingPipelinesRequest, + SwapIndexingPipelinesResponse, SwapIndexingPipelinesResult, +}; use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY, PIPELINE_THROUGHPUT, }; -use quickwit_proto::types::NodeId; +use quickwit_proto::types::{NodeId, PipelineUid}; use scheduling::{SourceToSchedule, SourceToScheduleType}; use serde::Serialize; use tracing::{debug, info, warn}; @@ -60,7 +64,7 @@ const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200); pub struct IndexingSchedulerState { pub num_applied_physical_indexing_plan: usize, pub num_schedule_indexing_plan: usize, - pub last_applied_physical_plan: Option, + pub current_targeted_physical_plan: Option, #[serde(skip)] pub last_applied_plan_timestamp: Option, } @@ -282,6 +286,17 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { sources } +/// Holds the pre-validated tasks to move for a single swap entry. +/// Tasks are collected from the original plan before any modifications. +struct ValidSwapOperation { + left_node_id: String, + left_tasks: Vec, + right_node_id: String, + right_tasks: Vec, + left_index_id: String, + right_index_id: Option, +} + impl IndexingScheduler { pub fn new(cluster_id: String, self_node_id: NodeId, indexer_pool: IndexerPool) -> Self { IndexingScheduler { @@ -333,15 +348,15 @@ impl IndexingScheduler { let new_physical_plan = build_physical_indexing_plan( &sources, &indexer_id_to_cpu_capacities, - self.state.last_applied_physical_plan.as_ref(), + self.state.current_targeted_physical_plan.as_ref(), &shard_locations, ); let shard_locality_metrics = get_shard_locality_metrics(&new_physical_plan, &shard_locations); crate::metrics::CONTROL_PLANE_METRICS.set_shard_locality_metrics(shard_locality_metrics); - if let Some(last_applied_plan) = &self.state.last_applied_physical_plan { + if let Some(current_targeted_plan) = &self.state.current_targeted_physical_plan { let plans_diff = get_indexing_plans_diff( - last_applied_plan.indexing_tasks_per_indexer(), + current_targeted_plan.indexing_tasks_per_indexer(), new_physical_plan.indexing_tasks_per_indexer(), ); // No need to apply the new plan as it is the same as the old one. @@ -358,9 +373,9 @@ impl IndexingScheduler { /// - If node IDs differ, schedule a new indexing plan. /// - If indexing tasks differ, apply again the last plan. pub(crate) fn control_running_plan(&mut self, model: &ControlPlaneModel) { - let last_applied_plan = - if let Some(last_applied_plan) = &self.state.last_applied_physical_plan { - last_applied_plan + let current_targeted_plan = + if let Some(current_targeted) = &self.state.current_targeted_physical_plan { + current_targeted } else { // If there is no plan, the node is probably starting and the scheduler did not find // indexers yet. In this case, we want to schedule as soon as possible to find new @@ -382,7 +397,7 @@ impl IndexingScheduler { let indexing_plans_diff = get_indexing_plans_diff( &running_indexing_tasks_by_node_id, - last_applied_plan.indexing_tasks_per_indexer(), + current_targeted_plan.indexing_tasks_per_indexer(), ); if !indexing_plans_diff.has_same_nodes() { info!(plans_diff=?indexing_plans_diff, "running plan and last applied plan node IDs differ: schedule an indexing plan"); @@ -390,7 +405,7 @@ impl IndexingScheduler { } else if !indexing_plans_diff.has_same_tasks() { // Some nodes may have not received their tasks, apply it again. info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan"); - self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone(), None); + self.apply_physical_indexing_plan(&indexers, current_targeted_plan.clone(), None); } } @@ -438,7 +453,264 @@ impl IndexingScheduler { } self.state.num_applied_physical_indexing_plan += 1; self.state.last_applied_plan_timestamp = Some(Instant::now()); - self.state.last_applied_physical_plan = Some(new_physical_plan); + self.state.current_targeted_physical_plan = Some(new_physical_plan); + } + + /// Swaps indexing pipelines between indexers as requested. + /// + /// The swap is applied in 3 phases: + /// 1. Upfront contradiction check (rejects entire request on failure). + /// 2. Per-swap validation against the original (unmodified) plan. + /// 3. Atomic application of all valid swaps to a working copy of the plan. + pub(crate) fn swap_pipelines( + &mut self, + request: SwapIndexingPipelinesRequest, + ) -> ControlPlaneResult { + // Phase 0: Check that a plan exists. + let Some(original_plan) = &self.state.current_targeted_physical_plan else { + return Ok(SwapIndexingPipelinesResponse { + results: request + .swaps + .into_iter() + .map(|swap| SwapIndexingPipelinesResult { + swap: Some(swap.clone()), + success: false, + reason: "no indexing plan is currently applied".to_string(), + }) + .collect(), + }); + }; + + // Phase 1: Upfront contradiction check (rejects entire request on failure). + if let Err(error_response) = Self::check_swap_contradictions(&request) { + return Ok(error_response); + } + + // Phase 2: Validate each swap against the ORIGINAL plan and collect + // the tasks to move. + let mut swap_results: Vec = + Vec::with_capacity(request.swaps.len()); + let mut valid_operations: Vec = Vec::new(); + + for swap in &request.swaps { + match Self::validate_single_swap(original_plan, swap) { + Ok(operation) => { + valid_operations.push(operation); + swap_results.push(SwapIndexingPipelinesResult { + swap: Some(swap.clone()), + success: true, + reason: String::new(), + }); + } + Err(reason) => { + swap_results.push(SwapIndexingPipelinesResult { + swap: Some(swap.clone()), + success: false, + reason, + }); + } + } + } + + // Phase 3: Apply all valid swaps atomically to a working copy. + if !valid_operations.is_empty() { + let mut new_plan = original_plan.clone(); + for operation in &valid_operations { + Self::apply_swap_operation(&mut new_plan, operation); + } + self.state.current_targeted_physical_plan = Some(new_plan); + } + + Ok(SwapIndexingPipelinesResponse { + results: swap_results, + }) + } + + /// Validates the entire swap request for contradictions. + /// + /// A contradiction exists when: + /// - A swap entry references the same node on both sides. + /// - The same (node_id, index_id) pair appears in more than one swap entry. + /// + /// On contradiction, returns a response with all swaps marked as failed. + fn check_swap_contradictions( + request: &SwapIndexingPipelinesRequest, + ) -> Result<(), SwapIndexingPipelinesResponse> { + let mut seen_slots: FnvHashSet<(&str, &str)> = FnvHashSet::default(); + + let make_error_response = |reason: String| SwapIndexingPipelinesResponse { + results: request + .swaps + .iter() + .map(|s| SwapIndexingPipelinesResult { + swap: Some(s.clone()), + success: false, + reason: reason.clone(), + }) + .collect(), + }; + + for swap in &request.swaps { + // Reject same-node operations. + if swap.left_node_id == swap.right_node_id { + let right_index_desc = swap.right_index_id.as_deref().unwrap_or(""); + let reason = format!( + "request rejected: swap between '{}' (index '{}') and '{}' (index '{}') \ + references the same node", + swap.left_node_id, swap.left_index_id, swap.right_node_id, right_index_desc, + ); + return Err(make_error_response(reason)); + } + + let left_slot = (swap.left_node_id.as_str(), swap.left_index_id.as_str()); + + // Check for duplicate left slots across entries. + if !seen_slots.insert(left_slot) { + let reason = format!( + "request rejected: contradictory swaps — index '{}' on node '{}' is \ + referenced by multiple swap entries", + left_slot.1, left_slot.0, + ); + return Err(make_error_response(reason)); + } + + // Only check right slot for full swaps (when right_index_id is specified). + if let Some(right_index_id) = &swap.right_index_id { + let right_slot = (swap.right_node_id.as_str(), right_index_id.as_str()); + if !seen_slots.insert(right_slot) { + let reason = format!( + "request rejected: contradictory swaps — index '{}' on node '{}' is \ + referenced by multiple swap entries", + right_slot.1, right_slot.0, + ); + return Err(make_error_response(reason)); + } + } + } + + Ok(()) + } + + /// Validates a single swap entry against the original (unmodified) plan. + /// + /// When `right_index_id` is `None`, the operation is a one-way move: the left + /// index's pipelines are moved to the right node without moving any pipelines back. + fn validate_single_swap( + plan: &PhysicalIndexingPlan, + swap: &SwapIndexingPipelinesEntry, + ) -> Result { + // 1. Verify the left indexer exists in the plan. + let left_tasks = plan.indexer(&swap.left_node_id).ok_or_else(|| { + format!( + "indexer '{}' not found in the current plan", + swap.left_node_id + ) + })?; + + // 2. Collect tasks for the left index. + let left_tasks_to_move: Vec = left_tasks + .iter() + .filter(|t| t.index_uid().index_id == swap.left_index_id) + .cloned() + .collect(); + + // 3. Reject if no tasks found on the left side. + if left_tasks_to_move.is_empty() { + return Err(format!( + "no pipelines found for index '{}' on indexer '{}'", + swap.left_index_id, swap.left_node_id, + )); + } + + // 4. For full swaps, validate the right side too. For move-only operations (right_index_id + // is None), just verify the right indexer exists. + let right_tasks_to_move = if let Some(right_index_id) = &swap.right_index_id { + let right_tasks = plan.indexer(&swap.right_node_id).ok_or_else(|| { + format!( + "indexer '{}' not found in the current plan", + swap.right_node_id + ) + })?; + + let right_tasks_to_move: Vec = right_tasks + .iter() + .filter(|t| t.index_uid().index_id == *right_index_id) + .cloned() + .collect(); + + if right_tasks_to_move.is_empty() { + return Err(format!( + "no pipelines found for index '{}' on indexer '{}'", + right_index_id, swap.right_node_id, + )); + } + + if left_tasks_to_move.len() != right_tasks_to_move.len() { + return Err(format!( + "pipeline count mismatch: '{}' has {} pipeline(s) on '{}', but '{}' has {} \ + pipeline(s) on '{}'", + swap.left_index_id, + left_tasks_to_move.len(), + swap.left_node_id, + right_index_id, + right_tasks_to_move.len(), + swap.right_node_id, + )); + } + + right_tasks_to_move + } else { + // Move-only: verify the right indexer exists in the plan. + plan.indexer(&swap.right_node_id).ok_or_else(|| { + format!( + "indexer '{}' not found in the current plan", + swap.right_node_id + ) + })?; + Vec::new() + }; + + Ok(ValidSwapOperation { + left_node_id: swap.left_node_id.clone(), + left_tasks: left_tasks_to_move, + right_node_id: swap.right_node_id.clone(), + right_tasks: right_tasks_to_move, + left_index_id: swap.left_index_id.clone(), + right_index_id: swap.right_index_id.clone(), + }) + } + + /// Applies a validated swap operation to a working copy of the plan. + /// + /// When `right_index_id` is `None`, this is a one-way move: the left index's + /// pipelines are moved to the right node without any pipelines moving back. + fn apply_swap_operation(plan: &mut PhysicalIndexingPlan, operation: &ValidSwapOperation) { + let plan_map = plan.indexing_tasks_per_indexer_mut(); + + // Remove the left index's tasks from the left node. + if let Some(left_node_tasks) = plan_map.get_mut(&operation.left_node_id) { + left_node_tasks.retain(|t| t.index_uid().index_id != operation.left_index_id); + } + // For full swaps, also remove the right index's tasks from the right node. + if let (Some(right_index_id), Some(right_node_tasks)) = ( + &operation.right_index_id, + plan_map.get_mut(&operation.right_node_id), + ) { + right_node_tasks.retain(|t| t.index_uid().index_id != *right_index_id); + } + + // Move left tasks to the right node with fresh pipeline UIDs. + for task in &operation.left_tasks { + let mut moved_task = task.clone(); + moved_task.pipeline_uid = Some(PipelineUid::random()); + plan.add_indexing_task(&operation.right_node_id, moved_task); + } + // For full swaps, move right tasks to the left node with fresh pipeline UIDs. + for task in &operation.right_tasks { + let mut moved_task = task.clone(); + moved_task.pipeline_uid = Some(PipelineUid::random()); + plan.add_indexing_task(&operation.left_node_id, moved_task); + } } } @@ -619,14 +891,14 @@ fn format_indexing_task_map( /// the last plan applied by the scheduler. fn get_indexing_plans_diff<'a>( running_plan: &'a FnvHashMap>, - last_applied_plan: &'a FnvHashMap>, + current_targeted_plan: &'a FnvHashMap>, ) -> IndexingPlansDiff<'a> { // Nodes diff. let running_node_ids: FnvHashSet<&str> = running_plan .keys() .map(|node_id| node_id.as_str()) .collect(); - let planned_node_ids: FnvHashSet<&str> = last_applied_plan + let planned_node_ids: FnvHashSet<&str> = current_targeted_plan .keys() .map(|node_id| node_id.as_str()) .collect(); @@ -647,7 +919,7 @@ fn get_indexing_plans_diff<'a>( .get(*node_id) .map(Vec::as_slice) .unwrap_or_else(|| &[]); - let last_applied_tasks = last_applied_plan + let last_applied_tasks = current_targeted_plan .get(*node_id) .map(Vec::as_slice) .unwrap_or_else(|| &[]); @@ -714,6 +986,10 @@ mod tests { use proptest::{prop_compose, proptest}; use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams}; use quickwit_metastore::IndexMetadata; + use quickwit_proto::control_plane::{SwapIndexingPipelinesEntry, SwapIndexingPipelinesRequest}; + use quickwit_proto::indexing::{ + ApplyIndexingPlanResponse, IndexingServiceClient, MockIndexingService, + }; use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid}; use super::*; @@ -880,6 +1156,620 @@ mod tests { } } + fn make_test_task(index_id: &str, source_id: &str, pipeline_uid: u128) -> IndexingTask { + IndexingTask { + index_uid: Some(IndexUid::for_test(index_id, 0)), + source_id: source_id.to_string(), + pipeline_uid: Some(PipelineUid::for_test(pipeline_uid)), + shard_ids: Vec::new(), + params_fingerprint: 0, + } + } + + fn make_swap_entry( + left_node: &str, + left_index: &str, + right_node: &str, + right_index: &str, + ) -> SwapIndexingPipelinesEntry { + SwapIndexingPipelinesEntry { + left_node_id: left_node.to_string(), + left_index_id: left_index.to_string(), + right_node_id: right_node.to_string(), + right_index_id: Some(right_index.to_string()), + } + } + + fn make_move_entry( + left_node: &str, + left_index: &str, + right_node: &str, + ) -> SwapIndexingPipelinesEntry { + SwapIndexingPipelinesEntry { + left_node_id: left_node.to_string(), + left_index_id: left_index.to_string(), + right_node_id: right_node.to_string(), + right_index_id: None, + } + } + + fn build_test_scheduler_with_plan(plan: PhysicalIndexingPlan) -> IndexingScheduler { + let indexer_pool = IndexerPool::default(); + for node_id in plan.indexing_tasks_per_indexer().keys() { + let mut mock_indexer = MockIndexingService::new(); + mock_indexer + .expect_apply_indexing_plan() + .returning(|_| Ok(ApplyIndexingPlanResponse {})); + let indexer_info = IndexerNodeInfo { + node_id: NodeId::from(node_id.as_str()), + generation_id: 0, + client: IndexingServiceClient::from_mock(mock_indexer), + indexing_tasks: Vec::new(), + indexing_capacity: CpuCapacity::from_cpu_millis(4_000), + }; + indexer_pool.insert(indexer_info.node_id.clone(), indexer_info); + } + let mut scheduler = + IndexingScheduler::new("test-cluster".to_string(), "test-node".into(), indexer_pool); + scheduler.state.current_targeted_physical_plan = Some(plan); + scheduler + } + + #[tokio::test] + async fn test_swap_pipelines_basic() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + ]); + let task_a = make_test_task("index-a", "source-1", 1); + let task_b = make_test_task("index-b", "source-1", 2); + plan.add_indexing_task("indexer-1", task_a.clone()); + plan.add_indexing_task("indexer-2", task_b.clone()); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_swap_entry( + "indexer-1", + "index-a", + "indexer-2", + "index-b", + )], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert_eq!(response.results.len(), 1); + assert!(response.results[0].success); + + let new_plan = scheduler + .state + .current_targeted_physical_plan + .as_ref() + .unwrap(); + // index-a should now be on indexer-2 + let indexer_2_tasks = new_plan.indexer("indexer-2").unwrap(); + assert_eq!(indexer_2_tasks.len(), 1); + assert_eq!(indexer_2_tasks[0].index_uid().index_id, "index-a"); + // index-b should now be on indexer-1 + let indexer_1_tasks = new_plan.indexer("indexer-1").unwrap(); + assert_eq!(indexer_1_tasks.len(), 1); + assert_eq!(indexer_1_tasks[0].index_uid().index_id, "index-b"); + } + + #[tokio::test] + async fn test_swap_pipelines_count_mismatch() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + ]); + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 1)); + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 2)); + plan.add_indexing_task("indexer-2", make_test_task("index-b", "source-1", 3)); + + let mut scheduler = build_test_scheduler_with_plan(plan.clone()); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_swap_entry( + "indexer-1", + "index-a", + "indexer-2", + "index-b", + )], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert_eq!(response.results.len(), 1); + assert!(!response.results[0].success); + assert!( + response.results[0] + .reason + .contains("pipeline count mismatch") + ); + + // Plan should be unchanged. + assert_eq!( + scheduler + .state + .current_targeted_physical_plan + .as_ref() + .unwrap(), + &plan, + ); + } + + #[tokio::test] + async fn test_swap_pipelines_unknown_indexer() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&["indexer-1".to_string()]); + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 1)); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_swap_entry( + "indexer-1", + "index-a", + "indexer-999", + "index-b", + )], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert!(!response.results[0].success); + assert!(response.results[0].reason.contains("not found")); + } + + #[tokio::test] + async fn test_swap_pipelines_no_pipelines_for_index() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + ]); + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 1)); + plan.add_indexing_task("indexer-2", make_test_task("index-b", "source-1", 2)); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_swap_entry( + "indexer-1", + "index-NONEXISTENT", + "indexer-2", + "index-b", + )], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert!(!response.results[0].success); + assert!(response.results[0].reason.contains("no pipelines found")); + } + + #[tokio::test] + async fn test_swap_pipelines_multiple_swaps_partial_success() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + "indexer-3".to_string(), + ]); + // Valid swap pair: 1 pipeline each. + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 1)); + plan.add_indexing_task("indexer-2", make_test_task("index-b", "source-1", 2)); + // Invalid swap pair: count mismatch (2 vs 1). + plan.add_indexing_task("indexer-2", make_test_task("index-c", "source-1", 3)); + plan.add_indexing_task("indexer-2", make_test_task("index-c", "source-1", 4)); + plan.add_indexing_task("indexer-3", make_test_task("index-d", "source-1", 5)); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![ + make_swap_entry("indexer-1", "index-a", "indexer-2", "index-b"), + make_swap_entry("indexer-2", "index-c", "indexer-3", "index-d"), + ], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert_eq!(response.results.len(), 2); + assert!(response.results[0].success); + assert!(!response.results[1].success); + assert!( + response.results[1] + .reason + .contains("pipeline count mismatch") + ); + + // The first swap should have been applied. + let new_plan = scheduler + .state + .current_targeted_physical_plan + .as_ref() + .unwrap(); + let indexer_1_tasks = new_plan.indexer("indexer-1").unwrap(); + assert_eq!(indexer_1_tasks.len(), 1); + assert_eq!(indexer_1_tasks[0].index_uid().index_id, "index-b"); + } + + #[tokio::test] + async fn test_swap_pipelines_no_plan() { + let indexer_pool = IndexerPool::default(); + let mut scheduler = + IndexingScheduler::new("test-cluster".to_string(), "test-node".into(), indexer_pool); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_swap_entry( + "indexer-1", + "index-a", + "indexer-2", + "index-b", + )], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert!(!response.results[0].success); + assert!(response.results[0].reason.contains("no indexing plan")); + } + + #[tokio::test] + async fn test_swap_pipelines_same_node_rejected() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&["indexer-1".to_string()]); + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 1)); + plan.add_indexing_task("indexer-1", make_test_task("index-b", "source-1", 2)); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_swap_entry( + "indexer-1", + "index-a", + "indexer-1", + "index-b", + )], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert!(!response.results[0].success); + assert!(response.results[0].reason.contains("same node")); + } + + #[tokio::test] + async fn test_swap_pipelines_contradiction_same_slot() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + "indexer-3".to_string(), + ]); + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 1)); + plan.add_indexing_task("indexer-2", make_test_task("index-b", "source-1", 2)); + plan.add_indexing_task("indexer-3", make_test_task("index-c", "source-1", 3)); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + // Both swaps try to move index-a from indexer-1. + let request = SwapIndexingPipelinesRequest { + swaps: vec![ + make_swap_entry("indexer-1", "index-a", "indexer-2", "index-b"), + make_swap_entry("indexer-1", "index-a", "indexer-3", "index-c"), + ], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + // ALL swaps should be rejected. + assert_eq!(response.results.len(), 2); + assert!(!response.results[0].success); + assert!(!response.results[1].success); + assert!(response.results[0].reason.contains("contradictory")); + } + + #[tokio::test] + async fn test_swap_pipelines_contradiction_does_not_apply_any() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + "indexer-3".to_string(), + ]); + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 1)); + plan.add_indexing_task("indexer-2", make_test_task("index-b", "source-1", 2)); + plan.add_indexing_task("indexer-3", make_test_task("index-c", "source-1", 3)); + + let mut scheduler = build_test_scheduler_with_plan(plan.clone()); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![ + make_swap_entry("indexer-1", "index-a", "indexer-2", "index-b"), + make_swap_entry("indexer-1", "index-a", "indexer-3", "index-c"), + ], + }; + let _response = scheduler.swap_pipelines(request).unwrap(); + + // Plan should be completely unchanged. + assert_eq!( + scheduler + .state + .current_targeted_physical_plan + .as_ref() + .unwrap(), + &plan, + ); + } + + #[tokio::test] + async fn test_swap_pipelines_fresh_pipeline_uids() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + ]); + let task_a = make_test_task("index-a", "source-1", 100); + let task_b = make_test_task("index-b", "source-1", 200); + let original_uid_a = task_a.pipeline_uid; + let original_uid_b = task_b.pipeline_uid; + plan.add_indexing_task("indexer-1", task_a); + plan.add_indexing_task("indexer-2", task_b); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_swap_entry( + "indexer-1", + "index-a", + "indexer-2", + "index-b", + )], + }; + scheduler.swap_pipelines(request).unwrap(); + + let new_plan = scheduler + .state + .current_targeted_physical_plan + .as_ref() + .unwrap(); + let moved_a = &new_plan.indexer("indexer-2").unwrap()[0]; + let moved_b = &new_plan.indexer("indexer-1").unwrap()[0]; + // Pipeline UIDs must be fresh (different from originals). + assert_ne!(moved_a.pipeline_uid, original_uid_a); + assert_ne!(moved_b.pipeline_uid, original_uid_b); + // And different from each other. + assert_ne!(moved_a.pipeline_uid, moved_b.pipeline_uid); + } + + #[tokio::test] + async fn test_swap_pipelines_multiple_sources_same_index() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + ]); + // index-a has 2 sources on indexer-1. + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-kafka", 1)); + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-ingest", 2)); + // index-b has 2 sources on indexer-2. + plan.add_indexing_task("indexer-2", make_test_task("index-b", "source-kafka", 3)); + plan.add_indexing_task("indexer-2", make_test_task("index-b", "source-ingest", 4)); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_swap_entry( + "indexer-1", + "index-a", + "indexer-2", + "index-b", + )], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert!(response.results[0].success); + + let new_plan = scheduler + .state + .current_targeted_physical_plan + .as_ref() + .unwrap(); + // Both sources of index-a should now be on indexer-2. + let indexer_2_tasks = new_plan.indexer("indexer-2").unwrap(); + assert_eq!(indexer_2_tasks.len(), 2); + assert!( + indexer_2_tasks + .iter() + .all(|t| t.index_uid().index_id == "index-a") + ); + // Both sources of index-b should now be on indexer-1. + let indexer_1_tasks = new_plan.indexer("indexer-1").unwrap(); + assert_eq!(indexer_1_tasks.len(), 2); + assert!( + indexer_1_tasks + .iter() + .all(|t| t.index_uid().index_id == "index-b") + ); + } + + #[tokio::test] + async fn test_swap_pipelines_preserves_other_tasks() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + ]); + // Tasks being swapped. + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 1)); + plan.add_indexing_task("indexer-2", make_test_task("index-b", "source-1", 2)); + // Other tasks that should not be affected. + plan.add_indexing_task("indexer-1", make_test_task("index-c", "source-1", 3)); + plan.add_indexing_task("indexer-2", make_test_task("index-d", "source-1", 4)); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_swap_entry( + "indexer-1", + "index-a", + "indexer-2", + "index-b", + )], + }; + scheduler.swap_pipelines(request).unwrap(); + + let new_plan = scheduler + .state + .current_targeted_physical_plan + .as_ref() + .unwrap(); + let indexer_1_tasks = new_plan.indexer("indexer-1").unwrap(); + let indexer_2_tasks = new_plan.indexer("indexer-2").unwrap(); + + // indexer-1 should have index-c (unchanged) and index-b (swapped in). + assert_eq!(indexer_1_tasks.len(), 2); + let indexer_1_index_ids: Vec<&str> = indexer_1_tasks + .iter() + .map(|t| t.index_uid().index_id.as_str()) + .collect(); + assert!(indexer_1_index_ids.contains(&"index-c")); + assert!(indexer_1_index_ids.contains(&"index-b")); + + // indexer-2 should have index-d (unchanged) and index-a (swapped in). + assert_eq!(indexer_2_tasks.len(), 2); + let indexer_2_index_ids: Vec<&str> = indexer_2_tasks + .iter() + .map(|t| t.index_uid().index_id.as_str()) + .collect(); + assert!(indexer_2_index_ids.contains(&"index-d")); + assert!(indexer_2_index_ids.contains(&"index-a")); + } + + #[tokio::test] + async fn test_swap_pipelines_move_without_swap() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + ]); + let task_a = make_test_task("index-a", "source-1", 1); + let task_b = make_test_task("index-b", "source-1", 2); + plan.add_indexing_task("indexer-1", task_a.clone()); + plan.add_indexing_task("indexer-2", task_b.clone()); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + // Move index-a from indexer-1 to indexer-2 without swapping anything back. + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_move_entry("indexer-1", "index-a", "indexer-2")], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert_eq!(response.results.len(), 1); + assert!( + response.results[0].success, + "{}", + response.results[0].reason + ); + + let new_plan = scheduler + .state + .current_targeted_physical_plan + .as_ref() + .unwrap(); + + // indexer-1 should have no tasks (index-a was moved away). + let indexer_1_tasks = new_plan.indexer("indexer-1").unwrap(); + assert!(indexer_1_tasks.is_empty()); + + // indexer-2 should have both index-b (unchanged) and index-a (moved in). + let indexer_2_tasks = new_plan.indexer("indexer-2").unwrap(); + assert_eq!(indexer_2_tasks.len(), 2); + let indexer_2_index_ids: Vec<&str> = indexer_2_tasks + .iter() + .map(|t| t.index_uid().index_id.as_str()) + .collect(); + assert!(indexer_2_index_ids.contains(&"index-a")); + assert!(indexer_2_index_ids.contains(&"index-b")); + } + + #[tokio::test] + async fn test_swap_pipelines_move_fresh_pipeline_uids() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + ]); + let task_a = make_test_task("index-a", "source-1", 100); + let original_uid_a = task_a.pipeline_uid; + plan.add_indexing_task("indexer-1", task_a); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_move_entry("indexer-1", "index-a", "indexer-2")], + }; + scheduler.swap_pipelines(request).unwrap(); + + let new_plan = scheduler + .state + .current_targeted_physical_plan + .as_ref() + .unwrap(); + let moved_a = &new_plan.indexer("indexer-2").unwrap()[0]; + // Pipeline UID must be refreshed after the move. + assert_ne!(moved_a.pipeline_uid, original_uid_a); + assert_eq!(moved_a.index_uid().index_id, "index-a"); + } + + #[tokio::test] + async fn test_swap_pipelines_move_unknown_right_indexer() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&["indexer-1".to_string()]); + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 1)); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_move_entry("indexer-1", "index-a", "indexer-999")], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert!(!response.results[0].success); + assert!(response.results[0].reason.contains("not found")); + } + + #[tokio::test] + async fn test_swap_pipelines_move_preserves_right_node_tasks() { + let mut plan = PhysicalIndexingPlan::with_indexer_ids(&[ + "indexer-1".to_string(), + "indexer-2".to_string(), + ]); + // indexer-1 has two indexes; only index-a will be moved. + plan.add_indexing_task("indexer-1", make_test_task("index-a", "source-1", 1)); + plan.add_indexing_task("indexer-1", make_test_task("index-c", "source-1", 3)); + // indexer-2 has index-b which should remain untouched. + plan.add_indexing_task("indexer-2", make_test_task("index-b", "source-1", 2)); + + let mut scheduler = build_test_scheduler_with_plan(plan); + + let request = SwapIndexingPipelinesRequest { + swaps: vec![make_move_entry("indexer-1", "index-a", "indexer-2")], + }; + let response = scheduler.swap_pipelines(request).unwrap(); + + assert!( + response.results[0].success, + "{}", + response.results[0].reason + ); + + let new_plan = scheduler + .state + .current_targeted_physical_plan + .as_ref() + .unwrap(); + + // indexer-1 should still have index-c (only index-a was moved). + let indexer_1_tasks = new_plan.indexer("indexer-1").unwrap(); + assert_eq!(indexer_1_tasks.len(), 1); + assert_eq!(indexer_1_tasks[0].index_uid().index_id, "index-c"); + + // indexer-2 should have both index-b (unchanged) and index-a (moved in). + let indexer_2_tasks = new_plan.indexer("indexer-2").unwrap(); + assert_eq!(indexer_2_tasks.len(), 2); + let indexer_2_index_ids: Vec<&str> = indexer_2_tasks + .iter() + .map(|t| t.index_uid().index_id.as_str()) + .collect(); + assert!(indexer_2_index_ids.contains(&"index-a")); + assert!(indexer_2_index_ids.contains(&"index-b")); + } + #[test] fn test_get_sources_to_schedule() { let mut model = ControlPlaneModel::default(); diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs index 9f0cd97b477..08cbdf9f230 100644 --- a/quickwit/quickwit-control-plane/src/tests.rs +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -178,7 +178,7 @@ async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() { indexing_service_inbox.drain_for_test_typed::(); assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); - assert!(scheduler_state.last_applied_physical_plan.is_some()); + assert!(scheduler_state.current_targeted_physical_plan.is_some()); assert_eq!(indexing_service_inbox_messages.len(), 1); // After a CONTROL_PLAN_LOOP_INTERVAL, the control loop will check if the desired plan is @@ -266,7 +266,7 @@ async fn test_scheduler_scheduling_no_indexer() { .indexing_scheduler; assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); - assert!(scheduler_state.last_applied_physical_plan.is_none()); + assert!(scheduler_state.current_targeted_physical_plan.is_none()); // There is no indexer, we should observe no // scheduling. @@ -278,7 +278,7 @@ async fn test_scheduler_scheduling_no_indexer() { .indexing_scheduler; assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); - assert!(scheduler_state.last_applied_physical_plan.is_none()); + assert!(scheduler_state.current_targeted_physical_plan.is_none()); universe.assert_quit().await; } @@ -324,7 +324,7 @@ async fn test_scheduler_scheduling_multiple_indexers() { indexing_service_inbox_1.drain_for_test_typed::(); assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); - assert!(scheduler_state.last_applied_physical_plan.is_none()); + assert!(scheduler_state.current_targeted_physical_plan.is_none()); assert_eq!(indexing_service_inbox_messages.len(), 0); cluster diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index d0850091280..3474fdd6c2e 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -69,6 +69,9 @@ service ControlPlaneService { // Performs a debounced shard pruning request to the metastore. rpc PruneShards(quickwit.metastore.PruneShardsRequest) returns (quickwit.metastore.EmptyResponse); + + // Swaps indexing pipelines of different indexes between different indexers. + rpc SwapIndexingPipelines(SwapIndexingPipelinesRequest) returns (SwapIndexingPipelinesResponse); } // Shard API @@ -125,3 +128,25 @@ message AdviseResetShardsResponse { repeated quickwit.ingest.ShardIds shards_to_delete = 1; repeated quickwit.ingest.ShardIdPositions shards_to_truncate = 2; } + +message SwapIndexingPipelinesRequest { + repeated SwapIndexingPipelinesEntry swaps = 1; +} + +message SwapIndexingPipelinesEntry { + string left_node_id = 1; + string left_index_id = 2; + string right_node_id = 3; + optional string right_index_id = 4; +} + +message SwapIndexingPipelinesResponse { + repeated SwapIndexingPipelinesResult results = 1; +} + +message SwapIndexingPipelinesResult { + SwapIndexingPipelinesEntry swap = 1; + bool success = 2; + // Human-readable reason when success is false. + string reason = 3; +} diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index 09cfbdebf58..15f85d6c0f1 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -73,6 +73,41 @@ pub struct AdviseResetShardsResponse { pub shards_to_truncate: ::prost::alloc::vec::Vec, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SwapIndexingPipelinesRequest { + #[prost(message, repeated, tag = "1")] + pub swaps: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct SwapIndexingPipelinesEntry { + #[prost(string, tag = "1")] + pub left_node_id: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub left_index_id: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub right_node_id: ::prost::alloc::string::String, + #[prost(string, optional, tag = "4")] + pub right_index_id: ::core::option::Option<::prost::alloc::string::String>, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SwapIndexingPipelinesResponse { + #[prost(message, repeated, tag = "1")] + pub results: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct SwapIndexingPipelinesResult { + #[prost(message, optional, tag = "1")] + pub swap: ::core::option::Option, + #[prost(bool, tag = "2")] + pub success: bool, + /// Human-readable reason when success is false. + #[prost(string, tag = "3")] + pub reason: ::prost::alloc::string::String, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "snake_case")] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] @@ -180,6 +215,11 @@ pub trait ControlPlaneService: std::fmt::Debug + Send + Sync + 'static { &self, request: super::metastore::PruneShardsRequest, ) -> crate::control_plane::ControlPlaneResult; + ///Swaps indexing pipelines of different indexes between different indexers. + async fn swap_indexing_pipelines( + &self, + request: SwapIndexingPipelinesRequest, + ) -> crate::control_plane::ControlPlaneResult; } #[derive(Debug, Clone)] pub struct ControlPlaneServiceClient { @@ -352,6 +392,12 @@ impl ControlPlaneService for ControlPlaneServiceClient { ) -> crate::control_plane::ControlPlaneResult { self.inner.0.prune_shards(request).await } + async fn swap_indexing_pipelines( + &self, + request: SwapIndexingPipelinesRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.inner.0.swap_indexing_pipelines(request).await + } } #[cfg(any(test, feature = "testsuite"))] pub mod mock_control_plane_service { @@ -440,6 +486,14 @@ pub mod mock_control_plane_service { > { self.inner.lock().await.prune_shards(request).await } + async fn swap_indexing_pipelines( + &self, + request: super::SwapIndexingPipelinesRequest, + ) -> crate::control_plane::ControlPlaneResult< + super::SwapIndexingPipelinesResponse, + > { + self.inner.lock().await.swap_indexing_pipelines(request).await + } } } pub type BoxFuture = std::pin::Pin< @@ -613,6 +667,22 @@ for InnerControlPlaneServiceClient { Box::pin(fut) } } +impl tower::Service for InnerControlPlaneServiceClient { + type Response = SwapIndexingPipelinesResponse; + type Error = crate::control_plane::ControlPlaneError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: SwapIndexingPipelinesRequest) -> Self::Future { + let svc = self.clone(); + let fut = async move { svc.0.swap_indexing_pipelines(request).await }; + Box::pin(fut) + } +} /// A tower service stack is a set of tower services. #[derive(Debug)] struct ControlPlaneServiceTowerServiceStack { @@ -668,6 +738,11 @@ struct ControlPlaneServiceTowerServiceStack { super::metastore::EmptyResponse, crate::control_plane::ControlPlaneError, >, + swap_indexing_pipelines_svc: quickwit_common::tower::BoxService< + SwapIndexingPipelinesRequest, + SwapIndexingPipelinesResponse, + crate::control_plane::ControlPlaneError, + >, } #[async_trait::async_trait] impl ControlPlaneService for ControlPlaneServiceTowerServiceStack { @@ -735,6 +810,12 @@ impl ControlPlaneService for ControlPlaneServiceTowerServiceStack { ) -> crate::control_plane::ControlPlaneResult { self.prune_shards_svc.clone().ready().await?.call(request).await } + async fn swap_indexing_pipelines( + &self, + request: SwapIndexingPipelinesRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.swap_indexing_pipelines_svc.clone().ready().await?.call(request).await + } } type CreateIndexLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< @@ -836,6 +917,16 @@ type PruneShardsLayer = quickwit_common::tower::BoxLayer< super::metastore::EmptyResponse, crate::control_plane::ControlPlaneError, >; +type SwapIndexingPipelinesLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + SwapIndexingPipelinesRequest, + SwapIndexingPipelinesResponse, + crate::control_plane::ControlPlaneError, + >, + SwapIndexingPipelinesRequest, + SwapIndexingPipelinesResponse, + crate::control_plane::ControlPlaneError, +>; #[derive(Debug, Default)] pub struct ControlPlaneServiceTowerLayerStack { create_index_layers: Vec, @@ -848,6 +939,7 @@ pub struct ControlPlaneServiceTowerLayerStack { get_or_create_open_shards_layers: Vec, advise_reset_shards_layers: Vec, prune_shards_layers: Vec, + swap_indexing_pipelines_layers: Vec, } impl ControlPlaneServiceTowerLayerStack { pub fn stack_layer(mut self, layer: L) -> Self @@ -1120,6 +1212,33 @@ impl ControlPlaneServiceTowerLayerStack { >>::Service as tower::Service< super::metastore::PruneShardsRequest, >>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + SwapIndexingPipelinesRequest, + SwapIndexingPipelinesResponse, + crate::control_plane::ControlPlaneError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + SwapIndexingPipelinesRequest, + Response = SwapIndexingPipelinesResponse, + Error = crate::control_plane::ControlPlaneError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service< + SwapIndexingPipelinesRequest, + >>::Future: Send + 'static, { self.create_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); @@ -1141,6 +1260,8 @@ impl ControlPlaneServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.prune_shards_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.swap_indexing_pipelines_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } pub fn stack_create_index_layer(mut self, layer: L) -> Self @@ -1353,6 +1474,28 @@ impl ControlPlaneServiceTowerLayerStack { self.prune_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_swap_indexing_pipelines_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + SwapIndexingPipelinesRequest, + SwapIndexingPipelinesResponse, + crate::control_plane::ControlPlaneError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + SwapIndexingPipelinesRequest, + Response = SwapIndexingPipelinesResponse, + Error = crate::control_plane::ControlPlaneError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.swap_indexing_pipelines_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn build(self, instance: T) -> ControlPlaneServiceClient where T: ControlPlaneService, @@ -1496,6 +1639,14 @@ impl ControlPlaneServiceTowerLayerStack { quickwit_common::tower::BoxService::new(inner_client.clone()), |svc, layer| layer.layer(svc), ); + let swap_indexing_pipelines_svc = self + .swap_indexing_pipelines_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(inner_client.clone()), + |svc, layer| layer.layer(svc), + ); let tower_svc_stack = ControlPlaneServiceTowerServiceStack { inner: inner_client, create_index_svc, @@ -1508,6 +1659,7 @@ impl ControlPlaneServiceTowerLayerStack { get_or_create_open_shards_svc, advise_reset_shards_svc, prune_shards_svc, + swap_indexing_pipelines_svc, }; ControlPlaneServiceClient::new(tower_svc_stack) } @@ -1673,6 +1825,15 @@ where super::metastore::EmptyResponse, crate::control_plane::ControlPlaneError, >, + > + + tower::Service< + SwapIndexingPipelinesRequest, + Response = SwapIndexingPipelinesResponse, + Error = crate::control_plane::ControlPlaneError, + Future = BoxFuture< + SwapIndexingPipelinesResponse, + crate::control_plane::ControlPlaneError, + >, >, { async fn create_index( @@ -1739,6 +1900,12 @@ where ) -> crate::control_plane::ControlPlaneResult { self.clone().call(request).await } + async fn swap_indexing_pipelines( + &self, + request: SwapIndexingPipelinesRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.clone().call(request).await + } } #[derive(Debug, Clone)] pub struct ControlPlaneServiceGrpcClientAdapter { @@ -1918,6 +2085,20 @@ where super::metastore::PruneShardsRequest::rpc_name(), )) } + async fn swap_indexing_pipelines( + &self, + request: SwapIndexingPipelinesRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.inner + .clone() + .swap_indexing_pipelines(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + SwapIndexingPipelinesRequest::rpc_name(), + )) + } } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServerAdapter { @@ -2049,6 +2230,17 @@ for ControlPlaneServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn swap_indexing_pipelines( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .0 + .swap_indexing_pipelines(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } } /// Generated client implementations. pub mod control_plane_service_grpc_client { @@ -2450,6 +2642,36 @@ pub mod control_plane_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Swaps indexing pipelines of different indexes between different indexers. + pub async fn swap_indexing_pipelines( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.control_plane.ControlPlaneService/SwapIndexingPipelines", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.control_plane.ControlPlaneService", + "SwapIndexingPipelines", + ), + ); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -2546,6 +2768,14 @@ pub mod control_plane_service_grpc_server { tonic::Response, tonic::Status, >; + /// Swaps indexing pipelines of different indexes between different indexers. + async fn swap_indexing_pipelines( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServer { @@ -3137,6 +3367,57 @@ pub mod control_plane_service_grpc_server { }; Box::pin(fut) } + "/quickwit.control_plane.ControlPlaneService/SwapIndexingPipelines" => { + #[allow(non_camel_case_types)] + struct SwapIndexingPipelinesSvc( + pub Arc, + ); + impl< + T: ControlPlaneServiceGrpc, + > tonic::server::UnaryService + for SwapIndexingPipelinesSvc { + type Response = super::SwapIndexingPipelinesResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::swap_indexing_pipelines( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = SwapIndexingPipelinesSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { let mut response = http::Response::new( diff --git a/quickwit/quickwit-proto/src/control_plane/mod.rs b/quickwit/quickwit-proto/src/control_plane/mod.rs index 4278ec104eb..32df01062b7 100644 --- a/quickwit/quickwit-proto/src/control_plane/mod.rs +++ b/quickwit/quickwit-proto/src/control_plane/mod.rs @@ -138,6 +138,12 @@ impl RpcName for AdviseResetShardsRequest { } } +impl RpcName for SwapIndexingPipelinesRequest { + fn rpc_name() -> &'static str { + "swap_indexing_pipelines" + } +} + impl GetOrCreateOpenShardsFailureReason { pub fn create_failure( &self, diff --git a/quickwit/quickwit-serve/src/indexing_api/mod.rs b/quickwit/quickwit-serve/src/indexing_api/mod.rs index 9d3740615a3..e9e16d79431 100644 --- a/quickwit/quickwit-serve/src/indexing_api/mod.rs +++ b/quickwit/quickwit-serve/src/indexing_api/mod.rs @@ -14,4 +14,4 @@ mod rest_handler; -pub use rest_handler::{IndexingApi, indexing_get_handler}; +pub use rest_handler::{IndexingApi, indexing_get_handler, swap_pipelines_handler}; diff --git a/quickwit/quickwit-serve/src/indexing_api/rest_handler.rs b/quickwit/quickwit-serve/src/indexing_api/rest_handler.rs index 1dcc3cd05df..4412c83c43f 100644 --- a/quickwit/quickwit-serve/src/indexing_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/indexing_api/rest_handler.rs @@ -16,6 +16,10 @@ use std::convert::Infallible; use quickwit_actors::{AskError, Mailbox, Observe}; use quickwit_indexing::actors::{IndexingService, IndexingServiceCounters}; +use quickwit_proto::control_plane::{ + ControlPlaneError, ControlPlaneService, ControlPlaneServiceClient, SwapIndexingPipelinesEntry, + SwapIndexingPipelinesRequest, SwapIndexingPipelinesResponse, SwapIndexingPipelinesResult, +}; use warp::{Filter, Rejection}; use crate::format::extract_format_from_qs; @@ -24,7 +28,15 @@ use crate::rest::recover_fn; use crate::rest_api_response::into_rest_api_response; #[derive(utoipa::OpenApi)] -#[openapi(paths(indexing_endpoint))] +#[openapi( + paths(indexing_endpoint, swap_pipelines_endpoint), + components(schemas( + SwapIndexingPipelinesRequest, + SwapIndexingPipelinesResponse, + SwapIndexingPipelinesEntry, + SwapIndexingPipelinesResult, + )) +)] pub struct IndexingApi; #[utoipa::path( @@ -59,3 +71,244 @@ pub fn indexing_get_handler( .recover(recover_fn) .boxed() } + +#[utoipa::path( + post, + tag = "Swap pipelines", + path = "/indexing/swap-pipelines", + request_body = SwapIndexingPipelinesRequest, + responses( + (status = 200, description = "Successfully swapped indexing pipelines.", body = SwapIndexingPipelinesResponse) + ) +)] +async fn swap_pipelines_endpoint( + body: SwapIndexingPipelinesRequest, + control_plane_client: ControlPlaneServiceClient, +) -> Result { + control_plane_client.swap_indexing_pipelines(body).await +} + +fn swap_pipelines_post_filter() -> impl Filter + Clone { + warp::path!("indexing" / "swap-pipelines").and(warp::post()) +} + +pub fn swap_pipelines_handler( + control_plane_client: ControlPlaneServiceClient, +) -> impl Filter + Clone { + swap_pipelines_post_filter() + .and(warp::body::json()) + .and(warp::any().map(move || control_plane_client.clone())) + .then(swap_pipelines_endpoint) + .and(extract_format_from_qs()) + .map(into_rest_api_response) + .recover(recover_fn) + .boxed() +} + +#[cfg(test)] +mod tests { + use quickwit_proto::control_plane::{ + ControlPlaneServiceClient, MockControlPlaneService, SwapIndexingPipelinesEntry, + SwapIndexingPipelinesRequest, SwapIndexingPipelinesResponse, SwapIndexingPipelinesResult, + }; + use warp::Filter; + + use super::swap_pipelines_handler; + use crate::rest::recover_fn; + + #[tokio::test] + async fn test_swap_pipelines_handler_success() { + let mut mock = MockControlPlaneService::new(); + mock.expect_swap_indexing_pipelines().returning(|request| { + let results = request + .swaps + .iter() + .map(|swap| SwapIndexingPipelinesResult { + swap: Some(swap.clone()), + success: true, + reason: String::new(), + }) + .collect(); + Ok(SwapIndexingPipelinesResponse { results }) + }); + let control_plane_client = ControlPlaneServiceClient::from_mock(mock); + + let handler = swap_pipelines_handler(control_plane_client).recover(recover_fn); + + let body = serde_json::to_vec(&SwapIndexingPipelinesRequest { + swaps: vec![SwapIndexingPipelinesEntry { + left_node_id: "indexer-1".to_string(), + left_index_id: "index-a".to_string(), + right_node_id: "indexer-2".to_string(), + right_index_id: Some("index-b".to_string()), + }], + }) + .unwrap(); + + let resp = warp::test::request() + .method("POST") + .path("/indexing/swap-pipelines") + .header("content-type", "application/json") + .body(body) + .reply(&handler) + .await; + + assert_eq!(resp.status(), 200); + + let response: SwapIndexingPipelinesResponse = serde_json::from_slice(resp.body()).unwrap(); + assert_eq!(response.results.len(), 1); + assert!(response.results[0].success); + let swap = response.results[0].swap.as_ref().unwrap(); + assert_eq!(swap.left_node_id, "indexer-1"); + assert_eq!(swap.left_index_id, "index-a"); + assert_eq!(swap.right_node_id, "indexer-2"); + assert_eq!(swap.right_index_id.as_deref(), Some("index-b")); + } + + #[tokio::test] + async fn test_swap_pipelines_handler_partial_failure() { + let mut mock = MockControlPlaneService::new(); + mock.expect_swap_indexing_pipelines().returning(|request| { + let results = request + .swaps + .iter() + .enumerate() + .map(|(i, swap)| { + if i == 0 { + SwapIndexingPipelinesResult { + swap: Some(swap.clone()), + success: true, + reason: String::new(), + } + } else { + SwapIndexingPipelinesResult { + swap: Some(swap.clone()), + success: false, + reason: "pipeline count mismatch".to_string(), + } + } + }) + .collect(); + Ok(SwapIndexingPipelinesResponse { results }) + }); + let control_plane_client = ControlPlaneServiceClient::from_mock(mock); + + let handler = swap_pipelines_handler(control_plane_client).recover(recover_fn); + + let body = serde_json::to_vec(&SwapIndexingPipelinesRequest { + swaps: vec![ + SwapIndexingPipelinesEntry { + left_node_id: "indexer-1".to_string(), + left_index_id: "index-a".to_string(), + right_node_id: "indexer-2".to_string(), + right_index_id: Some("index-b".to_string()), + }, + SwapIndexingPipelinesEntry { + left_node_id: "indexer-3".to_string(), + left_index_id: "index-c".to_string(), + right_node_id: "indexer-4".to_string(), + right_index_id: Some("index-d".to_string()), + }, + ], + }) + .unwrap(); + + let resp = warp::test::request() + .method("POST") + .path("/indexing/swap-pipelines") + .header("content-type", "application/json") + .body(body) + .reply(&handler) + .await; + + assert_eq!(resp.status(), 200); + + let response: SwapIndexingPipelinesResponse = serde_json::from_slice(resp.body()).unwrap(); + assert_eq!(response.results.len(), 2); + assert!(response.results[0].success); + assert!(!response.results[1].success); + assert!( + response.results[1] + .reason + .contains("pipeline count mismatch") + ); + } + + #[tokio::test] + async fn test_swap_pipelines_handler_move_without_right_index() { + let mut mock = MockControlPlaneService::new(); + mock.expect_swap_indexing_pipelines().returning(|request| { + let results = request + .swaps + .iter() + .map(|swap| SwapIndexingPipelinesResult { + swap: Some(swap.clone()), + success: true, + reason: String::new(), + }) + .collect(); + Ok(SwapIndexingPipelinesResponse { results }) + }); + let control_plane_client = ControlPlaneServiceClient::from_mock(mock); + + let handler = swap_pipelines_handler(control_plane_client).recover(recover_fn); + + // Send JSON without right_index_id field — should deserialize to None. + let body = r#"{"swaps": [{"left_node_id": "indexer-1", "left_index_id": "index-a", "right_node_id": "indexer-2"}]}"#; + + let resp = warp::test::request() + .method("POST") + .path("/indexing/swap-pipelines") + .header("content-type", "application/json") + .body(body) + .reply(&handler) + .await; + + assert_eq!(resp.status(), 200); + + let response: SwapIndexingPipelinesResponse = serde_json::from_slice(resp.body()).unwrap(); + assert_eq!(response.results.len(), 1); + assert!(response.results[0].success); + let swap = response.results[0].swap.as_ref().unwrap(); + assert_eq!(swap.left_node_id, "indexer-1"); + assert_eq!(swap.left_index_id, "index-a"); + assert_eq!(swap.right_node_id, "indexer-2"); + assert!(swap.right_index_id.is_none()); + } + + #[tokio::test] + async fn test_swap_pipelines_handler_invalid_json_body() { + let mock = MockControlPlaneService::new(); + let control_plane_client = ControlPlaneServiceClient::from_mock(mock); + + let handler = swap_pipelines_handler(control_plane_client).recover(recover_fn); + + let resp = warp::test::request() + .method("POST") + .path("/indexing/swap-pipelines") + .header("content-type", "application/json") + .body(b"not json at all") + .reply(&handler) + .await; + + // Warp returns 400 for invalid JSON bodies. + assert_eq!(resp.status(), 400); + } + + #[tokio::test] + async fn test_swap_pipelines_handler_wrong_method() { + let mock = MockControlPlaneService::new(); + let control_plane_client = ControlPlaneServiceClient::from_mock(mock); + + let handler = swap_pipelines_handler(control_plane_client).recover(recover_fn); + + let resp = warp::test::request() + .method("GET") + .path("/indexing/swap-pipelines") + .reply(&handler) + .await; + + // GET on a POST-only route returns 405. + assert_eq!(resp.status(), 405); + } +} diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index ae33bb50a08..61aba1bef36 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -43,7 +43,7 @@ use crate::developer_api::developer_api_routes; use crate::elasticsearch_api::elastic_api_handlers; use crate::health_check_api::health_check_handlers; use crate::index_api::index_management_handlers; -use crate::indexing_api::indexing_get_handler; +use crate::indexing_api::{indexing_get_handler, swap_pipelines_handler}; use crate::ingest_api::ingest_api_handlers; use crate::jaeger_api::jaeger_api_handlers; use crate::metrics_api::metrics_handler; @@ -316,6 +316,10 @@ fn api_v1_routes( quickwit_services.indexing_service_opt.clone(), )) .boxed() + .or(swap_pipelines_handler( + quickwit_services.control_plane_client.clone(), + )) + .boxed() .or(search_routes(quickwit_services.search_service.clone())) .boxed() .or(ingest_api_handlers(