diff --git a/Cargo.toml b/Cargo.toml index 69ff0c5..adc3180 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "solrcopy" -version = "0.9.0" +version = "0.9.1" edition = "2024" rust-version = "1.88" diff --git a/README.md b/README.md index ba3958d..eb2e690 100644 --- a/README.md +++ b/README.md @@ -42,14 +42,21 @@ Extracting and updating documents in huge cores can be challenging. It can take Bellow some tricks for dealing with such cores: -1. For reducing time, you can use the switches `--readers` and `--writers` for executing operations in parallel. +1. For reducing time, you can use the switches `--readers` and `--writers` for executing operations in parallel. Make sure the sum of these parameters fits the number of threads that your processor cores can handle. 2. When the number of docs to extract is huge, `backup` subcommand tend to slow as times goes and eventually fails. This is because Solr is suffers to get docs batches with hight skip/start parameters. For dealing with this: 1. Use the parameters `--iterate-by`n `between` and `--step`for iterating through parameter `--query` with variables `{begin}` and `{end}`. 2. This way it will iterate and restrict by hour, day, range the docs being downloaded. 3. For example: `--query 'date:[{begin} TO {end}]' --iterate-by day --between '2020-04-01' '2020-04-30T23:59:59'` + 4. Keep the number of iterations low by specifying the parameters `--step` and `--num-docs` to adequate values. As the process will run in two nested loops, the amount of time/effort will rise if the number of iterations increases. 3. Use the parameter `--param shards=shard1` for copying by each shard by name in `backkup`subcommand. 4. Use the parameter `--delay` for avoiding to overload the Solr server. +### Non-Stored Fields + + When you're backing up the index with `solrcopy`, this can result in a lossy process. In many cases, a core doesn't store fields that are only going to be used for searching - and not for displaying.
When backing up with `solrcopy`, you'll lose this information. You will not be able to restore the index so that it works the same as before and the data will be lost for good if the index disappears. + +In this case, if you need the value of the non-stored fields the proper way is to use the replication handler or the built-in backup feature in cloud mode. + ### Command Line Arguments #### solrcopy commands diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index d394c8b..995de47 100755 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -9,7 +9,9 @@ services: hostname: solrhost environment: - SOLR_IMAGE_TAG=${TAG:-latest} + - SOLR_CREATE_CORE=${CORE:-example} - SOLR_RUN_MODE=${MODE:-testing} + - JAVA_TOOL_OPTIONS="--sun-misc-unsafe-memory-access=allow" ports: - "8983:8983" volumes: @@ -31,6 +33,9 @@ services: retries: 1 interval: 60s configs: + - source: bashrc.sh + target: /home/solr/.bashrc + mode: 0644 - source: solr-setup-precreate.sh target: /opt/solr/docker/scripts/solr-setup-precreate mode: 0755 @@ -67,6 +72,26 @@ volumes: name: configuration configs: + bashrc.sh: + content: | + # ~/.bashrc: executed by bash(1) for non-login shells.
+ # see /usr/share/doc/bash/examples/startup-files (in the package bash-doc) + # for examples + + # If not running interactively, don't do anything + case $- in + *i*) ;; + *) return;; + esac + + # Some useful aliases + alias ls='ls --escape --dereference-command-line --human-readable --time-style=iso --no-group --color=auto' + alias ll='ls --escape --dereference-command-line --human-readable --time-style=iso --no-group --color=auto --almost-all -l' + alias la='ls --escape --dereference-command-line --human-readable --time-style=iso --no-group --color=auto --almost-all' + alias l='ls --escape --dereference-command-line --human-readable --time-style=iso --no-group --color=auto -CF' + # END OF SCRIPT # + + solr-setup-precreate.sh: content: | #!/bin/bash @@ -176,7 +201,7 @@ configs: case "$${SOLR_RUN_MODE:-testing}" in standalone) SOLR_RUN_COMMAND="solr-foreground --user-managed";; cloud) SOLR_RUN_COMMAND="solr-foreground";; - precreate) SOLR_RUN_COMMAND="solr-precreate";; + precreate) SOLR_RUN_COMMAND="solr-precreate $${SOLR_CREATE_CORE:-}";; demo) SOLR_RUN_COMMAND="solr-demo";; testing) SOLR_RUN_COMMAND="solr-runas-user-managed";; *) echo "ERROR: Invalid Solr run mode: $${SOLR_RUN_MODE:-}." diff --git a/src/args.rs b/src/args.rs index c0d1b0d..a663c57 100644 --- a/src/args.rs +++ b/src/args.rs @@ -276,7 +276,13 @@ pub(crate) struct ParallelArgs { /// Extra parameter for Solr Update Handler.
/// See: - #[arg(short, long, display_order = 60, value_name = "useParams=mypars")] + #[arg( + short, + long, + display_order = 60, + value_name = "p1=value1&p2=value2", + default_value = "echoParams=none" + )] pub params: Option, /// How many times should continue on source document errors diff --git a/src/backup.rs b/src/backup.rs index d028fe2..ba4e9e3 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -10,9 +10,8 @@ use super::{ steps::{Requests, Slices}, }; use crossbeam_channel::{Receiver, Sender, bounded}; -use crossbeam_utils::thread; -use log::{debug, error, info, trace}; -use std::sync::{Arc, atomic::AtomicBool}; +use log::{debug, error, info}; +use std::thread; use std::{path::PathBuf, time::Instant}; pub(crate) fn backup_main(params: &Backup) -> BoxedError { @@ -36,7 +35,6 @@ pub(crate) fn backup_main(params: &Backup) -> BoxedError { params.options.core ); - let ctrl_c = monitor_term_sinal(); let started = Instant::now(); thread::scope(|pool| { @@ -49,32 +47,45 @@ pub(crate) fn backup_main(params: &Backup) -> BoxedError { let (sender, receiver) = bounded::(writers_channel.to_usize()); let (progress, reporter) = bounded::(transfer.writers.to_usize()); - pool.spawn(|_| { - debug!("Started generator thread"); - start_querying_core(params, &schema, generator, &ctrl_c); - debug!("Finished generator thread"); - }); + let gen_handle = thread::Builder::new() + .name("Generator".to_string()) + .spawn_scoped(pool, || { + start_querying_core(params, &schema, generator); + }) + .unwrap(); - start_solr_readers(pool, params, sender, sequence, num_found); + let reader_handles = start_solr_readers(pool, params, sender, sequence); - start_archive_writers(pool, params, receiver, progress, num_retrieve); + let writer_handles = start_archive_writers(pool, params, receiver, progress, num_retrieve); - retrieved = forall_progress(reporter, num_retrieve, params.options.is_quiet()); - }) - .unwrap(); + let bar_handle = thread::Builder::new() + .name("Generator".to_string()) + 
.spawn_scoped(pool, || { + retrieved = forall_progress(reporter, num_retrieve, params.options.is_quiet()); + }) + .unwrap(); - // TODO: handle the finished thread with join + let mut handles = vec![]; + handles.push(gen_handle); + handles.extend(reader_handles); + handles.extend(writer_handles); + handles.push(bar_handle); + for handle in handles { + handle.join().unwrap(); + } + }); - finish_writing(ctrl_c, started, num_retrieve, retrieved, params.transfer.delay_after) + finish_progress(started, num_retrieve, retrieved, params.transfer.delay_after) } -fn start_solr_readers( - pool: &thread::Scope<'_>, params: &Backup, sender: Sender, sequence: Receiver, - num_found: u64, -) { +fn start_solr_readers<'scope>( + pool: &'scope thread::Scope<'scope, '_>, params: &Backup, sender: Sender, + sequence: Receiver, +) -> Vec> { let merr = params.transfer.max_errors; let delay = params.transfer.delay_per_request; - let must_match = if params.workaround_shards > 0 { num_found } else { 0 }; + + let mut handles = vec![]; for ir in 0..params.transfer.readers { let producer = sender.clone(); @@ -83,27 +94,31 @@ fn start_solr_readers( let reader = ir; let thread_name = format!("Reader_{}", reader); - pool.builder() + let handle = thread::Builder::new() .name(thread_name) - .spawn(move |_| { + .spawn_scoped(pool, move || { debug!("Started reader #{}", reader); - start_retrieving_docs(reader, iterator, producer, must_match, merr, delay); + start_retrieving_docs(reader, iterator, producer, merr, delay); debug!("Finished reader #{}", reader); }) .unwrap(); + handles.push(handle); } drop(sequence); drop(sender); + handles } -fn start_archive_writers( - pool: &thread::Scope<'_>, params: &Backup, receiver: Receiver, +fn start_archive_writers<'scope>( + pool: &'scope thread::Scope<'scope, '_>, params: &Backup, receiver: Receiver, progress: Sender, num_retrieve: u64, -) { +) -> Vec> { let output_pat = params.get_archive_pattern(num_retrieve); let max = params.archive_files; let comp = 
params.archive_compression; + let mut handles = vec![]; + for iw in 0..params.transfer.writers { let consumer = receiver.clone(); let updater = progress.clone(); @@ -113,39 +128,41 @@ fn start_archive_writers( let writer = iw; let thread_name = format!("Writer_{}", writer); - pool.builder() + let handle = thread::Builder::new() .name(thread_name) - .spawn(move |_| { + .spawn_scoped(pool, move || { debug!("Started writer #{}", writer); start_storing_docs(writer, dir, name, comp, max, consumer, updater); debug!("Finished writer #{}", writer); }) .unwrap(); + handles.push(handle); } drop(receiver); drop(progress); + handles } -fn finish_writing( - ctrl_c: Arc, started: Instant, num_retrieve: u64, retrieved: u64, delay_after: u64, +fn finish_progress( + started: Instant, num_retrieve: u64, retrieved: u64, delay_after: u64, ) -> BoxedError { + let ctrl_c = monitor_term_sinal(); if ctrl_c.aborted() { raise("# Execution aborted by user!") } else { - let (r, n, s) = (retrieved, num_retrieve, started.elapsed()); - info!("Downloaded {} of {} documents in {:?}.", r, n, s); if retrieved > 0 { - wait_with_progress(delay_after, "Exporting documents to archives..."); + wait_with_progress(delay_after, "Finished exporting documents to archives..."); } + let (r, n, s) = (retrieved, num_retrieve, started.elapsed()); + info!("Downloaded {} of {} documents in {:?}.", r, n, s); Ok(()) } } // region Channels -fn start_querying_core( - params: &Backup, schema: &SolrCore, generator: Sender, ctrl_c: &Arc, -) { +fn start_querying_core(params: &Backup, schema: &SolrCore, generator: Sender) { + let ctrl_c = monitor_term_sinal(); let core_fields = params.merge_core_fields(schema); let slices: Slices = params.get_slices(); @@ -157,11 +174,13 @@ fn start_querying_core( if num_found == 0 { continue; } + let expected = if params.workaround_shards > 0 { num_found } else { 0 }; let num_retrieve = params.get_docs_to_retrieve(num_found); let requests: Requests = params.get_requests_for_range( 
retrieved, num_retrieve, &core_fields, + expected, &range.begin, &range.end, ); @@ -177,8 +196,7 @@ fn start_querying_core( } fn start_retrieving_docs( - reader: u64, iterator: Receiver, producer: Sender, must_match: u64, - max_errors: u64, delay: u64, + reader: u64, iterator: Receiver, producer: Sender, max_errors: u64, delay: u64, ) { let ctrl_c = monitor_term_sinal(); let mut error_count = 0; @@ -190,7 +208,7 @@ fn start_retrieving_docs( break; } let failed = match received { - Ok(step) => retrieve_docs_from_solr(reader, &producer, step, &mut client, must_match), + Ok(step) => retrieve_docs_from_solr(reader, &producer, step, &mut client), Err(_) => true, }; if failed { @@ -210,10 +228,10 @@ fn start_retrieving_docs( } fn retrieve_docs_from_solr( - reader: u64, producer: &Sender, step: Step, client: &mut SolrClient, must_match: u64, + reader: u64, producer: &Sender, step: Step, client: &mut SolrClient, ) -> bool { let query_url = step.url.as_str(); - let response = fetch_docs_from_solr(reader, client, query_url, must_match); + let response = fetch_docs_from_solr(reader, client, query_url, step.expected); match response { Err(_) => true, Ok(content) => { @@ -234,7 +252,7 @@ fn retrieve_docs_from_solr( } fn fetch_docs_from_solr( - reader: u64, client: &mut SolrClient, query_url: &str, must_match: u64, + reader: u64, client: &mut SolrClient, query_url: &str, expected: u64, ) -> Result { let mut times = 0; loop { @@ -245,11 +263,14 @@ fn fetch_docs_from_solr( return Err(()); } Ok(content) => { - if must_match > 0 { + if expected > 0 { match SolrCore::parse_num_found(&content) { Ok(num_found) => { - trace!("#{} got num_found {} not {}", times, num_found, must_match); - if must_match != num_found.to_u64() && times < 13 { + if expected != num_found.to_u64() && times < 13 { + debug!( + "#{} got num_found {} but expected {}", + times, num_found, expected + ); times += 1; wait(times); continue; diff --git a/src/models.rs b/src/models.rs index cd96083..16dea31 100644 --- 
a/src/models.rs +++ b/src/models.rs @@ -11,6 +11,7 @@ pub(crate) struct Documents { #[derive(Debug)] pub(crate) struct Step { pub curr: u64, + pub expected: u64, pub url: String, } diff --git a/src/restore.rs b/src/restore.rs index 6e34d4d..30d0c04 100644 --- a/src/restore.rs +++ b/src/restore.rs @@ -8,12 +8,12 @@ use super::{ state::*, }; use crossbeam_channel::{Receiver, Sender, bounded}; -use crossbeam_utils::thread; use log::{debug, error, info, trace}; use std::sync::{ Arc, atomic::{AtomicBool, AtomicU64, Ordering}, }; +use std::thread; use std::{path::Path, path::PathBuf, time::Instant}; pub(crate) fn restore_main(params: &Restore) -> BoxedError { @@ -37,11 +37,12 @@ pub(crate) fn restore_main(params: &Restore) -> BoxedError { core ); - wait_with_progress( - params.transfer.delay_before, - &format!("Starting restore for core {}...", core), - ); - + if params.options.is_quiet() { + wait_with_progress( + params.transfer.delay_before, + &format!("Starting restore for core {}...", core), + ); + } pre_post_processing(params, false)?; let started = Instant::now(); @@ -76,30 +77,51 @@ fn unzip_archives_and_send(params: &Restore, found: &[PathBuf]) -> BoxedResult(writers_channel.to_usize()); let (progress, reporter) = bounded::(transfer.writers.to_usize()); - pool.spawn(move |_| { - debug!("Started generator thread"); - start_listing_archives(found, generator); - debug!("Finished generator thread"); - }); + let scan_handle = thread::Builder::new() + .name("Scanner".to_string()) + .spawn_scoped(pool, || { + start_listing_archives(found, generator); + }) + .unwrap(); - start_archive_readers(pool, transfer, sequence, sender); + let reader_handles = start_archive_readers(pool, transfer, sequence, sender); let update_hadler_url = params.get_update_url(); debug!("Solr Update Handler: {}", update_hadler_url); - start_solr_writers(pool, transfer, receiver, progress, update_hadler_url); + let writer_handles = + start_archive_writers(pool, transfer, receiver, progress, 
update_hadler_url); + + let bar_handle = thread::Builder::new() + .name("Generator".to_string()) + .spawn_scoped(pool, || { + updated = foreach_progress(reporter, doc_count, params.options.is_quiet()); + }) + .unwrap(); + + let mut handles = vec![]; + handles.push(scan_handle); + handles.extend(reader_handles); + handles.extend(writer_handles); + handles.push(bar_handle); - updated = foreach_progress(reporter, doc_count, params.options.is_quiet()); - }) - .unwrap(); + for handle in handles { + handle.join().unwrap(); + } + }); + if updated > 0 && !params.no_final_commit { + crate::commit::commit_main(¶ms.options.to_command())?; + } - finish_sending(params, updated) + finish_progress(updated) } fn start_archive_readers<'scope>( - pool: &thread::Scope<'scope>, transfer: &ParallelArgs, sequence: Receiver<&'scope Path>, - sender: Sender, -) { + pool: &'scope thread::Scope<'scope, '_>, transfer: &ParallelArgs, + sequence: Receiver<&'scope Path>, sender: Sender, +) -> Vec> { + let mut handles = vec![]; + for ir in 0..transfer.readers { let producer = sender.clone(); let iterator = sequence.clone(); @@ -107,27 +129,31 @@ fn start_archive_readers<'scope>( let reader = ir; let thread_name = format!("Reader_{}", reader); - pool.builder() + let handle = thread::Builder::new() .name(thread_name) - .spawn(move |_| { + .spawn_scoped(pool, move || { debug!("Started reader #{}", reader); start_reading_archive(reader, iterator, producer); debug!("Finished reader #{}", reader); }) .unwrap(); + handles.push(handle); } drop(sequence); drop(sender); + handles } -fn start_solr_writers( - pool: &thread::Scope<'_>, transfer: &ParallelArgs, receiver: Receiver, +fn start_archive_writers<'scope>( + pool: &'scope thread::Scope<'scope, '_>, transfer: &ParallelArgs, receiver: Receiver, progress: Sender, update_hadler_url: String, -) { +) -> Vec> { let update_errors = Arc::new(AtomicU64::new(0)); let merr = transfer.max_errors; let delay = transfer.delay_per_request; + let mut handles = vec![]; 
+ for iw in 0..transfer.writers { let consumer = receiver.clone(); let updater = progress.clone(); @@ -136,31 +162,26 @@ fn start_solr_writers( let writer = iw; let thread_name = format!("Writer_{}", writer); - pool.builder() + + let handle = thread::Builder::new() .name(thread_name) - .spawn(move |_| { + .spawn_scoped(pool, move || { debug!("Started writer #{}", writer); start_indexing_docs(writer, &url, consumer, updater, &arcerr, merr, delay); debug!("Finished writer #{}", writer); }) .unwrap(); + handles.push(handle); } drop(receiver); drop(progress); + handles } -fn finish_sending(params: &Restore, updated: u64) -> BoxedResult { +fn finish_progress(updated: u64) -> BoxedResult { let ctrl_c = monitor_term_sinal(); - if ctrl_c.aborted() { - raise("# Execution aborted by user!") - } else { - if updated > 0 && !params.no_final_commit { - // let params2 = Command { options: params.options }; - crate::commit::commit_main(¶ms.options.to_command())?; - } - Ok(updated) - } + if ctrl_c.aborted() { raise("# Execution aborted by user!") } else { Ok(updated) } } fn estimate_batch_count(found: &[PathBuf]) -> BoxedResult { diff --git a/src/steps.rs b/src/steps.rs index e957070..014d114 100644 --- a/src/steps.rs +++ b/src/steps.rs @@ -32,6 +32,7 @@ pub(crate) struct Requests { pub curr: u64, pub limit: u64, pub num_docs: u64, + pub expected: u64, pub url: String, } @@ -208,7 +209,7 @@ impl Iterator for Requests { let remaining = self.limit - self.curr; let rows = self.num_docs.min(remaining); let query = format!("{}&start={}&rows={}", self.url, self.curr, rows); - let res = Step { url: query, curr: self.prev + self.curr }; + let res = Step { url: query, curr: self.prev + self.curr, expected: self.expected }; self.curr += self.num_docs; Some(res) } else { @@ -271,7 +272,8 @@ impl Backup { } pub(crate) fn get_requests_for_range( - &self, retrieved: u64, num_retrieve: u64, core_fields: &[String], begin: &str, end: &str, + &self, retrieved: u64, num_retrieve: u64, core_fields: 
&[String], expected: u64, + begin: &str, end: &str, ) -> Requests { let selected = self.get_query_fields(core_fields); let query = self.get_query_url(&selected, begin, end); @@ -280,6 +282,7 @@ impl Backup { curr: self.skip, limit: num_retrieve, num_docs: self.num_docs, + expected, url: query, } } @@ -395,7 +398,7 @@ mod tests { let mut i = 0; for step in - gets.get_requests_for_range(0, num_retrieve, &core_info.fields, EMPTY_STR, EMPTY_STR) + gets.get_requests_for_range(0, num_retrieve, &core_info.fields, 0, EMPTY_STR, EMPTY_STR) { let url = step.url; assert_eq!(url.is_empty(), false); diff --git a/src/testsolr.rs b/src/testsolr.rs index a4501ae..743fbff 100644 --- a/src/testsolr.rs +++ b/src/testsolr.rs @@ -8,6 +8,7 @@ mod testsolr { use chrono::offset::Local; use clap::Parser; use glob::glob; + use log::error; use std::fs::remove_file; fn test_command_line_args_for(args: &[&str]) { @@ -46,7 +47,9 @@ mod testsolr { let listed = glob("target/demo*").unwrap(); let found = listed.filter_map(Result::ok).collect::>(); for file in found { - remove_file(file).unwrap(); + if let Err(e) = remove_file(&file) { + error!("Failed to remove file {}: {}", file.display(), e); + } } }