From 7bf07c8c194bc12274c32e52dd6c8018c478e266 Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Wed, 29 Apr 2026 17:16:26 +0800 Subject: [PATCH 01/11] feat: delta_create --- Cargo.lock | 1 + gitoxide-core/Cargo.toml | 6 + gitoxide-core/src/pack/delta_create.rs | 819 +++++++++++++++++++++++++ gitoxide-core/src/pack/mod.rs | 5 + 4 files changed, 831 insertions(+) create mode 100644 gitoxide-core/src/pack/delta_create.rs diff --git a/Cargo.lock b/Cargo.lock index 2e847e20906..31e842c9464 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1389,6 +1389,7 @@ dependencies = [ "gix", "gix-archive", "gix-error", + "gix-features", "gix-fsck", "gix-pack", "gix-status", diff --git a/gitoxide-core/Cargo.toml b/gitoxide-core/Cargo.toml index ff07bea921f..7a31a734ad6 100644 --- a/gitoxide-core/Cargo.toml +++ b/gitoxide-core/Cargo.toml @@ -47,6 +47,9 @@ async-client = ["gix/async-network-client-async-std", "gix-transport-configurati ## Data structures implement `serde::Serialize` and `serde::Deserialize`. serde = ["gix/serde", "dep:serde_json", "dep:serde", "bytesize/serde"] +## The experimental ability to create a pack with delta compression. +experimental-delta-create = ["gix-features"] + [dependencies] # deselect everything else (like "performance") as this should be controllable by the parent application. gix = { version = "^0.83.0", path = "../gix", default-features = false, features = ["merge", "blob-diff", "blame", "revision", "mailmap", "excludes", "attributes", "worktree-mutation", "credentials", "interrupt", "status", "dirwalk"] } @@ -94,6 +97,9 @@ tracing = { version = "0.1.43", optional = true } layout-rs = "0.1.3" open = "5.0.0" +# for experimental delta create +gix-features = { version = "^0.48.0", path = "../gix-features", default-features = false, features = ["crc32", "progress", "zlib"], optional = true } + document-features = { version = "0.2.0", optional = true } [dev-dependencies] diff --git a/gitoxide-core/src/pack/delta_create.rs b/gitoxide-core/src/pack/delta_create.rs new file mode 100644 index 00000000000..b72713d92ef --- /dev/null +++ b/gitoxide-core/src/pack/delta_create.rs @@ -0,0 +1,819 @@ +// NOTE: copied from create.rs +use std::{collections::HashMap, io, path::Path, time::Instant}; + +use anyhow::anyhow; +use gix::{ + hash, hash::ObjectId, interrupt, odb::pack, parallel::InOrderIter, prelude::Finalize, progress, Count, + NestedProgress, Progress, +}; + +use crate::OutputFormat; + +/// A general purpose context for many operations provided here +pub struct Context { + /// If `Some(threads)`, use this amount of `threads` to accelerate the counting phase at the cost of losing + /// determinism as the order of objects during expansion changes with multiple threads unless no expansion is performed. + /// In the latter case, this flag has no effect. + /// If `None`, counting will only use one thread and thus yield the same sequence of objects in any case. + pub nondeterministic_thread_count: Option, + /// If set, don't use more than this amount of threads. + /// Otherwise, usually use as many threads as there are logical cores. + /// A value of 0 is interpreted as no-limit + pub thread_limit: Option, + /// If set, statistics about the operation will be written to the output stream. + pub statistics: Option, + /// The size of the cache storing fully decoded delta objects. This can greatly speed up pack decoding by reducing the length of delta + /// chains. Note that caches also incur a cost and poorly used caches may reduce overall performance. + /// This is a total, shared among all threads if `thread_limit` permits. + /// + /// If 0, the cache is disabled entirely. + pub pack_cache_size_in_bytes: usize, + /// The size of the cache to store full objects by their ID, bypassing any lookup in the object database. + /// Note that caches also incur a cost and poorly used caches may reduce overall performance. + /// This is a total, shared among all threads if `thread_limit` permits. + /// + /// If 0, the cache is disabled entirely. + pub object_cache_size_in_bytes: usize, + /// The output stream for use of additional information + pub out: W, +} + +pub fn delta_create( + repository_path: impl AsRef, + input: impl io::BufRead + Send + 'static, + output_directory: Option>, + mut progress: P, + Context { + thread_limit, + statistics, + pack_cache_size_in_bytes, + object_cache_size_in_bytes, + mut out, + .. + }: Context, +) -> anyhow::Result<()> +where + W: std::io::Write, + P: NestedProgress, + P::SubProgress: 'static, +{ + type ObjectIdIter<'a> = + dyn Iterator), Box>> + Send + 'a; + + let repo = gix::discover(repository_path)?.into_sync(); + progress.init(Some(2), progress::steps()); + let make_cancellation_err = || anyhow!("Cancelled by user"); + let mut topo = HashMap::new(); + let (mut handle, input): (_, Box>) = { + let mut progress = progress.add_child("iterating"); + progress.init(None, progress::count("objects")); + let handle = repo.objects.into_shared_arc().to_cache_arc(); + ( + handle, + Box::new( + input + .lines() + .map(|line| { + line.map_err(|err| Box::new(err) as Box<_>).and_then(|line| { + let hex2oid = |hex: &str| { + ObjectId::from_hex(hex.as_bytes()) + .map_err(Into::>::into) + }; + if let Some((target, source)) = line.split_once(' ') { + Ok((hex2oid(target)?, Some(hex2oid(source)?))) + } else { + Ok((hex2oid(&line)?, None)) + } + }) + }) + .inspect(move |_| progress.inc()) + .inspect(|res| { + if let Ok((target, Some(source))) = res { + topo.insert(target.clone(), source.clone()); + } + }), + ), + ) + }; + + let mut stats = Statistics::default(); + let chunk_size = 1000; // What's a good value for this? + let counts = { + // TODO: parallel speedup + let mut progress = progress.add_child("counting"); + progress.init(None, progress::count("objects")); + let progress = progress::ThroughputOnDrop::new(progress); + + { + handle + .set_pack_cache(move || Box::new(pack::cache::lru::MemoryCappedHashmap::new(pack_cache_size_in_bytes))); + handle.set_object_cache(move || { + Box::new(pack::cache::object::MemoryCappedHashmap::new( + object_cache_size_in_bytes, + )) + }); + } + handle.prevent_pack_unload(); + handle.ignore_replacements = true; + let (mut counts, count_stats) = pack::data::output::count::objects_unthreaded( + &handle, + &mut input.map(|res| res.map(|(target, _)| target)), + &progress, + &interrupt::IS_INTERRUPTED, + pack::data::output::count::objects::ObjectExpansion::AsIs, + )?; + stats.counts = count_stats; + counts.shrink_to_fit(); + counts + }; + + progress.inc(); + let num_objects = counts.len(); + let mut in_order_entries = { + let progress = progress.add_child("creating entries"); + InOrderIter::from(iter_from_counts::iter_from_counts( + counts, + topo, + handle, + Box::new(progress), + iter_from_counts::Options { + thread_limit, + chunk_size, + version: Default::default(), + }, + )) + }; + + let mut entries_progress = progress.add_child("consuming"); + entries_progress.init(Some(num_objects), progress::count("entries")); + let mut write_progress = progress.add_child("writing"); + write_progress.init(None, progress::bytes()); + let start = Instant::now(); + + let mut named_tempfile_store: Option = None; + let mut sink_store: std::io::Sink; + let (mut pack_file, output_directory): (&mut dyn std::io::Write, Option<_>) = match output_directory { + Some(dir) => { + named_tempfile_store = Some(tempfile::NamedTempFile::new_in(dir.as_ref())?); + (named_tempfile_store.as_mut().expect("packfile just set"), Some(dir)) + } + None => { + sink_store = std::io::sink(); + (&mut sink_store, None) + } + }; + let mut interruptible_output_iter = interrupt::Iter::new( + pack::data::output::bytes::FromEntriesIter::new( + in_order_entries.by_ref().inspect(|e| { + if let Ok(entries) = e { + entries_progress.inc_by(entries.len()); + } + }), + &mut pack_file, + num_objects as u32, + pack::data::Version::default(), + hash::Kind::default(), + ), + make_cancellation_err, + ); + for io_res in interruptible_output_iter.by_ref() { + let written = io_res??; + write_progress.inc_by(written as usize); + } + + let hash = interruptible_output_iter + .into_inner() + .digest() + .expect("iteration is done"); + let pack_name = format!("{hash}.pack"); + if let (Some(pack_file), Some(dir)) = (named_tempfile_store.take(), output_directory) { + pack_file.persist(dir.as_ref().join(pack_name))?; + } else { + writeln!(out, "{pack_name}")?; + } + stats.entries = in_order_entries.inner.finalize()?; + + write_progress.show_throughput(start); + entries_progress.show_throughput(start); + + if let Some(format) = statistics { + print(stats, format, out)?; + } + progress.inc(); + Ok(()) +} + +fn print(stats: Statistics, format: OutputFormat, out: impl std::io::Write) -> anyhow::Result<()> { + match format { + OutputFormat::Human => human_output(stats, out).map_err(Into::into), + #[cfg(feature = "serde")] + OutputFormat::Json => serde_json::to_writer_pretty(out, &stats).map_err(Into::into), + } +} + +fn human_output( + Statistics { + counts: + pack::data::output::count::objects::Outcome { + input_objects, + expanded_objects, + decoded_objects, + total_objects, + }, + entries: + pack::data::output::entry::iter_from_counts::Outcome { + decoded_and_recompressed_objects, + missing_objects, + objects_copied_from_pack, + ref_delta_objects, + }, + }: Statistics, + mut out: impl std::io::Write, +) -> std::io::Result<()> { + let width = 30; + writeln!(out, "counting phase")?; + #[rustfmt::skip] + writeln!( + out, + "\t{:, + /// The amount of objects per chunk or unit of work to be sent to threads for processing + /// TODO: could this become the window size? + pub chunk_size: usize, + /// The pack data version to produce for each entry + pub version: pack::data::Version, + } + + pub fn iter_from_counts( + mut counts: Vec, + topo: HashMap, + db: Find, + mut progress: Box, + Options { + thread_limit, + chunk_size, + version, + }: Options, + ) -> impl Iterator), Error>> + + parallel::reduce::Finalize> + where + Find: pack::Find + Send + Clone + 'static, + { + assert!( + matches!(version, pack::data::Version::V2), + "currently we can only write version 2" + ); + let (chunk_size, thread_limit, _) = + parallel::optimize_chunk_size_and_thread_limit(chunk_size, Some(counts.len()), thread_limit, None); + { + let progress = Arc::new(parking_lot::Mutex::new( + progress.add_child_with_id("resolving".into(), ProgressId::ResolveCounts.into()), + )); + progress.lock().init(None, progress::count("counts")); + let enough_counts_present = counts.len() > 4_000; + let start = std::time::Instant::now(); + parallel::in_parallel_if( + || enough_counts_present, + counts.chunks_mut(chunk_size), + thread_limit, + |_n| Vec::::new(), + { + let progress = Arc::clone(&progress); + let db = db.clone(); + move |chunk, buf| { + let chunk_size = chunk.len(); + for count in chunk { + use pack::data::output::count::PackLocation::*; + match count.entry_pack_location { + LookedUp(_) => continue, + NotLookedUp => count.entry_pack_location = LookedUp(db.location_by_oid(&count.id, buf)), + } + } + progress.lock().inc_by(chunk_size); + Ok::<_, ()>(()) + } + }, + parallel::reduce::IdentityWithResult::<(), ()>::default(), + ) + .expect("infallible - we ignore none-existing objects"); + progress.lock().show_throughput(start); + } + + let sorted_counts = { + topo_sort(counts.as_mut_slice(), &topo).expect("no loop in delta topo"); + Arc::new(counts) + }; + let progress = Arc::new(parking_lot::Mutex::new(progress)); + let chunks = util::ChunkRanges::new(chunk_size, sorted_counts.len()); + + let oid_index_mapping = Arc::new( + sorted_counts + .iter() + .enumerate() + .map(|(index, count)| (count.id, index)) + .collect::>(), + ); // TODO: rearrange delta solving order or lru to avoid cache peak + parallel::reduce::Stepwise::new( + chunks.enumerate(), + thread_limit, + { + let progress = Arc::clone(&progress); + move |n| { + ( + // Cache entries object ID and offset for packs + std::collections::HashMap::>::new(), + // buffer object data for target + Vec::new(), + // buffer object data for source + Vec::new(), + progress + .lock() + .add_child_with_id(format!("thread {n}"), progress::UNKNOWN), + ) + } + }, + { + let sorted_counts = Arc::clone(&sorted_counts); + let oid_index_mapping = Arc::clone(&oid_index_mapping); + move |(chunk_id, chunk_range): (SequenceId, std::ops::Range), + (pack_index_cache, buf_t, buf_s, progress)| { + let mut out = Vec::new(); + let chunk = &sorted_counts[chunk_range]; + let mut stats = Outcome::default(); + progress.init(Some(chunk.len()), progress::count("objects")); + + for count in chunk.iter() { + let oid = count.id; + let db_find_cached = |oid, buf| db.try_find(oid, buf).map_err(Error::Find); + let entry = if let Some(source_oid) = topo.get(&oid) { + let mut find_existing_delta = || -> Option<_> { + let (_location, pack_entry) = count + .entry_pack_location + .as_ref() + .and_then(|l| db.entry_by_location(l).map(|pe| (l, pe)))?; + let delta = find_delta( + count, + &pack_entry, + source_oid, + |pack_id, base_offset| { + let offsets_oid_mapping = + pack_index_cache.entry(pack_id).or_insert_with(|| { + db.pack_offsets_and_oid(pack_id) + .map(|mut v| { + v.sort_by_key(|e| e.0); + v + }) + .expect("pack used for counts is still available") + }); + offsets_oid_mapping + .binary_search_by_key(&base_offset, |e| e.0) + .ok() + .map(|idx| offsets_oid_mapping[idx].1) + }, + version, + )?; + Some(entry::from_delta_ref( + count, + delta, + *oid_index_mapping + .get(source_oid) + .expect("all target and source objects should in ONE pack"), + )) + }; + // Find existing delta + if let Some(entry) = find_existing_delta() { + stats.objects_copied_from_pack += 1; + entry + } + // Build delta + else if let Some((target, _)) = db_find_cached(&oid, buf_t)? { + if let Some((source, _)) = db_find_cached(source_oid, buf_s)? { + let delta_insts = delta_diff::compute_delta(source.data, target.data); + let mut delta_data_buf = Vec::new(); + for inst in delta_insts { + // Panic here because delta algorithm is incorrect, should fast fail + inst.encode(&mut delta_data_buf) + .expect("delta instruction should valid"); + } + entry::from_delta_ref( + count, + delta_data_buf.as_slice(), + *oid_index_mapping + .get(source_oid) + .expect("all target and source objects should in ONE pack"), // TODO: allow ref delta in thin pack + ) + } else { + Ok(output::Entry::invalid()) + } + } else { + Ok(output::Entry::invalid()) + } + } else if let Some((data, _)) = db_find_cached(&oid, buf_t)? { + output::Entry::from_data(count, &data) + } else { + Ok(output::Entry::invalid()) + }?; + out.push(entry); + progress.inc(); + } + Ok((chunk_id, out, stats)) + } + }, + reduce::Statistics::default(), + ) + } + + /// Topological sort `counts` in place, parents first. + /// If there is a loop, returns Err(usize), meaning how many ObjectID are in loops indicated in the `to_parent`. + fn topo_sort( + counts: &mut [output::Count], + to_parent: &std::collections::HashMap, + ) -> Result<(), usize> { + // firstly sort `vertexes` as children first, then reverse `vertexex` + use std::collections::HashMap; + + type CountIndex = usize; + + let n = counts.len(); + if n == 0 { + return Ok(()); + } + + let oid_to_idx: HashMap = counts + .iter() + .enumerate() + .map(|(idx, c)| (c.id.to_owned(), idx)) + .collect(); + + let mut idx_to_child_count: HashMap = (0..n).map(|c| (c, 0)).collect(); + for (child, parent) in to_parent { + let child = oid_to_idx.get(child).unwrap(); + let parent = oid_to_idx.get(parent).unwrap(); + if idx_to_child_count.contains_key(child) { + if let Some(count) = idx_to_child_count.get_mut(parent) { + *count += 1; + } + } + } + + // leaf vertex collection + let mut stack: Vec = idx_to_child_count + .iter() + .filter_map(|(&c, count)| (*count == 0).then_some(c)) + .collect(); + + let mut sorted = Vec::with_capacity(n); + while let Some(curr) = stack.pop() { + if let Some(parent) = to_parent.get(&counts[curr].id) { + let parent = oid_to_idx.get(parent).unwrap(); + if let Some(count) = idx_to_child_count.get_mut(parent) { + *count -= 1; + if *count == 0 { + stack.push(*parent); + } + } + } + sorted.push(curr); + } + + match sorted.len().cmp(&n) { + Ordering::Less => Err(n - sorted.len()), + Ordering::Equal => { + sorted.reverse(); + util::apply_permutation(counts, &sorted); + Ok(()) + } + Ordering::Greater => { + unreachable!("sorted counts should less or equal than all counts") + } + } + } + + fn find_delta<'a>( + count: &output::Count, + entry: &'a pack::find::Entry, + source_oid: &ObjectId, + mut pack_offset_to_oid: impl FnMut(u32, u64) -> Option, + target_version: pack::data::Version, + ) -> Option<&'a [u8]> { + if entry.version != target_version { + return None; + } + + let pack_offset_must_be_zero = 0; + let pack_entry = + pack::data::Entry::from_bytes(&entry.data, pack_offset_must_be_zero, count.id.as_slice().len()).ok()?; + + use pack::data::entry::Header::*; + match pack_entry.header { + OfsDelta { base_distance } => { + let pack_location = count.entry_pack_location.as_ref().expect("packed"); + let base_offset = pack_location + .pack_offset + .checked_sub(base_distance) + .expect("pack-offset - distance is firmly within the pack"); + pack_offset_to_oid(pack_location.pack_id, base_offset) + } + RefDelta { base_id } => Some(base_id), + _ => None, + } + .filter(|id| id == source_oid) + .map(|_| &entry.data[pack_entry.data_offset as usize..]) + } + + // NOTE: copied from gix-pack/src/data/output/entry/mod.rs + mod entry { + use std::io::Write; + + use gix::odb::pack; + + use pack::data::output::{self, entry::Error, Entry}; + + /// Like [`output::Entry::from_base()`], but with type OfsDelta. + /// `delta_data` is encoded instructions. Header with encoded size and will be encoded by `output::Entry::to_entry_header` + /// `object_index` is the absolute index to the object. + pub fn from_delta_ref(count: &output::Count, delta_data: &[u8], object_index: usize) -> Result { + Ok(output::Entry { + id: count.id.to_owned(), + kind: output::entry::Kind::DeltaRef { object_index }, + decompressed_size: delta_data.len(), + compressed_data: { + let mut out = gix_features::zlib::stream::deflate::Write::new(Vec::new()); + if let Err(err) = std::io::copy(&mut &*delta_data, &mut out) { + match err.kind() { + std::io::ErrorKind::Other => return Err(Error::ZlibDeflate(err)), + err => unreachable!("Should never see other errors than zlib, but got {:?}", err), + } + } + out.flush()?; + out.into_inner() + }, + }) + } + } + + mod util { + #[derive(Clone)] + pub struct ChunkRanges { + cursor: usize, + size: usize, + len: usize, + } + + impl ChunkRanges { + pub fn new(size: usize, total: usize) -> Self { + ChunkRanges { + cursor: 0, + size, + len: total, + } + } + } + + impl Iterator for ChunkRanges { + type Item = std::ops::Range; + + fn next(&mut self) -> Option { + if self.cursor >= self.len { + None + } else { + let upper = (self.cursor + self.size).min(self.len); + let range = self.cursor..upper; + self.cursor = upper; + Some(range) + } + } + } + + pub fn apply_permutation(data: &mut [T], indices: &[usize]) { + let n = data.len(); + + // inverse transformation: indices[i] = j => indices[j] = i + let mut inv = vec![0; n]; + for (i, &j) in indices.iter().enumerate() { + inv[j] = i; + } + + for i in 0..n { + while inv[i] != i { + let target = inv[i]; + data.swap(i, target); + inv.swap(i, target); + } + } + } + } + mod reduce { + use std::marker::PhantomData; + + use super::pack::data::output; + use super::Outcome; + use super::{parallel, parallel::SequenceId}; + + pub struct Statistics { + total: Outcome, + _err: PhantomData, + } + + impl Default for Statistics { + fn default() -> Self { + Statistics { + total: Default::default(), + _err: PhantomData, + } + } + } + + impl parallel::Reduce for Statistics { + type Input = Result<(SequenceId, Vec, Outcome), Error>; + type FeedProduce = (SequenceId, Vec); + type Output = Outcome; + type Error = Error; + + fn feed(&mut self, item: Self::Input) -> Result { + item.map(|(cid, entries, stats)| { + // Should reuse Outcome::aggregate, but it's private + self.total.decoded_and_recompressed_objects += stats.decoded_and_recompressed_objects; + self.total.missing_objects += stats.missing_objects; + self.total.objects_copied_from_pack += stats.objects_copied_from_pack; + self.total.ref_delta_objects += stats.ref_delta_objects; + (cid, entries) + }) + } + + fn finalize(self) -> Result { + Ok(self.total) + } + } + } + + mod delta_diff { + use std::io::Write; + + /// Returned when failing to encode deltas. + #[derive(thiserror::Error, Debug)] + #[allow(missing_docs)] + pub enum Error { + #[error("Failed to write bytes: {0}")] + IOError(std::io::Error), + #[error("Too large offset in Copy instruction, should <= 0xffffffff, got {0}")] + TooLargeOffset(usize), + #[error("Too large size in Copy instruction, should <= 0x00ffffff, got {0}")] + TooLargeSize(usize), + #[error("Too large data in Add instruction, length should <= 127, got {0}")] + TooLargeData(usize), + } + + /// Delta instruction + #[derive(Debug)] + pub enum Instruction<'a> { + /// Copy data from source + Copy { + /// Start position to copy + offset: usize, + /// Data length in bytes + size: usize, + }, + /// Insert bytes embedded in instruction + Add { + /// Data to insert + data: &'a [u8], + }, + } + + impl Instruction<'_> { + /// Encode instruction to bytes. + pub fn encode(self, mut writer: impl Write) -> Result<(), Error> { + match self { + Self::Copy { offset, mut size } => { + let mut header = 0x80u8; + let mut buf = [0u8; 7]; + let mut n = 0; + + if size == 0x10000 { + size = 0; + } else if size > 0x00ffffff { + return Err(Error::TooLargeSize(size)); + } + if offset > 0xffffffff { + return Err(Error::TooLargeOffset(offset)); + } + + for i in 0..4 { + let byte = (offset >> (i * 8)) as u8; + if byte != 0 { + header |= 1 << i; + buf[n] = byte; + n += 1; + } + } + for i in 0..3 { + let byte = (size >> (i * 8)) as u8; + if byte != 0 { + header |= 1 << (4 + i); + buf[n] = byte; + n += 1; + } + } + + writer.write_all(&[header]).map_err(Error::IOError)?; + writer.write_all(&buf[..n]).map_err(Error::IOError)?; + Ok(()) + } + Self::Add { data } => { + if data.len() > 127 { + return Err(Error::TooLargeData(data.len())); + } + + let header = data.len() as u8; + writer.write_all(&[header]).map_err(Error::IOError)?; + writer.write_all(data).map_err(Error::IOError)?; + Ok(()) + } + } + } + } + + /// Calculate delta instructions from `source` to `target`. + pub fn compute_delta<'a>(source: &[u8], target: &'a [u8]) -> Vec> { + // TODO: more efficient + // TODO: more configurable + let mut common_prefix_len: usize = 0; + for (s, t) in source.iter().zip(target) { + if s == t { + common_prefix_len += 1; + } else { + break; + } + } + + let mut insts = Vec::new(); + if common_prefix_len > 0 { + insts.push(Instruction::Copy { + offset: 0, + size: common_prefix_len, + }); + } + for chunk in target[common_prefix_len..].chunks(127) { + insts.push(Instruction::Add { data: chunk }); + } + insts + } + } +} diff --git a/gitoxide-core/src/pack/mod.rs b/gitoxide-core/src/pack/mod.rs index 0805418356c..d2443b19f6d 100644 --- a/gitoxide-core/src/pack/mod.rs +++ b/gitoxide-core/src/pack/mod.rs @@ -10,3 +10,8 @@ pub use receive::receive; pub mod create; pub use create::create; + +#[cfg(feature = "experimental-delta-create")] +pub mod delta_create; +#[cfg(feature = "experimental-delta-create")] +pub use delta_create::delta_create; From 5ad6fee1b62e93799ec042fc9694f9ac23fae090 Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Thu, 30 Apr 2026 10:46:22 +0800 Subject: [PATCH 02/11] expose delta-create in gix cli --- Cargo.toml | 4 +++ src/plumbing/main.rs | 30 +++++++++++++++++++++++ src/plumbing/options/free.rs | 47 ++++++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index adb75c8ad8c..51d02b24a7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,6 +138,10 @@ gitoxide-core-tools-archive = ["gitoxide-core/archive"] ## A sub-command to clean the worktree from untracked and ignored files. gitoxide-core-tools-clean = ["gitoxide-core/clean"] +## A sub-command to create a pack with delta compression. +## NOTE: This is an experimental feature and may change in the future. +gitoxide-core-tools-delta-create = ["gitoxide-core/experimental-delta-create"] + #! ### Building Blocks for mutually exclusive networking #! Blocking and async features are mutually exclusive and cause a compile-time error. This also means that `cargo … --all-features` will fail. #! Within each section, features can be combined. diff --git a/src/plumbing/main.rs b/src/plumbing/main.rs index d02d376762a..5bb0eb63894 100644 --- a/src/plumbing/main.rs +++ b/src/plumbing/main.rs @@ -925,6 +925,36 @@ pub fn main() -> Result<()> { }, ) } + #[cfg(feature = "gitoxide-core-tools-delta-create")] + free::pack::Subcommands::DeltaCreate { + repository, + statistics, + nondeterministic_count, + pack_cache_size_mb, + counting_threads, + object_cache_size_mb, + output_directory, + } => prepare_and_run( + "pack-create", + trace, + verbose, + progress, + progress_keep_open, + core::pack::create::PROGRESS_RANGE, + move |progress, out, _err| { + let input = stdin_or_bail()?; + let repository = repository.unwrap_or_else(|| PathBuf::from(".")); + let context = core::pack::delta_create::Context { + thread_limit, + nondeterministic_thread_count: nondeterministic_count.then_some(counting_threads), + pack_cache_size_in_bytes: pack_cache_size_mb.unwrap_or(0) * 1_000_000, + object_cache_size_in_bytes: object_cache_size_mb.unwrap_or(0) * 1_000_000, + statistics: if statistics { Some(format) } else { None }, + out, + }; + core::pack::delta_create::delta_create(repository, input, output_directory, progress, context) + }, + ), #[cfg(feature = "gitoxide-core-async-client")] free::pack::Subcommands::Receive { protocol, diff --git a/src/plumbing/options/free.rs b/src/plumbing/options/free.rs index 6ce2b59defc..a49b5b03f18 100644 --- a/src/plumbing/options/free.rs +++ b/src/plumbing/options/free.rs @@ -184,6 +184,53 @@ pub mod pack { /// Otherwise the expansion mode is 'tree-traversal' by default. tips: Vec, }, + #[cfg(feature = "gitoxide-core-tools-delta-create")] + /// Create a new pack with customized delta topological relationships. NOTE: This is an experimental feature and may change in the future. + DeltaCreate { + #[clap(long, short = 'r')] + /// the directory containing the '.git' repository from which objects should be read. + repository: Option, + + #[clap(long, default_value_t = 3, requires = "nondeterministic_count")] + /// The amount of threads to use when counting and the `--nondeterminisitc-count` flag is set, defaulting + /// to the globally configured threads. + /// + /// Use it to have different trade-offs between counting performance and cost in terms of CPU, as the scaling + /// here is everything but linear. The effectiveness of each core seems to be no more than 30%. + counting_threads: usize, + + #[clap(long)] + /// if set, the counting phase may be accelerated using multithreading. + /// + /// On the flip side, however, one will loose deterministic counting results which affects the + /// way the resulting pack is structured. + nondeterministic_count: bool, + + #[clap(long, short = 's')] + /// If set statistical information will be presented to inform about pack creation details. + /// It's a form of instrumentation for developers to help improve pack generation. + statistics: bool, + + #[clap(long)] + /// The size in megabytes for a cache to speed up pack access for packs with long delta chains. + /// It is shared among all threads, so 4 threads would use their own cache 1/4th of the size. + /// + /// If unset, no cache will be used. + pack_cache_size_mb: Option, + + #[clap(long)] + /// The size in megabytes for a cache to speed up accessing entire objects, bypassing object database access when hit. + /// It is shared among all threads, so 4 threads would use their own cache 1/4th of the size. + /// + /// This cache type is currently only effective when using the 'diff-tree' object expansion. + /// + /// If unset, no cache will be used. + object_cache_size_mb: Option, + + /// The directory into which to write the pack file. + #[clap(long, short = 'o')] + output_directory: Option, + }, /// Use the gix-protocol to receive a pack, emulating a clone. #[cfg(any(feature = "gitoxide-core-async-client", feature = "gitoxide-core-blocking-client"))] Receive { From 042f89b82772c94e88c7d1501526edaf347a134b Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Thu, 30 Apr 2026 21:30:53 +0800 Subject: [PATCH 03/11] fix: eliminate decompress-recompress round-trip in delta_create `from_delta_ref` now accepts pre-compressed data and a separate `decompressed_size`, removing the redundant zlib round-trip. - `find_delta` returns `(&[u8], u64)` borrowing compressed bytes directly from the pack entry - "new delta" path compresses `diff()` output once via deflate::Write - `compute_delta` renamed to `diff`, encodes varint header + instructions into a single buffer - Feature gate `experimental-delta-create` now includes `dep:parking_lot` Co-Authored-By: Claude Opus 4.7 --- gitoxide-core/Cargo.toml | 2 +- gitoxide-core/src/pack/delta_create.rs | 111 ++++++++++++++++--------- 2 files changed, 73 insertions(+), 40 deletions(-) diff --git a/gitoxide-core/Cargo.toml b/gitoxide-core/Cargo.toml index 7a31a734ad6..b520d7287df 100644 --- a/gitoxide-core/Cargo.toml +++ b/gitoxide-core/Cargo.toml @@ -48,7 +48,7 @@ async-client = ["gix/async-network-client-async-std", "gix-transport-configurati serde = ["gix/serde", "dep:serde_json", "dep:serde", "bytesize/serde"] ## The experimental ability to create a pack with delta compression. -experimental-delta-create = ["gix-features"] +experimental-delta-create = ["gix-features", "dep:parking_lot"] [dependencies] # deselect everything else (like "performance") as this should be controllable by the parent application. diff --git a/gitoxide-core/src/pack/delta_create.rs b/gitoxide-core/src/pack/delta_create.rs index b72713d92ef..94ede29173e 100644 --- a/gitoxide-core/src/pack/delta_create.rs +++ b/gitoxide-core/src/pack/delta_create.rs @@ -278,7 +278,7 @@ pub mod input_iteration { // NOTE: copied from gix-pack/src/data/output/entry/iter_from_counts.rs mod iter_from_counts { - use std::{cmp::Ordering, collections::HashMap, sync::Arc}; + use std::{cmp::Ordering, collections::HashMap, io::Write, sync::Arc}; use gix::{hash::ObjectId, odb::pack, parallel, parallel::SequenceId, progress, Count, Progress}; @@ -407,7 +407,7 @@ mod iter_from_counts { .entry_pack_location .as_ref() .and_then(|l| db.entry_by_location(l).map(|pe| (l, pe)))?; - let delta = find_delta( + let (compressed_delta, decompressed_size) = find_delta( count, &pack_entry, source_oid, @@ -430,7 +430,8 @@ mod iter_from_counts { )?; Some(entry::from_delta_ref( count, - delta, + compressed_delta.to_vec(), + decompressed_size as usize, *oid_index_mapping .get(source_oid) .expect("all target and source objects should in ONE pack"), @@ -444,16 +445,17 @@ mod iter_from_counts { // Build delta else if let Some((target, _)) = db_find_cached(&oid, buf_t)? { if let Some((source, _)) = db_find_cached(source_oid, buf_s)? { - let delta_insts = delta_diff::compute_delta(source.data, target.data); - let mut delta_data_buf = Vec::new(); - for inst in delta_insts { - // Panic here because delta algorithm is incorrect, should fast fail - inst.encode(&mut delta_data_buf) - .expect("delta instruction should valid"); - } + let delta_data = delta_diff::diff(source.data, target.data) + .expect("delta diff algorithm should valid"); + let mut deflate = gix_features::zlib::stream::deflate::Write::new(Vec::new()); + std::io::copy(&mut delta_data.as_slice(), &mut deflate) + .map_err(|e| Error::NewEntry(e.into()))?; + deflate.flush().map_err(|e| Error::NewEntry(e.into()))?; + let compressed_delta = deflate.into_inner(); entry::from_delta_ref( count, - delta_data_buf.as_slice(), + compressed_delta, + delta_data.len(), *oid_index_mapping .get(source_oid) .expect("all target and source objects should in ONE pack"), // TODO: allow ref delta in thin pack @@ -545,13 +547,15 @@ mod iter_from_counts { } } + /// Returns `(compressed_delta_data, decompressed_size)` if the pack entry is a delta pointing to `source_oid`. + /// The compressed data is borrowed as-is from the pack (no decompression/recompression round-trip). fn find_delta<'a>( count: &output::Count, - entry: &'a pack::find::Entry, + entry: &'a pack::find::Entry, // FIXIT: use `db: Find` instead of `Entry` source_oid: &ObjectId, mut pack_offset_to_oid: impl FnMut(u32, u64) -> Option, target_version: pack::data::Version, - ) -> Option<&'a [u8]> { + ) -> Option<(&'a [u8], u64)> { if entry.version != target_version { return None; } @@ -561,7 +565,7 @@ mod iter_from_counts { pack::data::Entry::from_bytes(&entry.data, pack_offset_must_be_zero, count.id.as_slice().len()).ok()?; use pack::data::entry::Header::*; - match pack_entry.header { + let source_matches = match pack_entry.header { OfsDelta { base_distance } => { let pack_location = count.entry_pack_location.as_ref().expect("packed"); let base_offset = pack_location @@ -573,37 +577,38 @@ mod iter_from_counts { RefDelta { base_id } => Some(base_id), _ => None, } - .filter(|id| id == source_oid) - .map(|_| &entry.data[pack_entry.data_offset as usize..]) + .filter(|id| id == source_oid); + + if source_matches.is_none() { + return None; + } + + let compressed = &entry.data[pack_entry.data_offset as usize..]; + Some((compressed, pack_entry.decompressed_size)) } // NOTE: copied from gix-pack/src/data/output/entry/mod.rs mod entry { - use std::io::Write; - use gix::odb::pack; use pack::data::output::{self, entry::Error, Entry}; /// Like [`output::Entry::from_base()`], but with type OfsDelta. - /// `delta_data` is encoded instructions. Header with encoded size and will be encoded by `output::Entry::to_entry_header` - /// `object_index` is the absolute index to the object. - pub fn from_delta_ref(count: &output::Count, delta_data: &[u8], object_index: usize) -> Result { + /// `compressed_delta_data` is already zlib-compressed delta data (header + instructions). + /// `decompressed_size` is the size of the delta data after decompression. + /// `object_index` is the absolute index to the base object. + pub fn from_delta_ref( + count: &output::Count, + compressed_delta_data: Vec, + decompressed_size: usize, + object_index: usize, + ) -> Result { + // FIXIT: too trivial to exists as a function Ok(output::Entry { id: count.id.to_owned(), kind: output::entry::Kind::DeltaRef { object_index }, - decompressed_size: delta_data.len(), - compressed_data: { - let mut out = gix_features::zlib::stream::deflate::Write::new(Vec::new()); - if let Err(err) = std::io::copy(&mut &*delta_data, &mut out) { - match err.kind() { - std::io::ErrorKind::Other => return Err(Error::ZlibDeflate(err)), - err => unreachable!("Should never see other errors than zlib, but got {:?}", err), - } - } - out.flush()?; - out.into_inner() - }, + decompressed_size, + compressed_data: compressed_delta_data, }) } } @@ -790,10 +795,27 @@ mod iter_from_counts { } } - /// Calculate delta instructions from `source` to `target`. - pub fn compute_delta<'a>(source: &[u8], target: &'a [u8]) -> Vec> { - // TODO: more efficient - // TODO: more configurable + /// Encode a variable-length integer for the delta header (7 bits per byte, MSB = continuation). + fn encode_delta_varint(mut value: usize, buf: &mut impl Write) -> Result<(), Error> { + loop { + let mut byte = (value & 0x7f) as u8; + value >>= 7; + if value > 0 { + byte |= 0x80; + } + buf.write_all(&[byte]).map_err(Error::IOError)?; + if value == 0 { + break; + } + } + Ok(()) + } + + /// Calculate delta from `source` to `target`, returning the instructions + /// (header + instructions) as expected by the Git pack format. + fn compute_delta<'a>(source: &[u8], target: &'a [u8]) -> Vec> { + let mut insts = Vec::new(); + let mut common_prefix_len: usize = 0; for (s, t) in source.iter().zip(target) { if s == t { @@ -802,18 +824,29 @@ mod iter_from_counts { break; } } - - let mut insts = Vec::new(); if common_prefix_len > 0 { insts.push(Instruction::Copy { offset: 0, size: common_prefix_len, }); } + for chunk in target[common_prefix_len..].chunks(127) { insts.push(Instruction::Add { data: chunk }); } insts } + + /// Calculate delta from `source` to `target`, returning the instructions + /// (header + instructions) as expected by the Git pack format. + pub fn diff(source: &[u8], target: &[u8]) -> Result, Error> { + let mut delta_data = Vec::new(); + encode_delta_varint(source.len(), &mut delta_data)?; + encode_delta_varint(target.len(), &mut delta_data)?; + for inst in compute_delta(source, target) { + inst.encode(&mut delta_data)?; + } + Ok(delta_data) + } } } From 8ecf7542ec6cd66ec4b32dc90f3d9a108b281beb Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Thu, 30 Apr 2026 21:32:26 +0800 Subject: [PATCH 04/11] test: add journey tests for delta-create subcommand Covers three code paths: new delta (compute_delta), new base (from_data), and reuse delta (find_delta). Uses a dedicated `kind="delta-create"` to avoid modifying existing journey tests. Co-Authored-By: Claude Opus 4.7 --- justfile | 6 ++ tests/helpers.sh | 24 +++++++ tests/journey/gix.sh | 64 +++++++++++++++++++ .../no-repo/delta-create/new-base-success | 1 + .../new-delta-no-output-dir-success | 1 + .../delta-create/new-delta-output-dir-content | 1 + .../delta-create/new-delta-output-dir-success | 0 .../new-delta-statistics-json-success | 15 +++++ .../delta-create/new-delta-statistics-success | 11 ++++ .../no-repo/delta-create/reuse-delta-success | 1 + 10 files changed, 124 insertions(+) create mode 100644 tests/snapshots/plumbing/no-repo/delta-create/new-base-success create mode 100644 tests/snapshots/plumbing/no-repo/delta-create/new-delta-no-output-dir-success create mode 100644 tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-content create mode 100644 tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-success create mode 100644 tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-json-success create mode 100644 tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-success create mode 100644 tests/snapshots/plumbing/no-repo/delta-create/reuse-delta-success diff --git a/justfile b/justfile index e4acd89388d..46569b44a7a 100755 --- a/justfile +++ b/justfile @@ -292,6 +292,12 @@ journey-tests-async: cargo build -p gix-testtools dbg="$({{ j }} dbg)" && tests/journey.sh "$dbg/ein" "$dbg/gix" "$dbg/jtt" async +# Run journey tests with delta-create feature enabled +journey-tests-delta-create: + cargo build --no-default-features --features max-pure,gitoxide-core-tools-delta-create + cargo build -p gix-testtools --bin jtt + dbg="$({{ j }} dbg)" && tests/journey.sh "$dbg/ein" "$dbg/gix" "$dbg/jtt" delta-create + # Build a customized `cross` container image for testing cross-image target: docker build --build-arg "TARGET={{ target }}" \ diff --git a/tests/helpers.sh b/tests/helpers.sh index e0994f25278..b43e7d5b5ae 100644 --- a/tests/helpers.sh +++ b/tests/helpers.sh @@ -59,6 +59,30 @@ function small-repo-in-sandbox() { } &>/dev/null } +# Creates a repo with 4 versions of a file (blob#0..#3) in 50-line increments. +# Aggressive repacking forces Git to build a delta chain. +# blob#3 (latest/largest) becomes the base, with others as sequential deltas. +function pack-repo-in-sandbox() { + sandbox + { + git init + git checkout -b main + git config commit.gpgsign false + git config tag.gpgsign false + + for i in 0 1 2 3; do + python3 -c " +for j in range(50 * ($i + 1)): + print(f'line {j}') +" > file.txt + git add file.txt + git commit -m "blob#${i}" + done + + git repack -a -d --window=10 --depth=4 + } &>/dev/null +} + function launch-git-daemon() { local i git_daemon_url_file diff --git a/tests/journey/gix.sh b/tests/journey/gix.sh index 4ab6ebe1d5b..3b42bc09578 100644 --- a/tests/journey/gix.sh +++ b/tests/journey/gix.sh @@ -200,6 +200,7 @@ title "gix commit-graph" (with "gix free" snapshot="$snapshot/no-repo" title "gix free pack" + (if test "$kind" != "delta-create"; then (when "running 'pack'" snapshot="$snapshot/pack" @@ -702,4 +703,67 @@ title "gix commit-graph" ) ) ) + fi) + + title "gix free pack delta-create" + (if test "$kind" = "delta-create"; then + (with "the 'delta-create' sub-command" + snapshot="$snapshot/delta-create" + (pack-repo-in-sandbox + REPO_DIR="$PWD" + BLOB0=$(git rev-parse HEAD~3:file.txt) + BLOB1=$(git rev-parse HEAD~2:file.txt) + BLOB2=$(git rev-parse HEAD~1:file.txt) + BLOB3=$(git rev-parse HEAD:file.txt) + + (with "new delta (blob#1 -> blob#0)" + (with "no output directory" + it "prints the pack name" && { + WITH_SNAPSHOT="$snapshot/new-delta-no-output-dir-success" \ + expect_run $SUCCESSFULLY bash -c "printf '%s\\n%s\\n' '$BLOB1 $BLOB0' '$BLOB0' | '$exe_plumbing' --no-verbose free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1" + } + ) + (with "output directory" + (sandbox + mkdir out + it "creates a pack file" && { + WITH_SNAPSHOT="$snapshot/new-delta-output-dir-success" \ + expect_run $SUCCESSFULLY bash -c "printf '%s\\n%s\\n' '$BLOB1 $BLOB0' '$BLOB0' | '$exe_plumbing' --no-verbose free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1 -r '$REPO_DIR' -o out/" + } + it "writes a .pack file to the output directory" && { + WITH_SNAPSHOT="$snapshot/new-delta-output-dir-content" \ + expect_run $SUCCESSFULLY ls out/ + } + ) + ) + (with "statistics" + it "shows statistics" && { + WITH_SNAPSHOT="$snapshot/new-delta-statistics-success" \ + expect_run $SUCCESSFULLY bash -c "printf '%s\\n%s\\n' '$BLOB1 $BLOB0' '$BLOB0' | '$exe_plumbing' --no-verbose free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1 -s" + } + ) + (with "--format json" + it "outputs statistics as JSON" && { + WITH_SNAPSHOT="$snapshot/new-delta-statistics-json-success" \ + expect_run $SUCCESSFULLY bash -c "printf '%s\\n%s\\n' '$BLOB1 $BLOB0' '$BLOB0' | '$exe_plumbing' --no-verbose --format json free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1 -s" + } + ) + ) + + (with "new base (blob#0)" + it "stores as a full object" && { + WITH_SNAPSHOT="$snapshot/new-base-success" \ + expect_run $SUCCESSFULLY bash -c "echo '$BLOB0' | '$exe_plumbing' --no-verbose free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1" + } + ) + + (with "reuse delta (blob#0 -> blob#3)" + it "copies the existing delta" && { + WITH_SNAPSHOT="$snapshot/reuse-delta-success" \ + expect_run $SUCCESSFULLY bash -c "printf '%s\\n%s\\n' '$BLOB0 $BLOB3' '$BLOB3' | '$exe_plumbing' --no-verbose free pack delta-create --pack-cache-size-mb 1 --object-cache-size-mb 1" + } + ) + ) + ) + fi) ) diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-base-success b/tests/snapshots/plumbing/no-repo/delta-create/new-base-success new file mode 100644 index 00000000000..6e457eb3a5c --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/new-base-success @@ -0,0 +1 @@ +b4628a57c2c40d9b4f209b8322026f0ff5767864.pack \ No newline at end of file diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-delta-no-output-dir-success b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-no-output-dir-success new file mode 100644 index 00000000000..d59f983b528 --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-no-output-dir-success @@ -0,0 +1 @@ +a2c6eb76193fba6f5993ff94f0e9d64e59c27535.pack \ No newline at end of file diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-content b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-content new file mode 100644 index 00000000000..d59f983b528 --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-content @@ -0,0 +1 @@ +a2c6eb76193fba6f5993ff94f0e9d64e59c27535.pack \ No newline at end of file diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-success b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-output-dir-success new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-json-success b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-json-success new file mode 100644 index 00000000000..1d726c292c0 --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-json-success @@ -0,0 +1,15 @@ +a2c6eb76193fba6f5993ff94f0e9d64e59c27535.pack +{ + "counts": { + "input_objects": 2, + "expanded_objects": 0, + "decoded_objects": 2, + "total_objects": 2 + }, + "entries": { + "decoded_and_recompressed_objects": 0, + "missing_objects": 0, + "objects_copied_from_pack": 0, + "ref_delta_objects": 0 + } +} \ No newline at end of file diff --git a/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-success b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-success new file mode 100644 index 00000000000..c116c6f0c26 --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/new-delta-statistics-success @@ -0,0 +1,11 @@ +a2c6eb76193fba6f5993ff94f0e9d64e59c27535.pack +counting phase + input objects 2 + expanded objects 0 + decoded objects 2 + total objects 2 +generation phase + decoded and recompressed 0 + pack-to-pack copies 0 + ref-delta-objects 0 + missing objects 0 \ No newline at end of file diff --git a/tests/snapshots/plumbing/no-repo/delta-create/reuse-delta-success b/tests/snapshots/plumbing/no-repo/delta-create/reuse-delta-success new file mode 100644 index 00000000000..a67ca617424 --- /dev/null +++ b/tests/snapshots/plumbing/no-repo/delta-create/reuse-delta-success @@ -0,0 +1 @@ +d566a17c88859ad426bb2d5e5c20e34819a0789b.pack \ No newline at end of file From 8067318fd83596fde85f4397607fa9b9c82ecd4b Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Thu, 30 Apr 2026 21:37:30 +0800 Subject: [PATCH 05/11] refactor: inline trivial `from_delta_ref` into call sites MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `from_delta_ref` function was a one-liner wrapper around `output::Entry` construction — too trivial to warrant its own function and `mod entry` block. Co-Authored-By: Claude Opus 4.7 --- gitoxide-core/src/pack/delta_create.rs | 62 +++++++++----------------- 1 file changed, 20 insertions(+), 42 deletions(-) diff --git a/gitoxide-core/src/pack/delta_create.rs b/gitoxide-core/src/pack/delta_create.rs index 94ede29173e..75f82c1bc74 100644 --- a/gitoxide-core/src/pack/delta_create.rs +++ b/gitoxide-core/src/pack/delta_create.rs @@ -428,14 +428,16 @@ mod iter_from_counts { }, version, )?; - Some(entry::from_delta_ref( - count, - compressed_delta.to_vec(), - decompressed_size as usize, - *oid_index_mapping - .get(source_oid) - .expect("all target and source objects should in ONE pack"), - )) + Some(Ok(output::Entry { + id: oid.to_owned(), + kind: output::entry::Kind::DeltaRef { + object_index: *oid_index_mapping + .get(source_oid) + .expect("all target and source objects should in ONE pack"), + }, + decompressed_size: decompressed_size as usize, + compressed_data: compressed_delta.to_vec(), + })) }; // Find existing delta if let Some(entry) = find_existing_delta() { @@ -452,14 +454,16 @@ mod iter_from_counts { .map_err(|e| Error::NewEntry(e.into()))?; deflate.flush().map_err(|e| Error::NewEntry(e.into()))?; let compressed_delta = deflate.into_inner(); - entry::from_delta_ref( - count, - compressed_delta, - delta_data.len(), - *oid_index_mapping - .get(source_oid) - .expect("all target and source objects should in ONE pack"), // TODO: allow ref delta in thin pack - ) + Ok(output::Entry { + id: oid.to_owned(), + kind: output::entry::Kind::DeltaRef { + object_index: *oid_index_mapping + .get(source_oid) + .expect("all target and source objects should in ONE pack"), // TODO: allow ref delta in thin pack + }, + decompressed_size: delta_data.len(), + compressed_data: compressed_delta, + }) } else { Ok(output::Entry::invalid()) } @@ -587,32 +591,6 @@ mod iter_from_counts { Some((compressed, pack_entry.decompressed_size)) } - // NOTE: copied from gix-pack/src/data/output/entry/mod.rs - mod entry { - use gix::odb::pack; - - use pack::data::output::{self, entry::Error, Entry}; - - /// Like [`output::Entry::from_base()`], but with type OfsDelta. - /// `compressed_delta_data` is already zlib-compressed delta data (header + instructions). - /// `decompressed_size` is the size of the delta data after decompression. - /// `object_index` is the absolute index to the base object. - pub fn from_delta_ref( - count: &output::Count, - compressed_delta_data: Vec, - decompressed_size: usize, - object_index: usize, - ) -> Result { - // FIXIT: too trivial to exists as a function - Ok(output::Entry { - id: count.id.to_owned(), - kind: output::entry::Kind::DeltaRef { object_index }, - decompressed_size, - compressed_data: compressed_delta_data, - }) - } - } - mod util { #[derive(Clone)] pub struct ChunkRanges { From 1dfa61cbe8d169759aba4fafd1649d5dedd6b43f Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Thu, 30 Apr 2026 21:50:37 +0800 Subject: [PATCH 06/11] refactor: move count-to-entry into find_delta --- gitoxide-core/src/pack/delta_create.rs | 29 +++++++++++++------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/gitoxide-core/src/pack/delta_create.rs b/gitoxide-core/src/pack/delta_create.rs index 75f82c1bc74..bd497a3992f 100644 --- a/gitoxide-core/src/pack/delta_create.rs +++ b/gitoxide-core/src/pack/delta_create.rs @@ -403,13 +403,9 @@ mod iter_from_counts { let db_find_cached = |oid, buf| db.try_find(oid, buf).map_err(Error::Find); let entry = if let Some(source_oid) = topo.get(&oid) { let mut find_existing_delta = || -> Option<_> { - let (_location, pack_entry) = count - .entry_pack_location - .as_ref() - .and_then(|l| db.entry_by_location(l).map(|pe| (l, pe)))?; - let (compressed_delta, decompressed_size) = find_delta( + let (compressed_data, decompressed_size) = find_delta( count, - &pack_entry, + &db, source_oid, |pack_id, base_offset| { let offsets_oid_mapping = @@ -435,8 +431,8 @@ mod iter_from_counts { .get(source_oid) .expect("all target and source objects should in ONE pack"), }, - decompressed_size: decompressed_size as usize, - compressed_data: compressed_delta.to_vec(), + decompressed_size, + compressed_data, })) }; // Find existing delta @@ -552,14 +548,19 @@ mod iter_from_counts { } /// Returns `(compressed_delta_data, decompressed_size)` if the pack entry is a delta pointing to `source_oid`. - /// The compressed data is borrowed as-is from the pack (no decompression/recompression round-trip). - fn find_delta<'a>( + /// The compressed data is extracted as-is from the pack (no decompression/recompression round-trip). + fn find_delta( count: &output::Count, - entry: &'a pack::find::Entry, // FIXIT: use `db: Find` instead of `Entry` + db: &impl pack::Find, source_oid: &ObjectId, mut pack_offset_to_oid: impl FnMut(u32, u64) -> Option, target_version: pack::data::Version, - ) -> Option<(&'a [u8], u64)> { + ) -> Option<(Vec, usize)> { + let entry = count + .entry_pack_location + .as_ref() + .and_then(|l| db.entry_by_location(l))?; + if entry.version != target_version { return None; } @@ -587,8 +588,8 @@ mod iter_from_counts { return None; } - let compressed = &entry.data[pack_entry.data_offset as usize..]; - Some((compressed, pack_entry.decompressed_size)) + let compressed = entry.data[pack_entry.data_offset as usize..].to_vec(); + Some((compressed, pack_entry.decompressed_size as usize)) } mod util { From aac6ba9fe7a596f1ca7937bfd616e4b913c60559 Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Thu, 30 Apr 2026 21:57:26 +0800 Subject: [PATCH 07/11] add comments for copy-paste actions --- gitoxide-core/src/pack/delta_create.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/gitoxide-core/src/pack/delta_create.rs b/gitoxide-core/src/pack/delta_create.rs index bd497a3992f..0bfb56ac698 100644 --- a/gitoxide-core/src/pack/delta_create.rs +++ b/gitoxide-core/src/pack/delta_create.rs @@ -1,4 +1,3 @@ -// NOTE: copied from create.rs use std::{collections::HashMap, io, path::Path, time::Instant}; use anyhow::anyhow; @@ -10,6 +9,8 @@ use gix::{ use crate::OutputFormat; /// A general purpose context for many operations provided here +/// +/// NOTE: copied from create.rs pub struct Context { /// If `Some(threads)`, use this amount of `threads` to accelerate the counting phase at the cost of losing /// determinism as the order of objects during expansion changes with multiple threads unless no expansion is performed. @@ -38,6 +39,7 @@ pub struct Context { pub out: W, } +/// NOTE: part copied from create.rs pub fn delta_create( repository_path: impl AsRef, input: impl io::BufRead + Send + 'static, @@ -203,6 +205,7 @@ where Ok(()) } +/// NOTE: copied from create.rs fn print(stats: Statistics, format: OutputFormat, out: impl std::io::Write) -> anyhow::Result<()> { match format { OutputFormat::Human => human_output(stats, out).map_err(Into::into), @@ -211,6 +214,7 @@ fn print(stats: Statistics, format: OutputFormat, out: impl std::io::Write) -> a } } +/// NOTE: copied from create.rs fn human_output( Statistics { counts: @@ -263,20 +267,6 @@ struct Statistics { entries: pack::data::output::entry::iter_from_counts::Outcome, } -pub mod input_iteration { - use gix::{hash, traverse}; - #[derive(Debug, thiserror::Error)] - pub enum Error { - #[error("input objects couldn't be iterated completely")] - Iteration(#[from] traverse::commit::simple::Error), - #[error("An error occurred while reading hashes from standard input")] - InputLinesIo(#[from] std::io::Error), - #[error("Could not decode hex hash provided on standard input")] - HashDecode(#[from] hash::decode::Error), - } -} - -// NOTE: copied from gix-pack/src/data/output/entry/iter_from_counts.rs mod iter_from_counts { use std::{cmp::Ordering, collections::HashMap, io::Write, sync::Arc}; @@ -300,6 +290,7 @@ mod iter_from_counts { pub version: pack::data::Version, } + // NOTE: part copied from gix-pack/src/data/output/entry/iter_from_counts.rs pub fn iter_from_counts( mut counts: Vec, topo: HashMap, @@ -592,6 +583,7 @@ mod iter_from_counts { Some((compressed, pack_entry.decompressed_size as usize)) } + /// NOTE: except `apply_permutation`, copied from gix-pack/src/data/output/entry/iter_from_counts.rs mod util { #[derive(Clone)] pub struct ChunkRanges { @@ -643,6 +635,7 @@ mod iter_from_counts { } } } + /// NOTE: copied from gix-pack/src/data/output/entry/iter_from_counts.rs mod reduce { use std::marker::PhantomData; From 937d69cbd162c0f06c0af5b68ac122d065972138 Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Fri, 1 May 2026 09:06:57 +0800 Subject: [PATCH 08/11] test(justfile): add journey-tests-delta-create to test and ci-journey-tests --- justfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/justfile b/justfile index 46569b44a7a..c3b63c67ce0 100755 --- a/justfile +++ b/justfile @@ -13,13 +13,13 @@ alias c := check alias nt := nextest # Run all tests, clippy, including journey tests, try building docs -test: clippy check doc unit-tests doc-tests journey-tests-pure journey-tests-small journey-tests-async journey-tests check-mode +test: clippy check doc unit-tests doc-tests journey-tests-pure journey-tests-small journey-tests-async journey-tests check-mode journey-tests-delta-create # Run all tests, without clippy, and try building docs ci-test: check doc unit-tests check-mode # Run all journey tests - should be run in a fresh clone or after `cargo clean` -ci-journey-tests: journey-tests-pure journey-tests-small journey-tests-async journey-tests +ci-journey-tests: journey-tests-pure journey-tests-small journey-tests-async journey-tests journey-tests-delta-create # Clean the `target` directory clear-target: From 37f19f88c1a6b30a2f67d1bf0e54b4fa02c4b92b Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Fri, 1 May 2026 10:27:52 +0800 Subject: [PATCH 09/11] add more comments to help review --- gitoxide-core/src/pack/delta_create.rs | 32 ++++++++++++++++++-------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/gitoxide-core/src/pack/delta_create.rs b/gitoxide-core/src/pack/delta_create.rs index 0bfb56ac698..b899b178266 100644 --- a/gitoxide-core/src/pack/delta_create.rs +++ b/gitoxide-core/src/pack/delta_create.rs @@ -10,7 +10,7 @@ use crate::OutputFormat; /// A general purpose context for many operations provided here /// -/// NOTE: copied from create.rs +/// NOTE: copied from create.rs, but removed `expansion` and `thin` fields pub struct Context { /// If `Some(threads)`, use this amount of `threads` to accelerate the counting phase at the cost of losing /// determinism as the order of objects during expansion changes with multiple threads unless no expansion is performed. @@ -39,7 +39,10 @@ pub struct Context { pub out: W, } -/// NOTE: part copied from create.rs +/// NOTE: copied from create.rs, but: +/// - rewrite `input` transform +/// - remove parallel speedup in building `counts` +/// - use rewriten `iter_from_counts` pub fn delta_create( repository_path: impl AsRef, input: impl io::BufRead + Send + 'static, @@ -260,6 +263,7 @@ fn human_output( Ok(()) } +/// NOTE: copied from create.rs #[derive(Default)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] struct Statistics { @@ -290,7 +294,8 @@ mod iter_from_counts { pub version: pack::data::Version, } - // NOTE: part copied from gix-pack/src/data/output/entry/iter_from_counts.rs + // NOTE: copied from gix-pack/src/data/output/entry/iter_from_counts.rs, + // but rewrote parameters in parallel::reduce::Stepwise::new pub fn iter_from_counts( mut counts: Vec, topo: HashMap, @@ -474,11 +479,15 @@ mod iter_from_counts { /// Topological sort `counts` in place, parents first. /// If there is a loop, returns Err(usize), meaning how many ObjectID are in loops indicated in the `to_parent`. + /// + /// # Panics + /// + /// Panics if any ObjectId in `to_parent` is not present in `counts`, which would indicate + /// that the parent-child relationship map references objects outside the given count set. fn topo_sort( counts: &mut [output::Count], to_parent: &std::collections::HashMap, ) -> Result<(), usize> { - // firstly sort `vertexes` as children first, then reverse `vertexex` use std::collections::HashMap; type CountIndex = usize; @@ -488,16 +497,20 @@ mod iter_from_counts { return Ok(()); } + // Firstly sort `vertexes` as children first via Kahn method... let oid_to_idx: HashMap = counts .iter() .enumerate() .map(|(idx, c)| (c.id.to_owned(), idx)) .collect(); - let mut idx_to_child_count: HashMap = (0..n).map(|c| (c, 0)).collect(); for (child, parent) in to_parent { - let child = oid_to_idx.get(child).unwrap(); - let parent = oid_to_idx.get(parent).unwrap(); + let child = oid_to_idx + .get(child) + .expect("child ObjectId in to_parent should exist in counts"); + let parent = oid_to_idx + .get(parent) + .expect("parent ObjectId in to_parent should exist in counts"); if idx_to_child_count.contains_key(child) { if let Some(count) = idx_to_child_count.get_mut(parent) { *count += 1; @@ -505,12 +518,10 @@ mod iter_from_counts { } } - // leaf vertex collection let mut stack: Vec = idx_to_child_count .iter() - .filter_map(|(&c, count)| (*count == 0).then_some(c)) + .filter_map(|(&c, count)| (*count == 0).then_some(c)) // Collect leaf vertex .collect(); - let mut sorted = Vec::with_capacity(n); while let Some(curr) = stack.pop() { if let Some(parent) = to_parent.get(&counts[curr].id) { @@ -528,6 +539,7 @@ mod iter_from_counts { match sorted.len().cmp(&n) { Ordering::Less => Err(n - sorted.len()), Ordering::Equal => { + // ...then reverse `vertexex`, and returns as parents first sorted.reverse(); util::apply_permutation(counts, &sorted); Ok(()) From 58d8e70f94929134651e9a0cd087b12c87da05a0 Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Sat, 2 May 2026 09:10:19 +0800 Subject: [PATCH 10/11] parallel in building `counts` The `count::objects()` API requires `Box` which implicitly has a `'static` lifetime bound. The previous implementation used `.inspect()` to populate the `topo` HashMap lazily within the iterator chain, allowing input parsing and counting to overlap. Boxing this iterator as `'static` conflicts with `topo`'s non-static borrow. As a workaround, eagerly collect all parsed input into a Vec first, populate `topo` in a separate pass, then create the boxed iterator from the collected data. This trades the lazy streaming for an extra allocation. Co-Authored-By: Claude Opus 4.7 --- gitoxide-core/src/pack/delta_create.rs | 109 +++++++++++++++---------- 1 file changed, 66 insertions(+), 43 deletions(-) diff --git a/gitoxide-core/src/pack/delta_create.rs b/gitoxide-core/src/pack/delta_create.rs index b899b178266..f7ef82b87d4 100644 --- a/gitoxide-core/src/pack/delta_create.rs +++ b/gitoxide-core/src/pack/delta_create.rs @@ -41,7 +41,6 @@ pub struct Context { /// NOTE: copied from create.rs, but: /// - rewrite `input` transform -/// - remove parallel speedup in building `counts` /// - use rewriten `iter_from_counts` pub fn delta_create( repository_path: impl AsRef, @@ -49,6 +48,7 @@ pub fn delta_create( output_directory: Option>, mut progress: P, Context { + nondeterministic_thread_count, thread_limit, statistics, pack_cache_size_in_bytes, @@ -62,71 +62,94 @@ where P: NestedProgress, P::SubProgress: 'static, { - type ObjectIdIter<'a> = - dyn Iterator), Box>> + Send + 'a; - let repo = gix::discover(repository_path)?.into_sync(); progress.init(Some(2), progress::steps()); let make_cancellation_err = || anyhow!("Cancelled by user"); let mut topo = HashMap::new(); - let (mut handle, input): (_, Box>) = { + let parsed_input: Vec), Box>> = { let mut progress = progress.add_child("iterating"); progress.init(None, progress::count("objects")); - let handle = repo.objects.into_shared_arc().to_cache_arc(); - ( - handle, - Box::new( - input - .lines() - .map(|line| { - line.map_err(|err| Box::new(err) as Box<_>).and_then(|line| { - let hex2oid = |hex: &str| { - ObjectId::from_hex(hex.as_bytes()) - .map_err(Into::>::into) - }; - if let Some((target, source)) = line.split_once(' ') { - Ok((hex2oid(target)?, Some(hex2oid(source)?))) - } else { - Ok((hex2oid(&line)?, None)) - } - }) - }) - .inspect(move |_| progress.inc()) - .inspect(|res| { - if let Ok((target, Some(source))) = res { - topo.insert(target.clone(), source.clone()); - } - }), - ), - ) + input + .lines() + .map(|line| { + line.map_err(|err| Box::new(err) as Box<_>).and_then(|line| { + let hex2oid = |hex: &str| { + ObjectId::from_hex(hex.as_bytes()) + .map_err(Into::>::into) + }; + if let Some((target, source)) = line.split_once(' ') { + Ok((hex2oid(target)?, Some(hex2oid(source)?))) + } else { + Ok((hex2oid(&line)?, None)) + } + }) + }) + .inspect(move |_| progress.inc()) + .collect() }; + for res in &parsed_input { + if let Ok((target, Some(source))) = res { + topo.insert(target.clone(), source.clone()); + } + } + let mut handle = repo.objects.into_shared_arc().to_cache_arc(); + let mut input: Box>> + Send> = + Box::new(parsed_input.into_iter().map(|res| res.map(|(target, _)| target))); let mut stats = Statistics::default(); let chunk_size = 1000; // What's a good value for this? let counts = { - // TODO: parallel speedup let mut progress = progress.add_child("counting"); progress.init(None, progress::count("objects")); + let may_use_multiple_threads = nondeterministic_thread_count.is_some(); + let thread_limit = if may_use_multiple_threads { + nondeterministic_thread_count.or(thread_limit) + } else { + Some(1) + }; + if nondeterministic_thread_count.is_some() && !may_use_multiple_threads { + progress.fail("Cannot use multi-threaded counting in tree-diff object expansion mode as it may yield way too many objects.".into()); + } + let (_, _, thread_count) = gix::parallel::optimize_chunk_size_and_thread_limit(50, None, thread_limit, None); let progress = progress::ThroughputOnDrop::new(progress); { - handle - .set_pack_cache(move || Box::new(pack::cache::lru::MemoryCappedHashmap::new(pack_cache_size_in_bytes))); + // Maybe should disable cache in some cases + handle.set_pack_cache(move || { + Box::new(pack::cache::lru::MemoryCappedHashmap::new( + pack_cache_size_in_bytes / thread_count, + )) + }); handle.set_object_cache(move || { Box::new(pack::cache::object::MemoryCappedHashmap::new( - object_cache_size_in_bytes, + object_cache_size_in_bytes / thread_count, )) }); } handle.prevent_pack_unload(); handle.ignore_replacements = true; - let (mut counts, count_stats) = pack::data::output::count::objects_unthreaded( - &handle, - &mut input.map(|res| res.map(|(target, _)| target)), - &progress, - &interrupt::IS_INTERRUPTED, - pack::data::output::count::objects::ObjectExpansion::AsIs, - )?; + let input_object_expansion = pack::data::output::count::objects::ObjectExpansion::AsIs; + let (mut counts, count_stats) = if may_use_multiple_threads { + pack::data::output::count::objects( + handle.clone(), + input, + &progress, + &interrupt::IS_INTERRUPTED, + pack::data::output::count::objects::Options { + thread_limit, + chunk_size, + input_object_expansion, + }, + )? + } else { + pack::data::output::count::objects_unthreaded( + &handle, + &mut input, + &progress, + &interrupt::IS_INTERRUPTED, + input_object_expansion, + )? + }; stats.counts = count_stats; counts.shrink_to_fit(); counts From c9697f689f6cc11cb6049b32ea160b4cfcf122a4 Mon Sep 17 00:00:00 2001 From: HairlessVillager Date: Thu, 14 May 2026 15:56:49 +0800 Subject: [PATCH 11/11] Fix format --- gitoxide-core/src/pack/delta_create.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/gitoxide-core/src/pack/delta_create.rs b/gitoxide-core/src/pack/delta_create.rs index f7ef82b87d4..c2bc08adb8a 100644 --- a/gitoxide-core/src/pack/delta_create.rs +++ b/gitoxide-core/src/pack/delta_create.rs @@ -2,8 +2,8 @@ use std::{collections::HashMap, io, path::Path, time::Instant}; use anyhow::anyhow; use gix::{ - hash, hash::ObjectId, interrupt, odb::pack, parallel::InOrderIter, prelude::Finalize, progress, Count, - NestedProgress, Progress, + Count, NestedProgress, Progress, hash, hash::ObjectId, interrupt, odb::pack, parallel::InOrderIter, + prelude::Finalize, progress, }; use crate::OutputFormat; @@ -297,7 +297,7 @@ struct Statistics { mod iter_from_counts { use std::{cmp::Ordering, collections::HashMap, io::Write, sync::Arc}; - use gix::{hash::ObjectId, odb::pack, parallel, parallel::SequenceId, progress, Count, Progress}; + use gix::{Count, Progress, hash::ObjectId, odb::pack, parallel, parallel::SequenceId, progress}; use pack::data::output::{ self, @@ -330,7 +330,7 @@ mod iter_from_counts { version, }: Options, ) -> impl Iterator), Error>> - + parallel::reduce::Finalize> + + parallel::reduce::Finalize> where Find: pack::Find + Send + Clone + 'static, { @@ -674,8 +674,8 @@ mod iter_from_counts { mod reduce { use std::marker::PhantomData; - use super::pack::data::output; use super::Outcome; + use super::pack::data::output; use super::{parallel, parallel::SequenceId}; pub struct Statistics {