diff --git a/Cargo.lock b/Cargo.lock index 5532af6d692..fef7e53301e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1426,6 +1426,7 @@ dependencies = [ "gix", "gix-archive", "gix-error", + "gix-features", "gix-fsck", "gix-pack", "gix-status", diff --git a/Cargo.toml b/Cargo.toml index 366355a166f..343e6f0c12a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,6 +142,10 @@ gitoxide-core-tools-archive = ["gitoxide-core/archive"] ## A sub-command to clean the worktree from untracked and ignored files. gitoxide-core-tools-clean = ["gitoxide-core/clean"] +## A sub-command to create a pack with delta compression. +## NOTE: This is an experimental feature and may change in the future. +gitoxide-core-tools-delta-create = ["gitoxide-core/experimental-delta-create"] + #! ### Building Blocks for mutually exclusive networking #! Blocking and async features are mutually exclusive and cause a compile-time error. This also means that `cargo … --all-features` will fail. #! Within each section, features can be combined. diff --git a/gitoxide-core/Cargo.toml b/gitoxide-core/Cargo.toml index b08fb0006c2..d86367a2432 100644 --- a/gitoxide-core/Cargo.toml +++ b/gitoxide-core/Cargo.toml @@ -47,6 +47,9 @@ async-client = ["gix/async-network-client-async-std", "gix-transport-configurati ## Data structures implement `serde::Serialize` and `serde::Deserialize`. serde = ["gix/serde", "dep:serde_json", "dep:serde", "bytesize/serde"] +## The experimental ability to create a pack with delta compression. +experimental-delta-create = ["gix-features", "dep:parking_lot"] + [dependencies] # deselect everything else (like "performance") as this should be controllable by the parent application. 
gix = { version = "^0.83.0", path = "../gix", default-features = false, features = ["merge", "blob-diff", "blame", "revision", "mailmap", "excludes", "attributes", "worktree-mutation", "credentials", "interrupt", "status", "dirwalk"] } @@ -94,6 +97,9 @@ tracing = { version = "0.1.43", optional = true } layout-rs = "0.1.3" open = "5.0.0" +# for experimental delta create +gix-features = { version = "^0.48.0", path = "../gix-features", default-features = false, features = ["crc32", "progress", "zlib"], optional = true } + document-features = { version = "0.2.0", optional = true } [dev-dependencies] diff --git a/gitoxide-core/src/pack/delta_create.rs b/gitoxide-core/src/pack/delta_create.rs new file mode 100644 index 00000000000..c2bc08adb8a --- /dev/null +++ b/gitoxide-core/src/pack/delta_create.rs @@ -0,0 +1,859 @@ +use std::{collections::HashMap, io, path::Path, time::Instant}; + +use anyhow::anyhow; +use gix::{ + Count, NestedProgress, Progress, hash, hash::ObjectId, interrupt, odb::pack, parallel::InOrderIter, + prelude::Finalize, progress, +}; + +use crate::OutputFormat; + +/// A general purpose context for many operations provided here +/// +/// NOTE: copied from create.rs, but removed `expansion` and `thin` fields +pub struct Context { + /// If `Some(threads)`, use this amount of `threads` to accelerate the counting phase at the cost of losing + /// determinism as the order of objects during expansion changes with multiple threads unless no expansion is performed. + /// In the latter case, this flag has no effect. + /// If `None`, counting will only use one thread and thus yield the same sequence of objects in any case. + pub nondeterministic_thread_count: Option, + /// If set, don't use more than this amount of threads. + /// Otherwise, usually use as many threads as there are logical cores. + /// A value of 0 is interpreted as no-limit + pub thread_limit: Option, + /// If set, statistics about the operation will be written to the output stream. 
+ pub statistics: Option, + /// The size of the cache storing fully decoded delta objects. This can greatly speed up pack decoding by reducing the length of delta + /// chains. Note that caches also incur a cost and poorly used caches may reduce overall performance. + /// This is a total, shared among all threads if `thread_limit` permits. + /// + /// If 0, the cache is disabled entirely. + pub pack_cache_size_in_bytes: usize, + /// The size of the cache to store full objects by their ID, bypassing any lookup in the object database. + /// Note that caches also incur a cost and poorly used caches may reduce overall performance. + /// This is a total, shared among all threads if `thread_limit` permits. + /// + /// If 0, the cache is disabled entirely. + pub object_cache_size_in_bytes: usize, + /// The output stream for use of additional information + pub out: W, +} + +/// NOTE: copied from create.rs, but: +/// - rewrote the `input` transform +/// - uses the rewritten `iter_from_counts` +pub fn delta_create( + repository_path: impl AsRef, + input: impl io::BufRead + Send + 'static, + output_directory: Option>, + mut progress: P, + Context { + nondeterministic_thread_count, + thread_limit, + statistics, + pack_cache_size_in_bytes, + object_cache_size_in_bytes, + mut out, + .. 
+ }: Context, +) -> anyhow::Result<()> +where + W: std::io::Write, + P: NestedProgress, + P::SubProgress: 'static, +{ + let repo = gix::discover(repository_path)?.into_sync(); + progress.init(Some(2), progress::steps()); + let make_cancellation_err = || anyhow!("Cancelled by user"); + let mut topo = HashMap::new(); + let parsed_input: Vec), Box>> = { + let mut progress = progress.add_child("iterating"); + progress.init(None, progress::count("objects")); + input + .lines() + .map(|line| { + line.map_err(|err| Box::new(err) as Box<_>).and_then(|line| { + let hex2oid = |hex: &str| { + ObjectId::from_hex(hex.as_bytes()) + .map_err(Into::>::into) + }; + if let Some((target, source)) = line.split_once(' ') { + Ok((hex2oid(target)?, Some(hex2oid(source)?))) + } else { + Ok((hex2oid(&line)?, None)) + } + }) + }) + .inspect(move |_| progress.inc()) + .collect() + }; + for res in &parsed_input { + if let Ok((target, Some(source))) = res { + topo.insert(target.clone(), source.clone()); + } + } + let mut handle = repo.objects.into_shared_arc().to_cache_arc(); + let mut input: Box>> + Send> = + Box::new(parsed_input.into_iter().map(|res| res.map(|(target, _)| target))); + + let mut stats = Statistics::default(); + let chunk_size = 1000; // What's a good value for this? 
+ let counts = { + let mut progress = progress.add_child("counting"); + progress.init(None, progress::count("objects")); + let may_use_multiple_threads = nondeterministic_thread_count.is_some(); + let thread_limit = if may_use_multiple_threads { + nondeterministic_thread_count.or(thread_limit) + } else { + Some(1) + }; + if nondeterministic_thread_count.is_some() && !may_use_multiple_threads { + progress.fail("Cannot use multi-threaded counting in tree-diff object expansion mode as it may yield way too many objects.".into()); + } + let (_, _, thread_count) = gix::parallel::optimize_chunk_size_and_thread_limit(50, None, thread_limit, None); + let progress = progress::ThroughputOnDrop::new(progress); + + { + // Maybe should disable cache in some cases + handle.set_pack_cache(move || { + Box::new(pack::cache::lru::MemoryCappedHashmap::new( + pack_cache_size_in_bytes / thread_count, + )) + }); + handle.set_object_cache(move || { + Box::new(pack::cache::object::MemoryCappedHashmap::new( + object_cache_size_in_bytes / thread_count, + )) + }); + } + handle.prevent_pack_unload(); + handle.ignore_replacements = true; + let input_object_expansion = pack::data::output::count::objects::ObjectExpansion::AsIs; + let (mut counts, count_stats) = if may_use_multiple_threads { + pack::data::output::count::objects( + handle.clone(), + input, + &progress, + &interrupt::IS_INTERRUPTED, + pack::data::output::count::objects::Options { + thread_limit, + chunk_size, + input_object_expansion, + }, + )? + } else { + pack::data::output::count::objects_unthreaded( + &handle, + &mut input, + &progress, + &interrupt::IS_INTERRUPTED, + input_object_expansion, + )? 
+ }; + stats.counts = count_stats; + counts.shrink_to_fit(); + counts + }; + + progress.inc(); + let num_objects = counts.len(); + let mut in_order_entries = { + let progress = progress.add_child("creating entries"); + InOrderIter::from(iter_from_counts::iter_from_counts( + counts, + topo, + handle, + Box::new(progress), + iter_from_counts::Options { + thread_limit, + chunk_size, + version: Default::default(), + }, + )) + }; + + let mut entries_progress = progress.add_child("consuming"); + entries_progress.init(Some(num_objects), progress::count("entries")); + let mut write_progress = progress.add_child("writing"); + write_progress.init(None, progress::bytes()); + let start = Instant::now(); + + let mut named_tempfile_store: Option = None; + let mut sink_store: std::io::Sink; + let (mut pack_file, output_directory): (&mut dyn std::io::Write, Option<_>) = match output_directory { + Some(dir) => { + named_tempfile_store = Some(tempfile::NamedTempFile::new_in(dir.as_ref())?); + (named_tempfile_store.as_mut().expect("packfile just set"), Some(dir)) + } + None => { + sink_store = std::io::sink(); + (&mut sink_store, None) + } + }; + let mut interruptible_output_iter = interrupt::Iter::new( + pack::data::output::bytes::FromEntriesIter::new( + in_order_entries.by_ref().inspect(|e| { + if let Ok(entries) = e { + entries_progress.inc_by(entries.len()); + } + }), + &mut pack_file, + num_objects as u32, + pack::data::Version::default(), + hash::Kind::default(), + ), + make_cancellation_err, + ); + for io_res in interruptible_output_iter.by_ref() { + let written = io_res??; + write_progress.inc_by(written as usize); + } + + let hash = interruptible_output_iter + .into_inner() + .digest() + .expect("iteration is done"); + let pack_name = format!("{hash}.pack"); + if let (Some(pack_file), Some(dir)) = (named_tempfile_store.take(), output_directory) { + pack_file.persist(dir.as_ref().join(pack_name))?; + } else { + writeln!(out, "{pack_name}")?; + } + stats.entries = 
in_order_entries.inner.finalize()?; + + write_progress.show_throughput(start); + entries_progress.show_throughput(start); + + if let Some(format) = statistics { + print(stats, format, out)?; + } + progress.inc(); + Ok(()) +} + +/// NOTE: copied from create.rs +fn print(stats: Statistics, format: OutputFormat, out: impl std::io::Write) -> anyhow::Result<()> { + match format { + OutputFormat::Human => human_output(stats, out).map_err(Into::into), + #[cfg(feature = "serde")] + OutputFormat::Json => serde_json::to_writer_pretty(out, &stats).map_err(Into::into), + } +} + +/// NOTE: copied from create.rs +fn human_output( + Statistics { + counts: + pack::data::output::count::objects::Outcome { + input_objects, + expanded_objects, + decoded_objects, + total_objects, + }, + entries: + pack::data::output::entry::iter_from_counts::Outcome { + decoded_and_recompressed_objects, + missing_objects, + objects_copied_from_pack, + ref_delta_objects, + }, + }: Statistics, + mut out: impl std::io::Write, +) -> std::io::Result<()> { + let width = 30; + writeln!(out, "counting phase")?; + #[rustfmt::skip] + writeln!( + out, + "\t{:, + /// The amount of objects per chunk or unit of work to be sent to threads for processing + /// TODO: could this become the window size? 
+ pub chunk_size: usize, + /// The pack data version to produce for each entry + pub version: pack::data::Version, + } + + // NOTE: copied from gix-pack/src/data/output/entry/iter_from_counts.rs, + // but rewrote parameters in parallel::reduce::Stepwise::new + pub fn iter_from_counts( + mut counts: Vec, + topo: HashMap, + db: Find, + mut progress: Box, + Options { + thread_limit, + chunk_size, + version, + }: Options, + ) -> impl Iterator), Error>> + + parallel::reduce::Finalize> + where + Find: pack::Find + Send + Clone + 'static, + { + assert!( + matches!(version, pack::data::Version::V2), + "currently we can only write version 2" + ); + let (chunk_size, thread_limit, _) = + parallel::optimize_chunk_size_and_thread_limit(chunk_size, Some(counts.len()), thread_limit, None); + { + let progress = Arc::new(parking_lot::Mutex::new( + progress.add_child_with_id("resolving".into(), ProgressId::ResolveCounts.into()), + )); + progress.lock().init(None, progress::count("counts")); + let enough_counts_present = counts.len() > 4_000; + let start = std::time::Instant::now(); + parallel::in_parallel_if( + || enough_counts_present, + counts.chunks_mut(chunk_size), + thread_limit, + |_n| Vec::::new(), + { + let progress = Arc::clone(&progress); + let db = db.clone(); + move |chunk, buf| { + let chunk_size = chunk.len(); + for count in chunk { + use pack::data::output::count::PackLocation::*; + match count.entry_pack_location { + LookedUp(_) => continue, + NotLookedUp => count.entry_pack_location = LookedUp(db.location_by_oid(&count.id, buf)), + } + } + progress.lock().inc_by(chunk_size); + Ok::<_, ()>(()) + } + }, + parallel::reduce::IdentityWithResult::<(), ()>::default(), + ) + .expect("infallible - we ignore none-existing objects"); + progress.lock().show_throughput(start); + } + + let sorted_counts = { + topo_sort(counts.as_mut_slice(), &topo).expect("no loop in delta topo"); + Arc::new(counts) + }; + let progress = Arc::new(parking_lot::Mutex::new(progress)); + let chunks = 
util::ChunkRanges::new(chunk_size, sorted_counts.len()); + + let oid_index_mapping = Arc::new( + sorted_counts + .iter() + .enumerate() + .map(|(index, count)| (count.id, index)) + .collect::>(), + ); // TODO: rearrange delta solving order or lru to avoid cache peak + parallel::reduce::Stepwise::new( + chunks.enumerate(), + thread_limit, + { + let progress = Arc::clone(&progress); + move |n| { + ( + // Cache entries object ID and offset for packs + std::collections::HashMap::>::new(), + // buffer object data for target + Vec::new(), + // buffer object data for source + Vec::new(), + progress + .lock() + .add_child_with_id(format!("thread {n}"), progress::UNKNOWN), + ) + } + }, + { + let sorted_counts = Arc::clone(&sorted_counts); + let oid_index_mapping = Arc::clone(&oid_index_mapping); + move |(chunk_id, chunk_range): (SequenceId, std::ops::Range), + (pack_index_cache, buf_t, buf_s, progress)| { + let mut out = Vec::new(); + let chunk = &sorted_counts[chunk_range]; + let mut stats = Outcome::default(); + progress.init(Some(chunk.len()), progress::count("objects")); + + for count in chunk.iter() { + let oid = count.id; + let db_find_cached = |oid, buf| db.try_find(oid, buf).map_err(Error::Find); + let entry = if let Some(source_oid) = topo.get(&oid) { + let mut find_existing_delta = || -> Option<_> { + let (compressed_data, decompressed_size) = find_delta( + count, + &db, + source_oid, + |pack_id, base_offset| { + let offsets_oid_mapping = + pack_index_cache.entry(pack_id).or_insert_with(|| { + db.pack_offsets_and_oid(pack_id) + .map(|mut v| { + v.sort_by_key(|e| e.0); + v + }) + .expect("pack used for counts is still available") + }); + offsets_oid_mapping + .binary_search_by_key(&base_offset, |e| e.0) + .ok() + .map(|idx| offsets_oid_mapping[idx].1) + }, + version, + )?; + Some(Ok(output::Entry { + id: oid.to_owned(), + kind: output::entry::Kind::DeltaRef { + object_index: *oid_index_mapping + .get(source_oid) + .expect("all target and source objects should in 
ONE pack"), + }, + decompressed_size, + compressed_data, + })) + }; + // Find existing delta + if let Some(entry) = find_existing_delta() { + stats.objects_copied_from_pack += 1; + entry + } + // Build delta + else if let Some((target, _)) = db_find_cached(&oid, buf_t)? { + if let Some((source, _)) = db_find_cached(source_oid, buf_s)? { + let delta_data = delta_diff::diff(source.data, target.data) + .expect("delta diff algorithm should valid"); + let mut deflate = gix_features::zlib::stream::deflate::Write::new(Vec::new()); + std::io::copy(&mut delta_data.as_slice(), &mut deflate) + .map_err(|e| Error::NewEntry(e.into()))?; + deflate.flush().map_err(|e| Error::NewEntry(e.into()))?; + let compressed_delta = deflate.into_inner(); + Ok(output::Entry { + id: oid.to_owned(), + kind: output::entry::Kind::DeltaRef { + object_index: *oid_index_mapping + .get(source_oid) + .expect("all target and source objects should in ONE pack"), // TODO: allow ref delta in thin pack + }, + decompressed_size: delta_data.len(), + compressed_data: compressed_delta, + }) + } else { + Ok(output::Entry::invalid()) + } + } else { + Ok(output::Entry::invalid()) + } + } else if let Some((data, _)) = db_find_cached(&oid, buf_t)? { + output::Entry::from_data(count, &data) + } else { + Ok(output::Entry::invalid()) + }?; + out.push(entry); + progress.inc(); + } + Ok((chunk_id, out, stats)) + } + }, + reduce::Statistics::default(), + ) + } + + /// Topological sort `counts` in place, parents first. + /// If there is a loop, returns Err(usize), meaning how many ObjectID are in loops indicated in the `to_parent`. + /// + /// # Panics + /// + /// Panics if any ObjectId in `to_parent` is not present in `counts`, which would indicate + /// that the parent-child relationship map references objects outside the given count set. 
+ fn topo_sort( + counts: &mut [output::Count], + to_parent: &std::collections::HashMap, + ) -> Result<(), usize> { + use std::collections::HashMap; + + type CountIndex = usize; + + let n = counts.len(); + if n == 0 { + return Ok(()); + } + + // Firstly sort `vertexes` as children first via Kahn method... + let oid_to_idx: HashMap = counts + .iter() + .enumerate() + .map(|(idx, c)| (c.id.to_owned(), idx)) + .collect(); + let mut idx_to_child_count: HashMap = (0..n).map(|c| (c, 0)).collect(); + for (child, parent) in to_parent { + let child = oid_to_idx + .get(child) + .expect("child ObjectId in to_parent should exist in counts"); + let parent = oid_to_idx + .get(parent) + .expect("parent ObjectId in to_parent should exist in counts"); + if idx_to_child_count.contains_key(child) { + if let Some(count) = idx_to_child_count.get_mut(parent) { + *count += 1; + } + } + } + + let mut stack: Vec = idx_to_child_count + .iter() + .filter_map(|(&c, count)| (*count == 0).then_some(c)) // Collect leaf vertex + .collect(); + let mut sorted = Vec::with_capacity(n); + while let Some(curr) = stack.pop() { + if let Some(parent) = to_parent.get(&counts[curr].id) { + let parent = oid_to_idx.get(parent).unwrap(); + if let Some(count) = idx_to_child_count.get_mut(parent) { + *count -= 1; + if *count == 0 { + stack.push(*parent); + } + } + } + sorted.push(curr); + } + + match sorted.len().cmp(&n) { + Ordering::Less => Err(n - sorted.len()), + Ordering::Equal => { + // ...then reverse `vertexex`, and returns as parents first + sorted.reverse(); + util::apply_permutation(counts, &sorted); + Ok(()) + } + Ordering::Greater => { + unreachable!("sorted counts should less or equal than all counts") + } + } + } + + /// Returns `(compressed_delta_data, decompressed_size)` if the pack entry is a delta pointing to `source_oid`. + /// The compressed data is extracted as-is from the pack (no decompression/recompression round-trip). 
+ fn find_delta( + count: &output::Count, + db: &impl pack::Find, + source_oid: &ObjectId, + mut pack_offset_to_oid: impl FnMut(u32, u64) -> Option, + target_version: pack::data::Version, + ) -> Option<(Vec, usize)> { + let entry = count + .entry_pack_location + .as_ref() + .and_then(|l| db.entry_by_location(l))?; + + if entry.version != target_version { + return None; + } + + let pack_offset_must_be_zero = 0; + let pack_entry = + pack::data::Entry::from_bytes(&entry.data, pack_offset_must_be_zero, count.id.as_slice().len()).ok()?; + + use pack::data::entry::Header::*; + let source_matches = match pack_entry.header { + OfsDelta { base_distance } => { + let pack_location = count.entry_pack_location.as_ref().expect("packed"); + let base_offset = pack_location + .pack_offset + .checked_sub(base_distance) + .expect("pack-offset - distance is firmly within the pack"); + pack_offset_to_oid(pack_location.pack_id, base_offset) + } + RefDelta { base_id } => Some(base_id), + _ => None, + } + .filter(|id| id == source_oid); + + if source_matches.is_none() { + return None; + } + + let compressed = entry.data[pack_entry.data_offset as usize..].to_vec(); + Some((compressed, pack_entry.decompressed_size as usize)) + } + + /// NOTE: except `apply_permutation`, copied from gix-pack/src/data/output/entry/iter_from_counts.rs + mod util { + #[derive(Clone)] + pub struct ChunkRanges { + cursor: usize, + size: usize, + len: usize, + } + + impl ChunkRanges { + pub fn new(size: usize, total: usize) -> Self { + ChunkRanges { + cursor: 0, + size, + len: total, + } + } + } + + impl Iterator for ChunkRanges { + type Item = std::ops::Range; + + fn next(&mut self) -> Option { + if self.cursor >= self.len { + None + } else { + let upper = (self.cursor + self.size).min(self.len); + let range = self.cursor..upper; + self.cursor = upper; + Some(range) + } + } + } + + pub fn apply_permutation(data: &mut [T], indices: &[usize]) { + let n = data.len(); + + // inverse transformation: indices[i] = j => 
indices[j] = i + let mut inv = vec![0; n]; + for (i, &j) in indices.iter().enumerate() { + inv[j] = i; + } + + for i in 0..n { + while inv[i] != i { + let target = inv[i]; + data.swap(i, target); + inv.swap(i, target); + } + } + } + } + /// NOTE: copied from gix-pack/src/data/output/entry/iter_from_counts.rs + mod reduce { + use std::marker::PhantomData; + + use super::Outcome; + use super::pack::data::output; + use super::{parallel, parallel::SequenceId}; + + pub struct Statistics { + total: Outcome, + _err: PhantomData, + } + + impl Default for Statistics { + fn default() -> Self { + Statistics { + total: Default::default(), + _err: PhantomData, + } + } + } + + impl parallel::Reduce for Statistics { + type Input = Result<(SequenceId, Vec, Outcome), Error>; + type FeedProduce = (SequenceId, Vec); + type Output = Outcome; + type Error = Error; + + fn feed(&mut self, item: Self::Input) -> Result { + item.map(|(cid, entries, stats)| { + // Should reuse Outcome::aggregate, but it's private + self.total.decoded_and_recompressed_objects += stats.decoded_and_recompressed_objects; + self.total.missing_objects += stats.missing_objects; + self.total.objects_copied_from_pack += stats.objects_copied_from_pack; + self.total.ref_delta_objects += stats.ref_delta_objects; + (cid, entries) + }) + } + + fn finalize(self) -> Result { + Ok(self.total) + } + } + } + + mod delta_diff { + use std::io::Write; + + /// Returned when failing to encode deltas. 
+ #[derive(thiserror::Error, Debug)] + #[allow(missing_docs)] + pub enum Error { + #[error("Failed to write bytes: {0}")] + IOError(std::io::Error), + #[error("Too large offset in Copy instruction, should <= 0xffffffff, got {0}")] + TooLargeOffset(usize), + #[error("Too large size in Copy instruction, should <= 0x00ffffff, got {0}")] + TooLargeSize(usize), + #[error("Too large data in Add instruction, length should <= 127, got {0}")] + TooLargeData(usize), + } + + /// Delta instruction + #[derive(Debug)] + pub enum Instruction<'a> { + /// Copy data from source + Copy { + /// Start position to copy + offset: usize, + /// Data length in bytes + size: usize, + }, + /// Insert bytes embedded in instruction + Add { + /// Data to insert + data: &'a [u8], + }, + } + + impl Instruction<'_> { + /// Encode instruction to bytes. + pub fn encode(self, mut writer: impl Write) -> Result<(), Error> { + match self { + Self::Copy { offset, mut size } => { + let mut header = 0x80u8; + let mut buf = [0u8; 7]; + let mut n = 0; + + if size == 0x10000 { + size = 0; + } else if size > 0x00ffffff { + return Err(Error::TooLargeSize(size)); + } + if offset > 0xffffffff { + return Err(Error::TooLargeOffset(offset)); + } + + for i in 0..4 { + let byte = (offset >> (i * 8)) as u8; + if byte != 0 { + header |= 1 << i; + buf[n] = byte; + n += 1; + } + } + for i in 0..3 { + let byte = (size >> (i * 8)) as u8; + if byte != 0 { + header |= 1 << (4 + i); + buf[n] = byte; + n += 1; + } + } + + writer.write_all(&[header]).map_err(Error::IOError)?; + writer.write_all(&buf[..n]).map_err(Error::IOError)?; + Ok(()) + } + Self::Add { data } => { + if data.len() > 127 { + return Err(Error::TooLargeData(data.len())); + } + + let header = data.len() as u8; + writer.write_all(&[header]).map_err(Error::IOError)?; + writer.write_all(data).map_err(Error::IOError)?; + Ok(()) + } + } + } + } + + /// Encode a variable-length integer for the delta header (7 bits per byte, MSB = continuation). 
+ fn encode_delta_varint(mut value: usize, buf: &mut impl Write) -> Result<(), Error> { + loop { + let mut byte = (value & 0x7f) as u8; + value >>= 7; + if value > 0 { + byte |= 0x80; + } + buf.write_all(&[byte]).map_err(Error::IOError)?; + if value == 0 { + break; + } + } + Ok(()) + } + + /// Compute a naive delta from `source` to `target`: one `Copy` of their common prefix (if non-empty) + /// followed by `Add` instructions of at most 127 bytes each for the remainder of `target`; no size header. + fn compute_delta<'a>(source: &[u8], target: &'a [u8]) -> Vec> { + let mut insts = Vec::new(); + + let mut common_prefix_len: usize = 0; + for (s, t) in source.iter().zip(target) { + if s == t { + common_prefix_len += 1; + } else { + break; + } + } + if common_prefix_len > 0 { + insts.push(Instruction::Copy { + offset: 0, + size: common_prefix_len, + }); + } + + for chunk in target[common_prefix_len..].chunks(127) { + insts.push(Instruction::Add { data: chunk }); + } + insts + } + + /// Encode the delta from `source` to `target` as bytes: the source and target sizes as varints + /// (header) followed by the encoded instructions, as expected by the Git pack format. 
+ pub fn diff(source: &[u8], target: &[u8]) -> Result, Error> { + let mut delta_data = Vec::new(); + encode_delta_varint(source.len(), &mut delta_data)?; + encode_delta_varint(target.len(), &mut delta_data)?; + for inst in compute_delta(source, target) { + inst.encode(&mut delta_data)?; + } + Ok(delta_data) + } + } +} diff --git a/gitoxide-core/src/pack/mod.rs b/gitoxide-core/src/pack/mod.rs index 0805418356c..d2443b19f6d 100644 --- a/gitoxide-core/src/pack/mod.rs +++ b/gitoxide-core/src/pack/mod.rs @@ -10,3 +10,8 @@ pub use receive::receive; pub mod create; pub use create::create; + +#[cfg(feature = "experimental-delta-create")] +pub mod delta_create; +#[cfg(feature = "experimental-delta-create")] +pub use delta_create::delta_create; diff --git a/justfile b/justfile index 46d681cd181..5252bbd7ed4 100755 --- a/justfile +++ b/justfile @@ -13,13 +13,13 @@ alias c := check alias nt := nextest # Run all tests, clippy, including journey tests, try building docs -test: clippy check doc unit-tests doc-tests journey-tests-pure journey-tests-small journey-tests-async journey-tests check-mode +test: clippy check doc unit-tests doc-tests journey-tests-pure journey-tests-small journey-tests-async journey-tests check-mode journey-tests-delta-create # Run all tests, without clippy, and try building docs ci-test: check doc unit-tests check-mode # Run all journey tests - should be run in a fresh clone or after `cargo clean` -ci-journey-tests: journey-tests-pure journey-tests-small journey-tests-async journey-tests +ci-journey-tests: journey-tests-pure journey-tests-small journey-tests-async journey-tests journey-tests-delta-create # Clean the `target` directory clear-target: @@ -260,6 +260,12 @@ journey-tests-async: cargo build -p gix-testtools dbg="$({{ j }} dbg)" && tests/journey.sh "$dbg/ein" "$dbg/gix" "$dbg/jtt" async +# Run journey tests with delta-create feature enabled +journey-tests-delta-create: + cargo build --no-default-features --features 
max-pure,gitoxide-core-tools-delta-create + cargo build -p gix-testtools --bin jtt + dbg="$({{ j }} dbg)" && tests/journey.sh "$dbg/ein" "$dbg/gix" "$dbg/jtt" delta-create + # Build a customized `cross` container image for testing cross-image target: docker build --build-arg "TARGET={{ target }}" \ diff --git a/src/plumbing/main.rs b/src/plumbing/main.rs index d9ef65d66dd..61d0b910260 100644 --- a/src/plumbing/main.rs +++ b/src/plumbing/main.rs @@ -963,6 +963,36 @@ pub fn main() -> Result<()> { }, ) } + #[cfg(feature = "gitoxide-core-tools-delta-create")] + free::pack::Subcommands::DeltaCreate { + repository, + statistics, + nondeterministic_count, + pack_cache_size_mb, + counting_threads, + object_cache_size_mb, + output_directory, + } => prepare_and_run( + "pack-create", + trace, + verbose, + progress, + progress_keep_open, + core::pack::create::PROGRESS_RANGE, + move |progress, out, _err| { + let input = stdin_or_bail()?; + let repository = repository.unwrap_or_else(|| PathBuf::from(".")); + let context = core::pack::delta_create::Context { + thread_limit, + nondeterministic_thread_count: nondeterministic_count.then_some(counting_threads), + pack_cache_size_in_bytes: pack_cache_size_mb.unwrap_or(0) * 1_000_000, + object_cache_size_in_bytes: object_cache_size_mb.unwrap_or(0) * 1_000_000, + statistics: if statistics { Some(format) } else { None }, + out, + }; + core::pack::delta_create::delta_create(repository, input, output_directory, progress, context) + }, + ), #[cfg(feature = "gitoxide-core-async-client")] free::pack::Subcommands::Receive { protocol, diff --git a/src/plumbing/options/free.rs b/src/plumbing/options/free.rs index 055d8a0d7a4..869e885d426 100644 --- a/src/plumbing/options/free.rs +++ b/src/plumbing/options/free.rs @@ -184,6 +184,53 @@ pub mod pack { /// Otherwise the expansion mode is 'tree-traversal' by default. 
tips: Vec, }, + #[cfg(feature = "gitoxide-core-tools-delta-create")] + /// Create a new pack with customized delta topological relationships. NOTE: This is an experimental feature and may change in the future. + DeltaCreate { + #[clap(long, short = 'r')] + /// the directory containing the '.git' repository from which objects should be read. + repository: Option, + + #[clap(long, default_value_t = 3, requires = "nondeterministic_count")] + /// The amount of threads to use when counting and the `--nondeterminisitc-count` flag is set, defaulting + /// to the globally configured threads. + /// + /// Use it to have different trade-offs between counting performance and cost in terms of CPU, as the scaling + /// here is everything but linear. The effectiveness of each core seems to be no more than 30%. + counting_threads: usize, + + #[clap(long)] + /// if set, the counting phase may be accelerated using multithreading. + /// + /// On the flip side, however, one will loose deterministic counting results which affects the + /// way the resulting pack is structured. + nondeterministic_count: bool, + + #[clap(long, short = 's')] + /// If set statistical information will be presented to inform about pack creation details. + /// It's a form of instrumentation for developers to help improve pack generation. + statistics: bool, + + #[clap(long)] + /// The size in megabytes for a cache to speed up pack access for packs with long delta chains. + /// It is shared among all threads, so 4 threads would use their own cache 1/4th of the size. + /// + /// If unset, no cache will be used. + pack_cache_size_mb: Option, + + #[clap(long)] + /// The size in megabytes for a cache to speed up accessing entire objects, bypassing object database access when hit. + /// It is shared among all threads, so 4 threads would use their own cache 1/4th of the size. + /// + /// This cache type is currently only effective when using the 'diff-tree' object expansion. 
+ /// + /// If unset, no cache will be used. + object_cache_size_mb: Option, + + /// The directory into which to write the pack file. + #[clap(long, short = 'o')] + output_directory: Option, + }, /// Use the gix-protocol to receive a pack, emulating a clone. #[cfg(any(feature = "gitoxide-core-async-client", feature = "gitoxide-core-blocking-client"))] Receive { diff --git a/tests/helpers.sh b/tests/helpers.sh index e0994f25278..b43e7d5b5ae 100644 --- a/tests/helpers.sh +++ b/tests/helpers.sh @@ -59,6 +59,30 @@ function small-repo-in-sandbox() { } &>/dev/null } +# Creates a repo with 4 versions of a file (blob#0..#3) in 50-line increments. +# Aggressive repacking forces Git to build a delta chain. +# blob#3 (latest/largest) becomes the base, with others as sequential deltas. +function pack-repo-in-sandbox() { + sandbox + { + git init + git checkout -b main + git config commit.gpgsign false + git config tag.gpgsign false + + for i in 0 1 2 3; do + python3 -c " +for j in range(50 * ($i + 1)): + print(f'line {j}') +" > file.txt + git add file.txt + git commit -m "blob#${i}" + done + + git repack -a -d --window=10 --depth=4 + } &>/dev/null +} + function launch-git-daemon() { local i git_daemon_url_file diff --git a/tests/journey/gix.sh b/tests/journey/gix.sh index 4ab6ebe1d5b..3b42bc09578 100644 --- a/tests/journey/gix.sh +++ b/tests/journey/gix.sh @@ -200,6 +200,7 @@ title "gix commit-graph" (with "gix free" snapshot="$snapshot/no-repo" title "gix free pack" + (if test "$kind" != "delta-create"; then (when "running 'pack'" snapshot="$snapshot/pack" @@ -702,4 +703,67 @@ title "gix commit-graph" ) ) ) + fi) + + title "gix free pack delta-create" + (if test "$kind" = "delta-create"; then + (with "the 'delta-create' sub-command" + snapshot="$snapshot/delta-create" + (pack-repo-in-sandbox + REPO_DIR="$PWD" + BLOB0=$(git rev-parse HEAD~3:file.txt) + BLOB1=$(git rev-parse HEAD~2:file.txt) + BLOB2=$(git rev-parse HEAD~1:file.txt) + BLOB3=$(git rev-parse HEAD:file.txt) + + 
(with "new delta (blob#1 -> blob#0)" + (with "no output directory" + it "prints the pack name" && { + WITH_SNAPSHOT="$snapshot/new-delta-no-output-dir-success" \ + expect_run $SUCCESSFULLY bash -c "printf '%s\\n%s\\n' '$BLOB1 $BLOB0' '$BLOB0' | '$exe_plumbing' --no-verbose free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1" + } + ) + (with "output directory" + (sandbox + mkdir out + it "creates a pack file" && { + WITH_SNAPSHOT="$snapshot/new-delta-output-dir-success" \ + expect_run $SUCCESSFULLY bash -c "printf '%s\\n%s\\n' '$BLOB1 $BLOB0' '$BLOB0' | '$exe_plumbing' --no-verbose free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1 -r '$REPO_DIR' -o out/" + } + it "writes a .pack file to the output directory" && { + WITH_SNAPSHOT="$snapshot/new-delta-output-dir-content" \ + expect_run $SUCCESSFULLY ls out/ + } + ) + ) + (with "statistics" + it "shows statistics" && { + WITH_SNAPSHOT="$snapshot/new-delta-statistics-success" \ + expect_run $SUCCESSFULLY bash -c "printf '%s\\n%s\\n' '$BLOB1 $BLOB0' '$BLOB0' | '$exe_plumbing' --no-verbose free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1 -s" + } + ) + (with "--format json" + it "outputs statistics as JSON" && { + WITH_SNAPSHOT="$snapshot/new-delta-statistics-json-success" \ + expect_run $SUCCESSFULLY bash -c "printf '%s\\n%s\\n' '$BLOB1 $BLOB0' '$BLOB0' | '$exe_plumbing' --no-verbose --format json free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1 -s" + } + ) + ) + + (with "new base (blob#0)" + it "stores as a full object" && { + WITH_SNAPSHOT="$snapshot/new-base-success" \ + expect_run $SUCCESSFULLY bash -c "echo '$BLOB0' | '$exe_plumbing' --no-verbose free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1" + } + ) + + (with "reuse delta (blob#0 -> blob#3)" + it "copies the existing delta" && { + WITH_SNAPSHOT="$snapshot/reuse-delta-success" \ + expect_run $SUCCESSFULLY bash -c "printf '%s\\n%s\\n' '$BLOB0 $BLOB3' '$BLOB3' | 
'$exe_plumbing' --no-verbose free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1" + } + ) + ) + ) + fi) ) diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-base-success b/tests/snapshots/plumbing/no-repo/delta-create/new-base-success new file mode 100644 index 00000000000..6e457eb3a5c --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/new-base-success @@ -0,0 +1 @@ +b4628a57c2c40d9b4f209b8322026f0ff5767864.pack \ No newline at end of file diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-delta-no-output-dir-success b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-no-output-dir-success new file mode 100644 index 00000000000..d59f983b528 --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-no-output-dir-success @@ -0,0 +1 @@ +a2c6eb76193fba6f5993ff94f0e9d64e59c27535.pack \ No newline at end of file diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-content b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-content new file mode 100644 index 00000000000..d59f983b528 --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-content @@ -0,0 +1 @@ +a2c6eb76193fba6f5993ff94f0e9d64e59c27535.pack \ No newline at end of file diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-success b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-success new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-json-success b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-json-success new file mode 100644 index 00000000000..1d726c292c0 --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-json-success @@ -0,0 +1,15 @@ +a2c6eb76193fba6f5993ff94f0e9d64e59c27535.pack +{ + "counts": { + "input_objects": 2, + "expanded_objects": 0, + "decoded_objects": 2, + 
"total_objects": 2 + }, + "entries": { + "decoded_and_recompressed_objects": 0, + "missing_objects": 0, + "objects_copied_from_pack": 0, + "ref_delta_objects": 0 + } +} \ No newline at end of file diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-success b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-success new file mode 100644 index 00000000000..c116c6f0c26 --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-success @@ -0,0 +1,11 @@ +a2c6eb76193fba6f5993ff94f0e9d64e59c27535.pack +counting phase + input objects 2 + expanded objects 0 + decoded objects 2 + total objects 2 +generation phase + decoded and recompressed 0 + pack-to-pack copies 0 + ref-delta-objects 0 + missing objects 0 \ No newline at end of file diff --git a/tests/snapshots/plumbing/no-repo/delta-create/reuse-delta-success b/tests/snapshots/plumbing/no-repo/delta-create/reuse-delta-success new file mode 100644 index 00000000000..a67ca617424 --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/reuse-delta-success @@ -0,0 +1 @@ +d566a17c88859ad426bb2d5e5c20e34819a0789b.pack \ No newline at end of file