quickwit/quickwit-cli/src/tool.rs (214 additions, 0 deletions)
@@ -39,6 +39,7 @@ use quickwit_config::{
use quickwit_index_management::{IndexService, clear_cache_directory};
use quickwit_indexing::IndexingPipeline;
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService};
use quickwit_indexing::mature_merge::{MatureMergeConfig, merge_mature_all_indexes};
use quickwit_indexing::models::{
    DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};
@@ -163,6 +164,56 @@ pub fn build_tool_command() -> Command {
                    .required(true),
                ])
        )
        .subcommand(
            Command::new("merge-mature")
                .display_order(10)
                .about("Merges mature splits across all indexes and nodes.")
                .long_about(
                    "Scans indexes for merge opportunities in mature Published splits. Considers \
                     opportunities across all origin nodes and sources. Runs once and exits."
                )
                .args(&[
                    arg!(--"dry-run"
                        "Prints the planned merge operations without executing them.")
                        .required(false),
                    arg!(--"max-concurrent-merges" <MAX_CONCURRENT_MERGES>
                        "Maximum number of merges to run concurrently (default: 10).")
                        .display_order(1)
                        .required(false),
                    arg!(--"retention-safety-buffer-days" <RETENTION_SAFETY_BUFFER_DAYS>
                        "Splits within this many days of the retention cutoff are excluded (default: 5).")
                        .display_order(2)
                        .required(false),
                    arg!(--"min-merge-group-size" <MIN_MERGE_GROUP_SIZE>
                        "Minimum number of splits in a group to trigger a merge (default: 5).")
                        .display_order(3)
                        .required(false),
                    arg!(--"input-split-max-num-docs" <INPUT_SPLIT_MAX_NUM_DOCS>
                        "Maximum number of docs in a split for it to be eligible (default: 10_000).")
                        .display_order(4)
                        .required(false),
                    arg!(--"max-merge-group-size" <MAX_MERGE_GROUP_SIZE>
                        "Maximum number of splits per merge operation (default: 100).")
                        .display_order(5)
                        .required(false),
                    arg!(--"split-target-num-docs" <SPLIT_TARGET_NUM_DOCS>
                        "Maximum total docs per merge operation (default: 5_000_000).")
                        .display_order(6)
                        .required(false),
                    arg!(--"index-parallelism" <INDEX_PARALLELISM>
                        "Number of indexes processed concurrently (default: 50).")
                        .display_order(7)
                        .required(false),
                    arg!(--"index-id-patterns" <INDEX_ID_PATTERNS>
                        "Comma-separated list of index ID patterns to include (default: '*').")
                        .display_order(8)
                        .required(false),
                    arg!(--"metrics"
                        "Expose Prometheus metrics on the REST listen address during the run.")
                        .display_order(9)
                        .required(false),
                ])
        )
        .arg_required_else_help(true)
}

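A quick orientation note: the equivalent shell invocation is `quickwit tool merge-mature --config quickwit.yaml --dry-run`. Below is a minimal parsing sketch in the style of quickwit-cli's existing CLI tests; the `build_cli()` helper and the `CliCommand::Tool` wrapper are assumed from the surrounding crate, and the sketch is illustrative, not part of this diff.

// Sketch only (not part of this diff): parse a merge-mature invocation the way
// the existing quickwit-cli tests exercise other tool subcommands. `build_cli`
// and `CliCommand` are assumed from quickwit-cli's cli module.
#[test]
fn test_parse_merge_mature_args_sketch() {
    let app = build_cli().no_binary_name(true);
    let matches = app
        .try_get_matches_from([
            "tool",
            "merge-mature",
            "--config",
            "quickwit.yaml",
            "--dry-run",
            "--max-concurrent-merges",
            "4",
        ])
        .unwrap();
    let command = CliCommand::parse_cli_args(matches).unwrap();
    match command {
        // Overridden flags land in MatureMergeConfig; the rest keep defaults.
        CliCommand::Tool(ToolCliCommand::MatureMerge(args)) => {
            assert!(args.merge_config.dry_run);
            assert_eq!(args.merge_config.max_concurrent_merges, 4);
        }
        _ => panic!("expected a merge-mature tool command"),
    }
}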
@@ -207,6 +258,13 @@ pub struct MergeArgs {
    pub source_id: SourceId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct MatureMergeArgs {
    pub config_uri: Uri,
    pub merge_config: MatureMergeConfig,
    pub serve_metrics: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ExtractSplitArgs {
    pub config_uri: Uri,
@@ -221,6 +279,7 @@ pub enum ToolCliCommand {
    LocalIngest(LocalIngestDocsArgs),
    LocalSearch(LocalSearchArgs),
    Merge(MergeArgs),
    MatureMerge(MatureMergeArgs),
    ExtractSplit(ExtractSplitArgs),
}

@@ -234,6 +293,7 @@ impl ToolCliCommand {
            "local-ingest" => Self::parse_local_ingest_args(submatches),
            "local-search" => Self::parse_local_search_args(submatches),
            "merge" => Self::parse_merge_args(submatches),
            "merge-mature" => Self::parse_mature_merge_args(submatches),
            "extract-split" => Self::parse_extract_split_args(submatches),
            _ => bail!("unknown tool subcommand `{subcommand}`"),
        }
@@ -385,12 +445,84 @@ impl ToolCliCommand {
        }))
    }

    fn parse_mature_merge_args(mut matches: ArgMatches) -> anyhow::Result<Self> {
        let config_uri = matches
            .remove_one::<String>("config")
            .map(|uri_str| Uri::from_str(&uri_str))
            .expect("`config` should be a required arg.")?;
        let defaults = MatureMergeConfig::default();
        let dry_run = matches.get_flag("dry-run");
        let max_concurrent_merges = matches
            .remove_one::<String>("max-concurrent-merges")
            .map(|s| s.parse::<usize>())
            .transpose()?
            .unwrap_or(defaults.max_concurrent_merges);
        let retention_safety_buffer_days = matches
            .remove_one::<String>("retention-safety-buffer-days")
            .map(|s| s.parse::<u64>())
            .transpose()?
            .unwrap_or(defaults.retention_safety_buffer_days);
        let min_merge_group_size = matches
            .remove_one::<String>("min-merge-group-size")
            .map(|s| s.parse::<usize>())
            .transpose()?
            .unwrap_or(defaults.min_merge_group_size);
        let input_split_max_num_docs = matches
            .remove_one::<String>("input-split-max-num-docs")
            .map(|s| s.parse::<usize>())
            .transpose()?
            .unwrap_or(defaults.input_split_max_num_docs);
        let max_merge_group_size = matches
            .remove_one::<String>("max-merge-group-size")
            .map(|s| s.parse::<usize>())
            .transpose()?
            .unwrap_or(defaults.max_merge_group_size);
        let split_target_num_docs = matches
            .remove_one::<String>("split-target-num-docs")
            .map(|s| s.parse::<usize>())
            .transpose()?
            .unwrap_or(defaults.split_target_num_docs);
        let index_parallelism = matches
            .remove_one::<String>("index-parallelism")
            .map(|s| s.parse::<usize>())
            .transpose()?
            .unwrap_or(defaults.index_parallelism);
        let index_id_patterns = matches
            .remove_one::<String>("index-id-patterns")
            .map(|s| s.split(',').map(|p| p.trim().to_string()).collect())
            .unwrap_or(defaults.index_id_patterns);
        let serve_metrics = matches.get_flag("metrics");

        if max_concurrent_merges == 0 {
            bail!("`max-concurrent-merges` must be greater than or equal to 1.");
        }
        if index_parallelism == 0 {
            bail!("`index-parallelism` must be greater than or equal to 1.");
        }
        Ok(Self::MatureMerge(MatureMergeArgs {
            config_uri,
            serve_metrics,
            merge_config: MatureMergeConfig {
                dry_run,
                max_concurrent_merges,
                retention_safety_buffer_days,
                min_merge_group_size,
                input_split_max_num_docs,
                max_merge_group_size,
                split_target_num_docs,
                index_parallelism,
                index_id_patterns,
            },
        }))
    }

    pub async fn execute(self) -> anyhow::Result<()> {
        match self {
            Self::GarbageCollect(args) => garbage_collect_index_cli(args).await,
            Self::LocalIngest(args) => local_ingest_docs_cli(args).await,
            Self::LocalSearch(args) => local_search_cli(args).await,
            Self::Merge(args) => merge_cli(args).await,
            Self::MatureMerge(args) => merge_mature_cli(args).await,
            Self::ExtractSplit(args) => extract_split_cli(args).await,
        }
    }
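Each optional flag above falls back to the matching field of `MatureMergeConfig::default()`, so the defaults quoted in the help strings live in one place. A hypothetical reconstruction of that `Default` impl, inferred only from the help text (the real definition is in `quickwit_indexing::mature_merge` and may differ):

// Hypothetical sketch inferred from the CLI help strings above; not the actual
// definition, which lives in quickwit_indexing::mature_merge.
impl Default for MatureMergeConfig {
    fn default() -> Self {
        MatureMergeConfig {
            dry_run: false,
            max_concurrent_merges: 10,
            retention_safety_buffer_days: 5,
            min_merge_group_size: 5,
            input_split_max_num_docs: 10_000,
            max_merge_group_size: 100,
            split_target_num_docs: 5_000_000,
            index_parallelism: 50,
            index_id_patterns: vec!["*".to_string()],
        }
    }
}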
@@ -651,6 +783,43 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
    Ok(())
}

pub async fn merge_mature_cli(args: MatureMergeArgs) -> anyhow::Result<()> {
    debug!(args=?args, "merge-mature");
    info!(merge_config=?args.merge_config, "merge-mature configuration");
    println!("❯ Scanning all indexes for mature merge opportunities...");
    let config = load_node_config(&args.config_uri).await?;
    let (storage_resolver, metastore_resolver) =
        get_resolvers(&config.storage_configs, &config.metastore_configs);
    let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;

    let runtimes_config = RuntimesConfig::default();
    start_actor_runtimes(
        runtimes_config,
        &HashSet::from_iter([QuickwitService::Indexer]),
    )?;

    if args.serve_metrics {
        let metrics_addr = config.rest_config.listen_addr;
        tokio::spawn(serve_metrics(metrics_addr));
    }

    merge_mature_all_indexes(
        metastore,
        storage_resolver,
        &config.data_dir_path,
        args.merge_config.clone(),
        config.node_id,
    )
    .await?;

    if !args.merge_config.dry_run {
        info!("mature splits successfully merged, waiting for explicit termination signal");
        tokio::time::sleep(Duration::MAX).await;
    }

    Ok(())
}

pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow::Result<()> {
    debug!(args=?args, "garbage-collect-index");
    println!("❯ Garbage collecting index...");
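Note the tail of `merge_mature_cli` above: after a real (non-dry-run) pass, the process parks on `tokio::time::sleep(Duration::MAX)` instead of exiting, so it stays alive, together with the optional metrics endpoint, until the operator terminates it. A sketch of a more explicit equivalent, assuming tokio is built with the `signal` feature:

// Sketch only (not in this diff): wait explicitly for Ctrl-C / SIGINT instead
// of sleeping for Duration::MAX. Assumes tokio's "signal" feature is enabled.
tokio::signal::ctrl_c()
    .await
    .expect("failed to listen for the Ctrl-C signal");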
@@ -955,3 +1124,48 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {

    Ok(cluster)
}

/// A shortcut to expose the metrics without loading the whole quickwit_serve
/// machinery.
async fn serve_metrics(addr: std::net::SocketAddr) {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    let listener = match tokio::net::TcpListener::bind(addr).await {
        Ok(l) => l,
        Err(err) => {
            tracing::warn!("metrics server could not bind to {addr}: {err}");
            return;
        }
    };
    tracing::info!("metrics server listening on http://{addr}/metrics");
    loop {
        let Ok((mut stream, _peer)) = listener.accept().await else {
            continue;
        };
        tokio::spawn(async move {
            let mut buf = [0u8; 4096];
            let n = match stream.read(&mut buf).await {
                Ok(n) => n,
                Err(_) => return,
            };
            let request = std::str::from_utf8(&buf[..n]).unwrap_or("");
            let is_metrics = request.starts_with("GET /metrics");
            let (status, body) = if is_metrics {
                match quickwit_common::metrics::metrics_text_payload() {
                    Ok(payload) => ("200 OK", payload),
                    Err(e) => {
                        tracing::error!("failed to encode prometheus metrics: {e}");
                        ("500 Internal Server Error", String::new())
                    }
                }
            } else {
                ("404 Not Found", String::new())
            };
            let response = format!(
                "HTTP/1.1 {status}\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: \
                 {}\r\nConnection: close\r\n\r\n{body}",
                body.len()
            );
            let _ = stream.write_all(response.as_bytes()).await;
        });
    }
}
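Because the handler reads a single request, writes one response, and closes the connection (`Connection: close`), it is just enough HTTP for a Prometheus scrape or a manual check such as `curl http://<rest-listen-addr>/metrics`; any request other than `GET /metrics` is answered with a 404.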