diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index d32db8a9e45..4fa52e6b9ea 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -39,6 +39,7 @@ use quickwit_config::{ use quickwit_index_management::{IndexService, clear_cache_directory}; use quickwit_indexing::IndexingPipeline; use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService}; +use quickwit_indexing::mature_merge::{MatureMergeConfig, merge_mature_all_indexes}; use quickwit_indexing::models::{ DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline, }; @@ -163,6 +164,56 @@ pub fn build_tool_command() -> Command { .required(true), ]) ) + .subcommand( + Command::new("merge-mature") + .display_order(10) + .about("Merges mature splits across all indexes and nodes.") + .long_about( + "Scans indexes for merge opportunities in mature Published splits. Considers \ + opportunities across all origin nodes and sources. Runs once and exits." 
+ ) + .args(&[ + arg!(--"dry-run" + "Prints the planned merge operations without executing them.") + .required(false), + arg!(--"max-concurrent-merges" + "Maximum number of merges to run concurrently (default: 10).") + .display_order(1) + .required(false), + arg!(--"retention-safety-buffer-days" + "Splits within this many days of the retention cutoff are excluded (default: 5).") + .display_order(2) + .required(false), + arg!(--"min-merge-group-size" + "Minimum number of splits in a group to trigger a merge (default: 5).") + .display_order(3) + .required(false), + arg!(--"input-split-max-num-docs" + "Maximum number of docs in a split for it to be eligible (default: 10_000).") + .display_order(4) + .required(false), + arg!(--"max-merge-group-size" + "Maximum number of splits per merge operation (default: 100).") + .display_order(5) + .required(false), + arg!(--"split-target-num-docs" + "Maximum total docs per merge operation (default: 5_000_000).") + .display_order(6) + .required(false), + arg!(--"index-parallelism" + "Number of indexes processed concurrently (default: 50).") + .display_order(7) + .required(false), + arg!(--"index-id-patterns" + "Comma-separated list of index ID patterns to include (default: '*').") + .display_order(8) + .required(false), + arg!(--"metrics" + "Expose Prometheus metrics on the REST listen address during the run.") + .display_order(9) + .required(false), + ]) + ) .arg_required_else_help(true) } @@ -207,6 +258,13 @@ pub struct MergeArgs { pub source_id: SourceId, } +#[derive(Debug, Eq, PartialEq)] +pub struct MatureMergeArgs { + pub config_uri: Uri, + pub merge_config: MatureMergeConfig, + pub serve_metrics: bool, +} + #[derive(Debug, Eq, PartialEq)] pub struct ExtractSplitArgs { pub config_uri: Uri, @@ -221,6 +279,7 @@ pub enum ToolCliCommand { LocalIngest(LocalIngestDocsArgs), LocalSearch(LocalSearchArgs), Merge(MergeArgs), + MatureMerge(MatureMergeArgs), ExtractSplit(ExtractSplitArgs), } @@ -234,6 +293,7 @@ impl ToolCliCommand { 
"local-ingest" => Self::parse_local_ingest_args(submatches), "local-search" => Self::parse_local_search_args(submatches), "merge" => Self::parse_merge_args(submatches), + "merge-mature" => Self::parse_mature_merge_args(submatches), "extract-split" => Self::parse_extract_split_args(submatches), _ => bail!("unknown tool subcommand `{subcommand}`"), } @@ -385,12 +445,84 @@ impl ToolCliCommand { })) } + fn parse_mature_merge_args(mut matches: ArgMatches) -> anyhow::Result { + let config_uri = matches + .remove_one::("config") + .map(|uri_str| Uri::from_str(&uri_str)) + .expect("`config` should be a required arg.")?; + let defaults = MatureMergeConfig::default(); + let dry_run = matches.get_flag("dry-run"); + let max_concurrent_merges = matches + .remove_one::("max-concurrent-merges") + .map(|s| s.parse::()) + .transpose()? + .unwrap_or(defaults.max_concurrent_merges); + let retention_safety_buffer_days = matches + .remove_one::("retention-safety-buffer-days") + .map(|s| s.parse::()) + .transpose()? + .unwrap_or(defaults.retention_safety_buffer_days); + let min_merge_group_size = matches + .remove_one::("min-merge-group-size") + .map(|s| s.parse::()) + .transpose()? + .unwrap_or(defaults.min_merge_group_size); + let input_split_max_num_docs = matches + .remove_one::("input-split-max-num-docs") + .map(|s| s.parse::()) + .transpose()? + .unwrap_or(defaults.input_split_max_num_docs); + let max_merge_group_size = matches + .remove_one::("max-merge-group-size") + .map(|s| s.parse::()) + .transpose()? + .unwrap_or(defaults.max_merge_group_size); + let split_target_num_docs = matches + .remove_one::("split-target-num-docs") + .map(|s| s.parse::()) + .transpose()? + .unwrap_or(defaults.split_target_num_docs); + let index_parallelism = matches + .remove_one::("index-parallelism") + .map(|s| s.parse::()) + .transpose()? 
+ .unwrap_or(defaults.index_parallelism); + let index_id_patterns = matches + .remove_one::("index-id-patterns") + .map(|s| s.split(',').map(|p| p.trim().to_string()).collect()) + .unwrap_or(defaults.index_id_patterns); + let serve_metrics = matches.get_flag("metrics"); + + if max_concurrent_merges == 0 { + bail!("`max-concurrent-merges` must be greater than or equal to 1."); + } + if index_parallelism == 0 { + bail!("`index-parallelism` must be greater than or equal to 1."); + } + Ok(Self::MatureMerge(MatureMergeArgs { + config_uri, + serve_metrics, + merge_config: MatureMergeConfig { + dry_run, + max_concurrent_merges, + retention_safety_buffer_days, + min_merge_group_size, + input_split_max_num_docs, + max_merge_group_size, + split_target_num_docs, + index_parallelism, + index_id_patterns, + }, + })) + } + pub async fn execute(self) -> anyhow::Result<()> { match self { Self::GarbageCollect(args) => garbage_collect_index_cli(args).await, Self::LocalIngest(args) => local_ingest_docs_cli(args).await, Self::LocalSearch(args) => local_search_cli(args).await, Self::Merge(args) => merge_cli(args).await, + Self::MatureMerge(args) => merge_mature_cli(args).await, Self::ExtractSplit(args) => extract_split_cli(args).await, } } @@ -651,6 +783,43 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { Ok(()) } +pub async fn merge_mature_cli(args: MatureMergeArgs) -> anyhow::Result<()> { + debug!(args=?args, "merge-mature"); + info!(merge_config=?args.merge_config, "merge-mature configuration"); + println!("❯ Scanning all indexes for mature merge opportunities..."); + let config = load_node_config(&args.config_uri).await?; + let (storage_resolver, metastore_resolver) = + get_resolvers(&config.storage_configs, &config.metastore_configs); + let metastore = metastore_resolver.resolve(&config.metastore_uri).await?; + + let runtimes_config = RuntimesConfig::default(); + start_actor_runtimes( + runtimes_config, + &HashSet::from_iter([QuickwitService::Indexer]), + )?; + + 
if args.serve_metrics { + let metrics_addr = config.rest_config.listen_addr; + tokio::spawn(serve_metrics(metrics_addr)); + } + + merge_mature_all_indexes( + metastore, + storage_resolver, + &config.data_dir_path, + args.merge_config.clone(), + config.node_id, + ) + .await?; + + if !args.merge_config.dry_run { + info!("mature splits successfully merged, waiting for explicit termination signal"); + tokio::time::sleep(Duration::MAX).await; + } + + Ok(()) +} + pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow::Result<()> { debug!(args=?args, "garbage-collect-index"); println!("❯ Garbage collecting index..."); @@ -955,3 +1124,48 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result { Ok(cluster) } + +/// A shortcut to expose the metrics without loading the whole quickwit_serve +/// machinery. +async fn serve_metrics(addr: std::net::SocketAddr) { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + let listener = match tokio::net::TcpListener::bind(addr).await { + Ok(l) => l, + Err(err) => { + tracing::warn!("metrics server could not bind to {addr}: {err}"); + return; + } + }; + tracing::info!("metrics server listening on http://{addr}/metrics"); + loop { + let Ok((mut stream, _peer)) = listener.accept().await else { + continue; + }; + tokio::spawn(async move { + let mut buf = [0u8; 4096]; + let n = match stream.read(&mut buf).await { + Ok(n) => n, + Err(_) => return, + }; + let request = std::str::from_utf8(&buf[..n]).unwrap_or(""); + let is_metrics = request.starts_with("GET /metrics"); + let (status, body) = if is_metrics { + match quickwit_common::metrics::metrics_text_payload() { + Ok(payload) => ("200 OK", payload), + Err(e) => { + tracing::error!("failed to encode prometheus metrics: {e}"); + ("500 Internal Server Error", String::new()) + } + } + } else { + ("404 Not Found", String::new()) + }; + let response = format!( + "HTTP/1.1 {status}\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: \ + 
{}\r\nConnection: close\r\n\r\n{body}", + body.len() + ); + let _ = stream.write_all(response.as_bytes()).await; + }); + } +} diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 0d1da7d02c5..660a8b62d05 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -52,11 +52,28 @@ use crate::models::{ }; use crate::soft_delete_query::SoftDeletedDocIdsQuery; +/// The mapping resolution associated to the merge. To perform deletes a full doc +/// mapper is required. For regular merges, we only need the tokenizer manager. +#[derive(Clone)] +enum MapperContext { + TokenizersOnly(quickwit_query::tokenizers::TokenizerManager), + DocMapper(Arc), +} + +impl MapperContext { + fn tokenizer_manager(&self) -> quickwit_query::tokenizers::TokenizerManager { + match self { + MapperContext::TokenizersOnly(tokenizer_manager) => tokenizer_manager.clone(), + MapperContext::DocMapper(doc_mapper) => doc_mapper.tokenizer_manager().clone(), + } + } +} + #[derive(Clone)] pub struct MergeExecutor { pipeline_id: MergePipelineId, metastore: MetastoreServiceClient, - doc_mapper: Arc, + mapper_context: MapperContext, io_controls: IoControls, merge_packager_mailbox: Mailbox, } @@ -364,7 +381,24 @@ impl MergeExecutor { MergeExecutor { pipeline_id, metastore, - doc_mapper, + mapper_context: MapperContext::DocMapper(doc_mapper), + io_controls, + merge_packager_mailbox, + } + } + + /// Creates a simpler MergeExecutor that doesn't support deletes.
+ pub fn new_with_tokenizers_only( + pipeline_id: MergePipelineId, + metastore: MetastoreServiceClient, + tokenizer_manager: quickwit_query::tokenizers::TokenizerManager, + io_controls: IoControls, + merge_packager_mailbox: Mailbox, + ) -> Self { + MergeExecutor { + pipeline_id, + metastore, + mapper_context: MapperContext::TokenizersOnly(tokenizer_manager), io_controls, merge_packager_mailbox, } @@ -380,7 +414,7 @@ impl MergeExecutor { ) -> anyhow::Result { let (union_index_meta, split_directories, per_split_metas) = open_split_directories( &tantivy_dirs, - self.doc_mapper.tokenizer_manager().tantivy_manager(), + self.mapper_context.tokenizer_manager().tantivy_manager(), )?; // Build a mapping from each segment ID to the soft-deleted doc IDs of its parent split. let soft_deleted_docs: HashMap> = per_split_metas @@ -415,7 +449,7 @@ impl MergeExecutor { // splits. let merged_index = open_index( controlled_directory.clone(), - self.doc_mapper.tokenizer_manager().tantivy_manager(), + self.mapper_context.tokenizer_manager().tantivy_manager(), )?; ctx.record_progress(); @@ -437,6 +471,9 @@ impl MergeExecutor { merge_scratch_directory: TempDirectory, ctx: &ActorContext, ) -> anyhow::Result> { + let MapperContext::DocMapper(doc_mapper) = &self.mapper_context else { + anyhow::bail!("DocMapper is required to process delete and merge operations"); + }; let list_delete_tasks_request = ListDeleteTasksRequest::new(split.index_uid.clone(), split.delete_opstamp); let delete_tasks = ctx @@ -464,7 +501,7 @@ impl MergeExecutor { let (union_index_meta, split_directories, per_split_metas) = open_split_directories( &tantivy_dirs, - self.doc_mapper.tokenizer_manager().tantivy_manager(), + doc_mapper.tokenizer_manager().tantivy_manager(), )?; // Build a mapping from each segment ID to the soft-deleted doc IDs of the input split. 
let soft_deleted_docs: HashMap> = @@ -487,7 +524,7 @@ impl MergeExecutor { union_index_meta, split_directories, delete_tasks, - doc_mapper_opt: Some(self.doc_mapper.clone()), + doc_mapper_opt: Some(doc_mapper.clone()), soft_deleted_docs, }, merge_scratch_directory.path(), @@ -498,12 +535,7 @@ impl MergeExecutor { // This will have the side effect of deleting the directory containing the downloaded split. let mut merged_index = Index::open(controlled_directory.clone())?; ctx.record_progress(); - merged_index.set_tokenizers( - self.doc_mapper - .tokenizer_manager() - .tantivy_manager() - .clone(), - ); + merged_index.set_tokenizers(doc_mapper.tokenizer_manager().tantivy_manager().clone()); merged_index.set_fast_field_tokenizers( get_quickwit_fastfield_normalizer_manager() .tantivy_manager() @@ -536,8 +568,7 @@ impl MergeExecutor { let uncompressed_docs_size_in_bytes = (num_docs as f32 * split.uncompressed_docs_size_in_bytes as f32 / split.num_docs as f32) as u64; - let time_range = if let Some(timestamp_field_name) = self.doc_mapper.timestamp_field_name() - { + let time_range = if let Some(timestamp_field_name) = doc_mapper.timestamp_field_name() { let reader = merged_segment_reader .fast_fields() .date(timestamp_field_name)?; @@ -605,7 +636,7 @@ impl MergeExecutor { let union_directory = UnionDirectory::union_of(directory_stack); let union_index = open_index( union_directory, - self.doc_mapper.tokenizer_manager().tantivy_manager(), + self.mapper_context.tokenizer_manager().tantivy_manager(), )?; ctx.record_progress(); diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs index bbe5267d514..3818edd8c73 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs @@ -33,6 +33,15 @@ pub struct MergePermit { } impl MergePermit { + /// Creates a `MergePermit` from an owned semaphore permit, 
without notifying any + /// `MergeSchedulerService` on drop. Use this when managing concurrency externally. + pub fn new(permit: OwnedSemaphorePermit) -> MergePermit { + MergePermit { + _semaphore_permit: Some(permit), + merge_scheduler_mailbox: None, + } + } + #[cfg(any(test, feature = "testsuite"))] pub fn for_test() -> MergePermit { MergePermit { diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 5d68bb59285..7d124288288 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -17,7 +17,7 @@ use std::path::Path; use async_trait::async_trait; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; use quickwit_common::io::IoControls; -use quickwit_common::temp_dir::{self, TempDirectory}; +use quickwit_common::temp_dir::TempDirectory; use quickwit_metastore::SplitMetadata; use tantivy::Directory; use tracing::{debug, info, instrument}; @@ -62,14 +62,13 @@ impl Handler for MergeSplitDownloader { merge_task: MergeTask, ctx: &ActorContext, ) -> Result<(), quickwit_actors::ActorExitStatus> { - let merge_scratch_directory = temp_dir::Builder::default() - .join("merge") - .tempdir_in(self.scratch_directory.path()) + let merge_scratch_directory = self + .scratch_directory + .named_temp_child("merge") .map_err(|error| anyhow::anyhow!(error))?; info!(dir=%merge_scratch_directory.path().display(), "download-merge-splits"); - let downloaded_splits_directory = temp_dir::Builder::default() - .join("downloaded-splits") - .tempdir_in(merge_scratch_directory.path()) + let downloaded_splits_directory = merge_scratch_directory + .named_temp_child("downloaded-splits") .map_err(|error| anyhow::anyhow!(error))?; let tantivy_dirs = self .download_splits( diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs 
b/quickwit/quickwit-indexing/src/actors/publisher.rs index 3a11d6b1f54..2d85ca1a1af 100644 --- a/quickwit/quickwit-indexing/src/actors/publisher.rs +++ b/quickwit/quickwit-indexing/src/actors/publisher.rs @@ -35,6 +35,7 @@ use crate::source::{SourceActor, SuggestTruncate}; pub struct PublisherCounters { pub num_published_splits: u64, pub num_replace_operations: u64, + pub num_replaced_splits: u64, pub num_empty_splits: u64, } @@ -218,6 +219,7 @@ impl Handler for Publisher { self.counters.num_published_splits += 1; } else { self.counters.num_replace_operations += 1; + self.counters.num_replaced_splits += replaced_splits.len() as u64; } } else { self.counters.num_empty_splits += 1; @@ -365,6 +367,7 @@ mod tests { let publisher_observation = publisher_handle.process_pending_and_observe().await.state; assert_eq!(publisher_observation.num_published_splits, 1); + assert_eq!(publisher_observation.num_replaced_splits, 0); let suggest_truncate_checkpoints: Vec = source_inbox .drain_for_test_typed::() @@ -441,6 +444,7 @@ mod tests { let publisher_observation = publisher_handle.process_pending_and_observe().await.state; assert_eq!(publisher_observation.num_published_splits, 0); assert_eq!(publisher_observation.num_replace_operations, 0); + assert_eq!(publisher_observation.num_replaced_splits, 0); assert_eq!(publisher_observation.num_empty_splits, 1); let suggest_truncate_checkpoints: Vec = source_inbox @@ -528,6 +532,7 @@ mod tests { let publisher_observation = publisher_handle.process_pending_and_observe().await.state; assert_eq!(publisher_observation.num_published_splits, 0); assert_eq!(publisher_observation.num_replace_operations, 1); + assert_eq!(publisher_observation.num_replaced_splits, 2); let merge_planner_msgs = merge_planner_inbox.drain_for_test_typed::(); assert_eq!(merge_planner_msgs.len(), 1); assert_eq!(merge_planner_msgs[0].new_splits.len(), 1); @@ -568,6 +573,7 @@ mod tests { let publisher_observation = 
publisher_handle.process_pending_and_observe().await.state; assert_eq!(publisher_observation.num_published_splits, 0); + assert_eq!(publisher_observation.num_replaced_splits, 0); let merger_messages = merge_planner_inbox.drain_for_test(); assert!(merger_messages.is_empty()); @@ -647,6 +653,7 @@ mod tests { // Publish must still succeed despite the race condition (warning is non-fatal). let observation = publisher_handle.process_pending_and_observe().await.state; assert_eq!(observation.num_replace_operations, 1); + assert_eq!(observation.num_replaced_splits, 1); universe.assert_quit().await; } } diff --git a/quickwit/quickwit-indexing/src/lib.rs b/quickwit/quickwit-indexing/src/lib.rs index 8e0f300ac90..9183fda3890 100644 --- a/quickwit/quickwit-indexing/src/lib.rs +++ b/quickwit/quickwit-indexing/src/lib.rs @@ -35,6 +35,8 @@ pub use crate::split_store::{IndexingSplitStore, get_tantivy_directory_from_spli pub mod actors; mod controlled_directory; +pub mod mature_merge; +pub mod mature_merge_plan; pub mod merge_policy; mod metrics; pub mod models; diff --git a/quickwit/quickwit-indexing/src/mature_merge.rs b/quickwit/quickwit-indexing/src/mature_merge.rs new file mode 100644 index 00000000000..de04b9c4ae2 --- /dev/null +++ b/quickwit/quickwit-indexing/src/mature_merge.rs @@ -0,0 +1,844 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use anyhow::{Context, bail}; +use bytesize::ByteSize; +use futures::StreamExt; +use quickwit_actors::{ActorExitStatus, Universe}; +use quickwit_common::io::IoControls; +use quickwit_common::{KillSwitch, temp_dir}; +use quickwit_metastore::{ + IndexMetadata, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, + MetastoreServiceStreamSplitsExt, SplitState, +}; +use quickwit_proto::indexing::MergePipelineId; +use quickwit_proto::metastore::{ + ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient, +}; +use quickwit_proto::types::NodeId; +use quickwit_storage::StorageResolver; +use tantivy::Inventory; +use time::OffsetDateTime; +use tokio::sync::Semaphore; +use tracing::{info, warn}; + +use crate::actors::{ + MergeExecutor, MergePermit, MergeSplitDownloader, Packager, Publisher, PublisherType, Uploader, + UploaderType, +}; +use crate::mature_merge_plan::{MATURITY_BUFFER, plan_merge_operations_for_index}; +use crate::merge_policy::{MergeOperation, MergeTask, NopMergePolicy}; +use crate::split_store::{IndexingSplitCache, IndexingSplitStore}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MatureMergeConfig { + /// Splits within this many days of the retention cutoff are left untouched. + pub retention_safety_buffer_days: u64, + /// Minimum number of splits in a group before a merge operation is emitted. + pub min_merge_group_size: usize, + /// Maximum number of docs in a split for it to be eligible for mature merging. + pub input_split_max_num_docs: usize, + /// Maximum number of splits per merge operation. + pub max_merge_group_size: usize, + /// Maximum total number of documents per merge operation. + pub split_target_num_docs: usize, + /// Number of indexes processed concurrently. Lower to avoid fetching splits + /// metadata too eagerly. + pub index_parallelism: usize, + /// Maximum number of merges running concurrently across all indexes. 
+ pub max_concurrent_merges: usize, + /// Print planned operations without executing them. + pub dry_run: bool, + /// List of index patterns to include in the mature merge process. + pub index_id_patterns: Vec, +} + +impl Default for MatureMergeConfig { + fn default() -> Self { + Self { + retention_safety_buffer_days: 5, + min_merge_group_size: 5, + input_split_max_num_docs: 10_000, + max_merge_group_size: 100, + split_target_num_docs: 5_000_000, + index_parallelism: 50, + max_concurrent_merges: 10, + dry_run: false, + index_id_patterns: vec!["*".to_string()], + } + } +} + +/// Statistics for the merges performed on a single index. +#[derive(Debug, Default)] +struct IndexMergeOutcome { + num_published_merges: u64, + num_replaced_splits: u64, +} + +struct IndexMergeSummary { + num_merges_planned: usize, + num_input_splits: usize, + total_input_bytes: u64, + outcome: IndexMergeOutcome, +} + +/// Fetches all published splits for the given index from the metastore (no +/// node-id filter) and calls [`plan_merge_operations_for_index`]. 
+async fn fetch_splits_and_plan( + index_metadata: &IndexMetadata, + metastore: &MetastoreServiceClient, + now: OffsetDateTime, + config: &MatureMergeConfig, +) -> anyhow::Result> { + let index_uid = index_metadata.index_uid.clone(); + let list_splits_query = ListSplitsQuery::for_index(index_uid) + .with_split_state(SplitState::Published) + .retain_mature(now - MATURITY_BUFFER); + let list_splits_request = ListSplitsRequest::try_from_list_splits_query(&list_splits_query)?; + let splits_stream = metastore.list_splits(list_splits_request).await?; + let splits = splits_stream.collect_splits_metadata().await?; + + if splits.iter().any(|s| !s.tags.is_empty()) { + // with tags and doc mapping evolutions, we might have weird edge cases + // -> just refuse them for now + bail!("tags not supported in mature merges") + } + + let total_splits = splits.len(); + let operations = + plan_merge_operations_for_index(&index_metadata.index_config, splits, now, config); + + info!( + index_id = %index_metadata.index_config.index_id, + total_splits, + num_planned_merges = operations.len(), + "fetched splits for mature merge planning" + ); + Ok(operations) +} + +/// Executes the given merge operations for a single index using the standard +/// actor pipeline: `MergeSplitDownloader -> MergeExecutor -> Packager -> +/// Uploader -> Publisher`. +/// +/// Tags are not supported and we use the default tokenizer manager. In practice +/// we could use the tags and custom tokenizers from the current doc mapping, +/// but schema evolutions could lead to un-anticipated edge cases. 
+#[allow(clippy::too_many_arguments)] +async fn run_mature_merges_for_index( + index_metadata: &IndexMetadata, + operations: Vec, + metastore: MetastoreServiceClient, + split_store: IndexingSplitStore, + semaphore: Arc, + data_dir_path: &std::path::Path, + config: &MatureMergeConfig, + node_id: NodeId, +) -> anyhow::Result { + if operations.is_empty() { + return Ok(IndexMergeOutcome { + num_published_merges: 0, + num_replaced_splits: 0, + }); + } + + let index_config = &index_metadata.index_config; + let index_uid = index_metadata.index_uid.clone(); + + let indexing_directory = temp_dir::Builder::default() + .join("mature-merge") + .tempdir_in(data_dir_path) + .context("failed to create temp directory for mature merge")?; + + let pipeline_id = MergePipelineId { + node_id, + index_uid, + source_id: "_mature_merge".to_string(), + }; + + let universe = Universe::new(); + let kill_switch = KillSwitch::default(); + + // Build chain from publisher inward (each actor gets the next actor's mailbox). + + let merge_publisher = Publisher::new( + PublisherType::MergePublisher, + metastore.clone(), + // No feedback loop to a merge planner. 
+ None, + None, + ); + let (merge_publisher_mailbox, merge_publisher_handle) = universe + .spawn_builder() + .set_kill_switch(kill_switch.clone()) + .spawn(merge_publisher); + + let merge_uploader = Uploader::new( + UploaderType::MergeUploader, + metastore.clone(), + Arc::new(NopMergePolicy), + index_config.retention_policy_opt.clone(), + split_store.clone(), + merge_publisher_mailbox.into(), + config.max_concurrent_merges, + Default::default(), + ); + let (merge_uploader_mailbox, merge_uploader_handle) = universe + .spawn_builder() + .set_kill_switch(kill_switch.clone()) + .spawn(merge_uploader); + + // Tag fields not supported for now + let tag_fields = Vec::new(); + let merge_packager = Packager::new("MaturePackager", tag_fields, merge_uploader_mailbox); + let (merge_packager_mailbox, merge_packager_handle) = universe + .spawn_builder() + .set_kill_switch(kill_switch.clone()) + .spawn(merge_packager); + + let merge_executor = MergeExecutor::new_with_tokenizers_only( + pipeline_id, + metastore, + // we only support the default tokenizer manager + quickwit_query::create_default_quickwit_tokenizer_manager(), + IoControls::default().set_component("mature_merger"), + merge_packager_mailbox, + ); + let (merge_executor_mailbox, merge_executor_handle) = universe + .spawn_builder() + .set_kill_switch(kill_switch.clone()) + .spawn(merge_executor); + + let merge_split_downloader = MergeSplitDownloader { + scratch_directory: indexing_directory, + split_store, + executor_mailbox: merge_executor_mailbox, + io_controls: IoControls::default().set_component("mature_split_downloader"), + }; + let (merge_split_downloader_mailbox, merge_split_downloader_handle) = universe + .spawn_builder() + .set_kill_switch(kill_switch.clone()) + .spawn(merge_split_downloader); + + // Send all merge tasks to the downloader, gated by the concurrency semaphore. 
+ let inventory: Inventory = Inventory::default(); + for operation in operations { + let permit = Arc::clone(&semaphore) + .acquire_owned() + .await + .expect("semaphore should not be closed"); + let merge_task = MergeTask { + merge_operation: inventory.track(operation), + _merge_permit: MergePermit::new(permit), + }; + if merge_split_downloader_mailbox + .send_message(merge_task) + .await + .is_err() + { + anyhow::bail!("merge split downloader actor died unexpectedly"); + } + } + + // Dropping the downloader mailbox signals no more tasks are coming. + // The pipeline will cascade-exit once all pending tasks are processed. + drop(merge_split_downloader_mailbox); + + let (downloader_status, _) = merge_split_downloader_handle.join().await; + let (executor_status, _) = merge_executor_handle.join().await; + let (packager_status, _) = merge_packager_handle.join().await; + let (uploader_status, _) = merge_uploader_handle.join().await; + let (publisher_status, publisher_counters) = merge_publisher_handle.join().await; + + universe.quit().await; + + for (name, status) in [ + ("downloader", downloader_status), + ("executor", executor_status), + ("packager", packager_status), + ("uploader", uploader_status), + ("publisher", publisher_status), + ] { + if !matches!(status, ActorExitStatus::Success | ActorExitStatus::Quit) { + anyhow::bail!( + "mature merge actor `{}` exited with unexpected status: {:?}", + name, + status + ); + } + } + + Ok(IndexMergeOutcome { + num_published_merges: publisher_counters.num_replace_operations, + num_replaced_splits: publisher_counters.num_replaced_splits, + }) +} + +/// Plans and optionally executes mature merges for a single index +#[allow(clippy::too_many_arguments)] +async fn merge_mature_single_index( + index_metadata: IndexMetadata, + metastore: &MetastoreServiceClient, + storage_resolver: &StorageResolver, + semaphore: Arc, + data_dir_path: &std::path::Path, + config: &MatureMergeConfig, + node_id: NodeId, + now: OffsetDateTime, +) -> 
anyhow::Result { + let index_id = index_metadata.index_config.index_id.clone(); + let operations = fetch_splits_and_plan(&index_metadata, metastore, now, config).await?; + let num_merges_planned = operations.len(); + let num_input_splits: usize = operations.iter().map(|op| op.splits.len()).sum(); + let total_input_bytes: u64 = operations + .iter() + .flat_map(|op| op.splits.iter()) + .map(|s| s.uncompressed_docs_size_in_bytes) + .sum(); + + if config.dry_run { + for op in &operations { + log_op_for_dry_run(op, &index_metadata.index_config.index_id); + } + return Ok(IndexMergeSummary { + num_merges_planned, + num_input_splits, + total_input_bytes, + outcome: IndexMergeOutcome::default(), + }); + } + + if operations.is_empty() { + return Ok(IndexMergeSummary { + num_merges_planned: 0, + total_input_bytes: 0, + num_input_splits: 0, + outcome: IndexMergeOutcome::default(), + }); + } + + let index_uri = index_metadata.index_uri(); + let remote_storage = storage_resolver + .resolve(index_uri) + .await + .context("failed to resolve index storage")?; + let split_store = + IndexingSplitStore::new(remote_storage, Arc::new(IndexingSplitCache::no_caching())); + + let outcome = run_mature_merges_for_index( + &index_metadata, + operations, + metastore.clone(), + split_store, + semaphore, + data_dir_path, + config, + node_id, + ) + .await?; + + if num_merges_planned > 0 { + info!( + index_id = %index_id, + planned = num_merges_planned, + published_merges = outcome.num_published_merges, + replaced_splits = outcome.num_replaced_splits, + input_splits = num_input_splits, + input_bytes = total_input_bytes, + "mature split merges complete for index" + ); + } + + Ok(IndexMergeSummary { + num_merges_planned, + num_input_splits, + total_input_bytes, + outcome, + }) +} + +/// Aggregates per-index results, logs per-index and global summary lines, and warns on errors. 
+fn log_merge_results(results: Vec>) { + let mut total_planned_merges = 0usize; + let mut total_input_splits = 0usize; + let mut total_input_bytes = 0u64; + let mut total_successfully_published_merges = 0u64; + let mut total_successfully_replaced_splits = 0u64; + + let mut num_indexes_successfully_merged = 0usize; + let mut num_indexes_partially_merged = 0usize; + let mut num_indexes_without_opportunity = 0usize; + + for result in results { + match result { + Ok(summary) => { + total_planned_merges += summary.num_merges_planned; + total_input_splits += summary.num_input_splits; + total_input_bytes += summary.total_input_bytes; + total_successfully_published_merges += summary.outcome.num_published_merges; + total_successfully_replaced_splits += summary.outcome.num_replaced_splits; + + if summary.num_merges_planned == 0 { + num_indexes_without_opportunity += 1; + } else if summary.outcome.num_published_merges + == (summary.num_merges_planned as u64) + { + num_indexes_successfully_merged += 1; + } else { + num_indexes_partially_merged += 1; + } + } + Err(err) => { + warn!(err = ?err, "error processing index during mature merge"); + } + } + } + info!( + num_indexes_successfully_merged, + num_indexes_partially_merged, + num_indexes_without_opportunity, + total_planned_merges, + total_successfully_published_merges, + total_successfully_replaced_splits, + total_input_splits, + total_input_bytes, + "mature merge complete" + ); +} + +fn log_op_for_dry_run(op: &MergeOperation, index_id: &str) { + let start_time = op + .splits + .iter() + .filter_map(|s| s.time_range.as_ref().map(|r| r.start())) + .min() + .unwrap_or(&0); + let end_time = op + .splits + .iter() + .filter_map(|s| s.time_range.as_ref().map(|r| r.end())) + .max() + .unwrap_or(&0); + let fmt_ts = |ts: i64| { + OffsetDateTime::from_unix_timestamp(ts) + .map(|dt| { + format!( + "{}-{:02}-{:02}T{:02}", + dt.year(), + dt.month() as u8, + dt.day(), + dt.hour() + ) + }) + .unwrap_or_else(|_| ts.to_string()) + }; + // 
print is better than log because dry-run will be used interactively from the CLI + println!( + "[dry-run] {index_id}: {} splits | {} docs | {} | {} → {}", + op.splits.len(), + op.splits.iter().map(|s| s.num_docs).sum::(), + ByteSize(op.splits.iter().map(|s| s.footer_offsets.end).sum::()), + fmt_ts(*start_time), + fmt_ts(*end_time), + ); +} + +/// Processes all indexes from the metastore, discovering and running mature +/// merge opportunities. +/// +/// If `dry_run` is `true`, the planned operations are printed but not executed. +pub async fn merge_mature_all_indexes( + metastore: MetastoreServiceClient, + storage_resolver: StorageResolver, + data_dir_path: &std::path::Path, + config: MatureMergeConfig, + node_id: NodeId, +) -> anyhow::Result<()> { + let indexes_metadata = metastore + .list_indexes_metadata(ListIndexesMetadataRequest { + index_id_patterns: config.index_id_patterns.clone(), + }) + .await + .context("failed to list indexes")? + .deserialize_indexes_metadata() + .await + .context("failed to deserialize indexes metadata")?; + + info!( + num_indexes = indexes_metadata.len(), + "starting mature merge" + ); + + let semaphore = Arc::new(Semaphore::new(config.max_concurrent_merges)); + let metastore_ref = &metastore; + let storage_resolver_ref = &storage_resolver; + let config_ref = &config; + + if indexes_metadata + .iter() + .any(|m| !m.index_config.doc_mapping.tag_fields.is_empty()) + { + // with tags and doc mapping evolutions, we might have weird edge cases + // -> just refuse them for now + bail!("tags not supported in mature merges"); + } + + let results: Vec> = futures::stream::iter(indexes_metadata) + .map(|index_metadata| { + let node_id = node_id.clone(); + let semaphore = Arc::clone(&semaphore); + async move { + let now = OffsetDateTime::now_utc(); + merge_mature_single_index( + index_metadata, + metastore_ref, + storage_resolver_ref, + semaphore, + data_dir_path, + config_ref, + node_id, + now, + ) + .await + } + }) + 
.buffer_unordered(config.index_parallelism) + .collect() + .await; + + log_merge_results(results); + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use quickwit_common::temp_dir::TempDirectory; + use quickwit_config::ConfigFormat; + use quickwit_metastore::{ + IndexMetadata, IndexMetadataResponseExt, SplitMaturity, SplitMetadata, + UpdateIndexRequestExt, + }; + use quickwit_proto::metastore::{ + IndexMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient, + MockMetastoreService, UpdateIndexRequest, + }; + use quickwit_proto::types::NodeId; + use quickwit_storage::RamStorage; + + use super::*; + use crate::TestSandbox; + + /// Tests the short-circuit path: when no merge operations are planned, + /// `run_mature_merges_for_index` returns 0 immediately without spawning any actors. + #[tokio::test] + async fn test_run_mature_merges_for_index_no_operations() -> anyhow::Result<()> { + let mock_metastore = MockMetastoreService::new(); + let storage = Arc::new(RamStorage::default()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage); + let index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index"); + let data_dir = TempDirectory::for_test(); + let node_id = NodeId::from("test-node"); + + let semaphore = Arc::new(Semaphore::new(2)); + let outcome = run_mature_merges_for_index( + &index_metadata, + vec![], + MetastoreServiceClient::from_mock(mock_metastore), + split_store, + semaphore, + data_dir.path(), + &MatureMergeConfig::default(), + node_id, + ) + .await?; + + assert_eq!(outcome.num_published_merges, 0); + assert_eq!(outcome.num_replaced_splits, 0); + Ok(()) + } + + /// Tests the full per index pipeline end-to-end with a single merge operation + #[tokio::test] + async fn test_run_mature_merges_for_index_merges_real_splits() -> anyhow::Result<()> { + let doc_mapping_yaml = r#" + field_mappings: + - name: body + type: text + - name: ts + type: datetime + input_formats: 
[unix_timestamp] + fast: true + timestamp_field: ts + "#; + let test_sandbox = + TestSandbox::create("test-index-mature2", doc_mapping_yaml, "", &["body"]).await?; + + // each add_documents() call produces 1 split + for i in 0..4u64 { + test_sandbox + .add_documents(std::iter::once( + serde_json::json!({"body": format!("doc{i}"), "ts": 1_631_072_713u64 + i}), + )) + .await?; + } + + let metastore = test_sandbox.metastore(); + let index_uid = test_sandbox.index_uid(); + + let split_metas: Vec = metastore + .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) + .await? + .collect_splits_metadata() + .await?; + assert_eq!(split_metas.len(), 4); + + let index_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) + .await? + .deserialize_index_metadata()?; + + let merge_op = MergeOperation::new_merge_operation(split_metas); + let split_store = + IndexingSplitStore::create_without_local_store_for_test(test_sandbox.storage()); + let data_dir = TempDirectory::for_test(); + let semaphore = Arc::new(Semaphore::new(2)); + + let outcome = run_mature_merges_for_index( + &index_metadata, + vec![merge_op], + metastore.clone(), + split_store, + semaphore, + data_dir.path(), + &MatureMergeConfig::default(), + test_sandbox.node_id(), + ) + .await?; + + assert_eq!(outcome.num_published_merges, 1); + assert_eq!(outcome.num_replaced_splits, 4); + + // The 4 input splits are now MarkedForDeletion; 1 merged Published split should remain. + let published_after: Vec = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query( + &ListSplitsQuery::for_index(index_uid).with_split_state(SplitState::Published), + )?) + .await? 
+ .collect_splits_metadata() + .await?; + assert_eq!(published_after.len(), 1); + assert_eq!(published_after[0].num_docs, 4); + assert_eq!(published_after[0].maturity, SplitMaturity::Mature); + assert_eq!( + published_after[0].time_range, + Some(1_631_072_713..=1_631_072_716) + ); + + test_sandbox.assert_quit().await; + Ok(()) + } + + #[tokio::test] + async fn test_merge_mature_single_index_schema_evolution() -> anyhow::Result<()> { + let doc_mapping_v1_yaml = r#" + field_mappings: + - name: ts + type: datetime + input_formats: [unix_timestamp] + fast: true + - name: label + type: text + fast: true + tokenizer: lowercase + timestamp_field: ts + "#; + let test_sandbox = + TestSandbox::create("test-index-schema-evo", doc_mapping_v1_yaml, "", &["label"]) + .await?; + + let base_time = 1_631_072_713i64; // Wednesday, September 8, 2021 at 3:45:13 AM UTC + + // create 3 splits with v1 mapping + for i in 0..3i64 { + test_sandbox + .add_documents(std::iter::once( + serde_json::json!({"label": format!("Doc{i}"), "ts": base_time + i}), + )) + .await?; + } + + let metastore = test_sandbox.metastore(); + let index_uid = test_sandbox.index_uid(); + + let v1_splits: Vec = metastore + .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) + .await? + .collect_splits_metadata() + .await?; + assert_eq!(v1_splits.len(), 3); + let v1_doc_mapping_uid = v1_splits[0].doc_mapping_uid; + + // Update the index config: change tokenizer to `default` and add a secondary timestamp. + let index_metadata_v1 = metastore + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) + .await? 
+ .deserialize_index_metadata()?; + let doc_mapping_v2 = ConfigFormat::Yaml.parse( + r#" + field_mappings: + - name: ts + type: datetime + input_formats: [unix_timestamp] + fast: true + - name: label + type: text + fast: true + tokenizer: default + - name: ts2 + type: datetime + input_formats: [unix_timestamp] + fast: true + timestamp_field: ts + secondary_timestamp_field: ts2 + "# + .as_bytes(), + )?; + let update_request = UpdateIndexRequest::try_from_updates( + index_uid.clone(), + &doc_mapping_v2, + &index_metadata_v1.index_config.indexing_settings, + &index_metadata_v1.index_config.ingest_settings, + &index_metadata_v1.index_config.search_settings, + &index_metadata_v1.index_config.retention_policy_opt, + )?; + metastore.update_index(update_request).await?; + + // create 3 more splits with v2 mapping + for i in 3..6i64 { + test_sandbox + .add_documents(std::iter::once(serde_json::json!({ + "label": format!("Doc{i}"), + "ts": base_time + i, + "ts2": base_time + i + 1000, + }))) + .await?; + } + + let all_splits: Vec = metastore + .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) + .await? + .collect_splits_metadata() + .await?; + assert_eq!(all_splits.len(), 6); + let v2_doc_mapping_uid = all_splits + .iter() + .find(|s| s.doc_mapping_uid != v1_doc_mapping_uid) + .unwrap() + .doc_mapping_uid; + assert_eq!( + all_splits + .iter() + .filter(|s| s.doc_mapping_uid == v1_doc_mapping_uid) + .count(), + 3 + ); + assert_eq!( + all_splits + .iter() + .filter(|s| s.doc_mapping_uid == v2_doc_mapping_uid) + .count(), + 3 + ); + + let index_metadata_v2 = metastore + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) + .await? + .deserialize_index_metadata()?; + let data_dir = TempDirectory::for_test(); + let semaphore = Arc::new(Semaphore::new(2)); + // Splits have the default 48h maturation period. 
Pass a `now` far enough in the future + // so all splits (both v1 and v2) are mature at `now - MATURITY_BUFFER (6h)`. + let now = OffsetDateTime::now_utc() + time::Duration::days(3); + // Override min_merge_group_size to 2 so that 3-split groups qualify. + let config = MatureMergeConfig { + min_merge_group_size: 2, + ..MatureMergeConfig::default() + }; + + let summary = merge_mature_single_index( + index_metadata_v2, + &metastore, + &test_sandbox.storage_resolver(), + semaphore, + data_dir.path(), + &config, + test_sandbox.node_id(), + now, + ) + .await?; + + // Both the v1 and v2 groups (3 splits each, different doc_mapping_uid) get merged. + assert_eq!(summary.num_merges_planned, 2); + assert_eq!(summary.outcome.num_published_merges, 2); + assert_eq!(summary.outcome.num_replaced_splits, 6); + + let published_after: Vec = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query( + &ListSplitsQuery::for_index(index_uid).with_split_state(SplitState::Published), + )?) + .await? + .collect_splits_metadata() + .await?; + assert_eq!(published_after.len(), 2); + + // The merged v1 split preserves the original doc_mapping_uid, time range, and has no + // secondary_time_range because the v1 schema had no secondary timestamp field. + let merged_v1 = published_after + .iter() + .find(|s| s.doc_mapping_uid == v1_doc_mapping_uid) + .expect("merged v1 split must exist"); + assert_eq!(merged_v1.num_docs, 3); + assert_eq!(merged_v1.maturity, SplitMaturity::Mature); + assert_eq!(merged_v1.time_range, Some(base_time..=base_time + 2)); + assert_eq!(merged_v1.secondary_time_range, None); + + // The merged v2 split has the updated doc_mapping_uid and a secondary_time_range + // derived from the ts2 field. 
+ let merged_v2 = published_after + .iter() + .find(|s| s.doc_mapping_uid == v2_doc_mapping_uid) + .expect("merged v2 split must exist"); + assert_eq!(merged_v2.num_docs, 3); + assert_eq!(merged_v2.maturity, SplitMaturity::Mature); + assert_eq!(merged_v2.time_range, Some(base_time + 3..=base_time + 5)); + assert_eq!( + merged_v2.secondary_time_range, + Some(base_time + 1003..=base_time + 1005) + ); + + test_sandbox.assert_quit().await; + Ok(()) + } +} diff --git a/quickwit/quickwit-indexing/src/mature_merge_plan.rs b/quickwit/quickwit-indexing/src/mature_merge_plan.rs new file mode 100644 index 00000000000..92a15a2fed9 --- /dev/null +++ b/quickwit/quickwit-indexing/src/mature_merge_plan.rs @@ -0,0 +1,459 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::time::Duration; + +use quickwit_config::IndexConfig; +use quickwit_metastore::SplitMetadata; +use time::OffsetDateTime; + +use crate::mature_merge::MatureMergeConfig; +use crate::merge_policy::MergeOperation; + +pub const SECS_PER_DAY: i64 = 60 * 60 * 24; + +/// Wait a couple of hours after the split got mature to be extra sure no merge +/// process is still running on it. +pub const MATURITY_BUFFER: Duration = Duration::from_hours(6); + +/// Computes the earliest UTC-day midnight (seconds since epoch) that is safe to merge, +/// given the index's retention policy and the current time. 
+fn retention_safety_cutoff_secs( + index_config: &IndexConfig, + now_secs: i64, + config: &MatureMergeConfig, +) -> Option { + let retention_policy = index_config.retention_policy_opt.as_ref()?; + let period = retention_policy.retention_period().ok()?; + let retention_safety_buffer = Duration::from_hours(config.retention_safety_buffer_days * 24); + if period <= retention_safety_buffer { + // No safe window: exclude every split by returning a cutoff in the far future. + return Some(i64::MAX); + } + let cutoff_raw = now_secs - period.as_secs() as i64 + retention_safety_buffer.as_secs() as i64; + // Round up to the next day boundary so we never partially exclude a day bucket. + Some((cutoff_raw / SECS_PER_DAY + 1) * SECS_PER_DAY) +} + +/// Converts a single day-bucket group of eligible splits into one or more balanced +/// [`MergeOperation`]s respecting constraints. +fn plan_operations_for_group( + mut group_splits: Vec, + config: &MatureMergeConfig, +) -> Vec { + if group_splits.len() < config.min_merge_group_size { + return Vec::new(); + } + // Sort ascending by end time so each sub-operation covers the most compact range. + group_splits.sort_by_key(|s| s.time_range.as_ref().map(|r| *r.end()).unwrap_or(0)); + + let n = group_splits.len(); + let total_docs: usize = group_splits.iter().map(|s| s.num_docs).sum(); + + // Minimum number of balanced operations needed to respect both per-operation limits. + let k = n + .div_ceil(config.max_merge_group_size) + .max(total_docs.div_ceil(config.split_target_num_docs)) + .max(1); + + // Divide into k balanced chunks (first chunks are ≥ last chunks by at most 1 split). 
+ let chunk_size = n.div_ceil(k); + group_splits + .chunks(chunk_size) + .filter(|chunk| chunk.len() >= config.min_merge_group_size) + .map(|chunk| MergeOperation::new_merge_operation(chunk.to_vec())) + .collect() +} + +/// Group by UTC day (floored to midnight in seconds) of the split's time range, +/// and returns one or more [`MergeOperation`]s per group that meets the size +/// threshold. +/// +/// Rules: +/// - Splits without a `time_range` are skipped (cannot assign a day). +/// - A split is only assigned to a bucket when *both* `time_range.start()` and `time_range.end()` +/// fall on the same UTC day (i.e., the split does not span midnight). +/// - Immature splits are excluded. +/// - Splits whose `time_range.end()` falls within the retention safety buffer are excluded. +/// +/// Important: This plan merges splits accross sources. It can be problematic if +/// the IndexingSettings are different (e.g different maturation period), which +/// was made possible on Kafka sources by specifying an override in the +/// client_params. +pub fn plan_merge_operations_for_index( + index_config: &IndexConfig, + splits: Vec, + now: OffsetDateTime, + config: &MatureMergeConfig, +) -> Vec { + let now_secs = now.unix_timestamp(); + + let earliest_cutoff_timestamp = retention_safety_cutoff_secs(index_config, now_secs, config); + + // Key: (partition_id, doc_mapping_uid_string, day_bucket_seconds, secondary_day_opt) + let mut groups: HashMap<(u64, String, i64, Option), Vec> = HashMap::new(); + + for split in splits { + // Only splits that have been mature for a while + if !split.is_mature(now - MATURITY_BUFFER) { + continue; + } + + // Enforce the max size for splits to be considered for merging. 
+ if split.num_docs > config.input_split_max_num_docs { + continue; + } + + // The timestamp field is required + let Some(ref time_range) = split.time_range else { + continue; + }; + + let start_day = time_range.start() / SECS_PER_DAY; + let end_day = time_range.end() / SECS_PER_DAY; + + // also group on secondary time range to make sure retention can still be applied + let secondary_day_opt = split + .secondary_time_range + .as_ref() + // In the nominal case, the secondary time (ingest time) is only + // slightly greater than the primary time (event time). Using + // `start()` here decreases the chances of further fragmenting the + // group at the day limits. + .map(|r| r.start() / SECS_PER_DAY); + + // Both endpoints must fall on the same UTC day. + if start_day != end_day { + continue; + } + + // Check that we are not too close to the retention cutoff. + if let Some(cutoff) = earliest_cutoff_timestamp + && *time_range.end() < cutoff + { + continue; + } + + let key = ( + split.partition_id, + split.doc_mapping_uid.to_string(), + start_day, + secondary_day_opt, + ); + groups.entry(key).or_default().push(split); + } + + let mut operations = Vec::new(); + for (_key, group_splits) in groups { + operations.extend(plan_operations_for_group(group_splits, config)); + } + operations +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use quickwit_config::{IndexConfig, RetentionPolicy}; + use quickwit_metastore::{SplitMaturity, SplitMetadata}; + use quickwit_proto::types::{DocMappingUid, IndexUid}; + use time::OffsetDateTime; + + use super::*; + + /// Builds a mature [`SplitMetadata`] for use in tests. + /// + /// - `day_bucket`: UTC day expressed as seconds-since-epoch (midnight). For example `day_bucket + /// = 0` means 1970-01-01, `day_bucket = SECS_PER_DAY` means 1970-01-02. 
+ fn mature_split_for_test( + split_id: &str, + index_uid: &IndexUid, + partition_id: u64, + doc_mapping_uid: DocMappingUid, + num_docs: usize, + day_bucket: i64, + ) -> SplitMetadata { + SplitMetadata { + split_id: split_id.to_string(), + index_uid: index_uid.clone(), + partition_id, + num_docs, + doc_mapping_uid, + // Both endpoints on the same UTC day — the split spans one hour. + time_range: Some(day_bucket..=(day_bucket + 3600)), + maturity: SplitMaturity::Mature, + ..Default::default() + } + } + + fn index_config_no_retention() -> IndexConfig { + IndexConfig::for_test("test-index", "s3://test-bucket/test-index") + } + + fn index_config_with_retention(period: &str) -> IndexConfig { + let mut config = index_config_no_retention(); + config.retention_policy_opt = Some(RetentionPolicy { + retention_period: period.to_string(), + evaluation_schedule: "daily".to_string(), + timestamp_type: Default::default(), + }); + config + } + + // UTC day 0 = 1970-01-01. Use a recent-ish day to avoid the retention buffer. + // We use day 20000 (approx 2024-10) so splits are "recent" relative to a "now" we control. + const RECENT_DAY: i64 = 20_000 * SECS_PER_DAY; + + fn now_well_after_recent_day() -> OffsetDateTime { + // 1 day after the splits' day — they are mature but not in a retention buffer. 
+ OffsetDateTime::from_unix_timestamp(RECENT_DAY + SECS_PER_DAY + 1).unwrap() + } + + #[test] + fn test_plan_basic() { + let index_uid = IndexUid::for_test("test-index", 0); + let doc_mapping_uid = DocMappingUid::random(); + let splits: Vec = (0..10) + .map(|i| { + mature_split_for_test( + &format!("split-{i}"), + &index_uid, + 1, + doc_mapping_uid, + 100, + RECENT_DAY, + ) + }) + .collect(); + + let operations = plan_merge_operations_for_index( + &index_config_no_retention(), + splits, + now_well_after_recent_day(), + &MatureMergeConfig::default(), + ); + + assert_eq!(operations.len(), 1); + assert_eq!(operations[0].splits.len(), 10); + } + + #[test] + fn test_plan_below_threshold() { + let index_uid = IndexUid::for_test("test-index", 0); + let doc_mapping_uid = DocMappingUid::random(); + // Only 4 splits — below the min_merge_group_size (5). + let splits: Vec = (0..4) + .map(|i| { + mature_split_for_test( + &format!("split-{i}"), + &index_uid, + 1, + doc_mapping_uid, + 100, + RECENT_DAY, + ) + }) + .collect(); + + let operations = plan_merge_operations_for_index( + &index_config_no_retention(), + splits, + now_well_after_recent_day(), + &MatureMergeConfig { + min_merge_group_size: 5, + ..Default::default() + }, + ); + + assert!(operations.is_empty(), "expected no operations for 4 splits"); + } + + #[test] + fn test_plan_immature_splits_excluded() { + let index_uid = IndexUid::for_test("test-index", 0); + let doc_mapping_uid = DocMappingUid::random(); + let now = now_well_after_recent_day(); + let now_ts = now.unix_timestamp(); + + // All splits are immature (maturation period far in the future). + let splits: Vec = (0..10) + .map(|i| { + let mut split = mature_split_for_test( + &format!("split-{i}"), + &index_uid, + 1, + doc_mapping_uid, + 100, + RECENT_DAY, + ); + split.maturity = SplitMaturity::Immature { + maturation_period: Duration::from_secs(999_999), + }; + // Make sure create_timestamp is recent so the split is truly immature. 
+ split.create_timestamp = now_ts; + split + }) + .collect(); + + let operations = plan_merge_operations_for_index( + &index_config_no_retention(), + splits, + now, + &MatureMergeConfig::default(), + ); + + assert!(operations.is_empty(), "immature splits should be excluded"); + } + + #[test] + fn test_plan_multiday_split_skipped() { + let index_uid = IndexUid::for_test("test-index", 0); + let doc_mapping_uid = DocMappingUid::random(); + + // 10 splits, but each one spans midnight (start on day N, end on day N+1). + let splits: Vec = (0..10) + .map(|i| { + let mut split = mature_split_for_test( + &format!("split-{i}"), + &index_uid, + 1, + doc_mapping_uid, + 100, + RECENT_DAY, + ); + // Extend time_range to cross midnight. + split.time_range = Some(RECENT_DAY - 3600..=RECENT_DAY + 3600); + split + }) + .collect(); + + let operations = plan_merge_operations_for_index( + &index_config_no_retention(), + splits, + now_well_after_recent_day(), + &MatureMergeConfig::default(), + ); + + assert!(operations.is_empty(), "multi-day splits should be skipped"); + } + + #[test] + fn test_plan_retention_safety_buffer() { + let index_uid = IndexUid::for_test("test-index", 0); + let doc_mapping_uid = DocMappingUid::random(); + + // Retention period = 90 days. Safety buffer = 30 days. + // Splits must have time_range.end >= now - 90d + 30d = now - 60d. + // We put splits at RECENT_DAY but set "now" to be RECENT_DAY + 91 days. + // Then: cutoff_raw = (RECENT_DAY + 91d) - 90d + 30d = RECENT_DAY + 31d + // cutoff = RECENT_DAY + 32d (rounded up to next day boundary) + // Because RECENT_DAY + 3600 < cutoff, splits should be excluded. 
+ let now_ts = RECENT_DAY + 91 * SECS_PER_DAY; + let now = OffsetDateTime::from_unix_timestamp(now_ts).unwrap(); + + let splits: Vec = (0..10) + .map(|i| { + mature_split_for_test( + &format!("split-{i}"), + &index_uid, + 1, + doc_mapping_uid, + 100, + RECENT_DAY, + ) + }) + .collect(); + + let config = index_config_with_retention("90 days"); + + let merge_config = MatureMergeConfig { + retention_safety_buffer_days: 30, + ..MatureMergeConfig::default() + }; + let operations = plan_merge_operations_for_index(&config, splits, now, &merge_config); + + assert!( + operations.is_empty(), + "splits within retention safety buffer should be excluded" + ); + } + + #[test] + fn test_plan_retention_period_too_short_skipped() { + let index_uid = IndexUid::for_test("test-index", 0); + let doc_mapping_uid = DocMappingUid::random(); + + let splits: Vec = (0..10) + .map(|i| { + mature_split_for_test( + &format!("split-{i}"), + &index_uid, + 1, + doc_mapping_uid, + 100, + RECENT_DAY, + ) + }) + .collect(); + + // Retention period of 3 days is <= retention_safety_buffer_days (default 5 days) + // so the index should be skipped entirely. + let config = index_config_with_retention("3 days"); + + let operations = plan_merge_operations_for_index( + &config, + splits, + now_well_after_recent_day(), + &MatureMergeConfig::default(), + ); + + assert!( + operations.is_empty(), + "index with short retention should produce no operations" + ); + } + + #[test] + fn test_plan_different_partitions_grouped_separately() { + let index_uid = IndexUid::for_test("test-index", 0); + let doc_mapping_uid = DocMappingUid::random(); + + // 6 splits per partition, two partitions => 2 separate merge operations. 
+ let splits: Vec = (0..12) + .map(|i| { + mature_split_for_test( + &format!("split-{i}"), + &index_uid, + i as u64 / 6, // partition 0 for i in 0..6, partition 1 for i in 6..12 + doc_mapping_uid, + 100, + RECENT_DAY, + ) + }) + .collect(); + + let mut operations = plan_merge_operations_for_index( + &index_config_no_retention(), + splits, + now_well_after_recent_day(), + &MatureMergeConfig::default(), + ); + operations.sort_by_key(|op| op.splits[0].partition_id); + + assert_eq!(operations.len(), 2); + assert!(operations[0].splits.iter().all(|s| s.partition_id == 0)); + assert!(operations[1].splits.iter().all(|s| s.partition_id == 1)); + } +}