From c308380474b4771625d27a56636ffe3f423f6ef5 Mon Sep 17 00:00:00 2001 From: vmandke Date: Sat, 9 May 2026 23:02:28 +0530 Subject: [PATCH 1/2] gix-pack: replace in_parallel_with_slice with a global queue for delta resolution Replaces: resolve::deltas Modifies: resolve::deltas_mt Previously each thread was assigned a fixed subtree via in_parallel_with_slice, causing load imbalance when one root had a deep chain of deltas. Thread synchronization also used a busy wait of poll_interval. This PR rewrites deltas_mt to use a WorkQueue with Condvar blocking, so all threads pull work evenly regardless of tree shape. Co-authored-by: Claude Code --- gix-pack/src/cache/delta/traverse/mod.rs | 62 +-- gix-pack/src/cache/delta/traverse/resolve.rs | 449 +++++++------------ 2 files changed, 173 insertions(+), 338 deletions(-) diff --git a/gix-pack/src/cache/delta/traverse/mod.rs b/gix-pack/src/cache/delta/traverse/mod.rs index e43b99a711d..04fcc0eac0b 100644 --- a/gix-pack/src/cache/delta/traverse/mod.rs +++ b/gix-pack/src/cache/delta/traverse/mod.rs @@ -1,7 +1,6 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use gix_features::{ - parallel::in_parallel_with_slice, progress::{self, DynNestedProgress, Progress}, threading, threading::{Mutable, OwnShared}, @@ -139,47 +138,24 @@ where let (mut root_items, mut child_items_vec) = self.take_root_and_child(); let child_items = ItemSliceSync::new(&mut child_items_vec); let child_items = &child_items; - in_parallel_with_slice( - &mut root_items, - thread_limit, - { - { - let object_progress = object_progress.clone(); - move |thread_index| resolve::State { - delta_bytes: Vec::::with_capacity(4096), - fully_resolved_delta_bytes: Vec::::with_capacity(4096), - progress: Box::new( - threading::lock(&object_progress).add_child(format!("thread {thread_index}")), - ), - resolve: resolve.clone(), - modify_base: inspect_object.clone(), - child_items, - } - } - }, - { - move |node, state, threads_left, 
should_interrupt| { - // SAFETY: This invariant is upheld since `child_items` and `node` come from the same Tree. - // This means we can rely on Tree's invariant that node.children will be the only `children` array in - // for nodes in this tree that will contain any of those children. - #[allow(unsafe_code)] - unsafe { - resolve::deltas( - object_counter.clone(), - size_counter.clone(), - node, - state, - resolve_data, - object_hash.len_in_bytes(), - threads_left, - should_interrupt, - ) - } - } - }, - || (!should_interrupt.load(Ordering::Relaxed)).then(|| std::time::Duration::from_millis(50)), - |_| (), - )?; + + // SAFETY: root_items and child_items come from the same Tree — the Tree invariant holds. + #[allow(unsafe_code)] + unsafe { + resolve::resolve_all_nodes( + &mut root_items, + child_items, + thread_limit, + object_counter, + size_counter, + &gix_features::progress::Discard, + resolve, + resolve_data, + inspect_object, + object_hash.len_in_bytes(), + should_interrupt, + )?; + } threading::lock(&object_progress).show_throughput(start); size_progress.show_throughput(start); diff --git a/gix-pack/src/cache/delta/traverse/resolve.rs b/gix-pack/src/cache/delta/traverse/resolve.rs index d93780f222d..e7c226e60b0 100644 --- a/gix-pack/src/cache/delta/traverse/resolve.rs +++ b/gix-pack/src/cache/delta/traverse/resolve.rs @@ -1,9 +1,6 @@ -use std::{ - collections::BTreeMap, - sync::atomic::{AtomicBool, AtomicIsize, Ordering}, -}; +use std::sync::atomic::{AtomicBool, Ordering}; -use gix_features::{progress::Progress, threading, zlib}; +use gix_features::{progress::Progress, zlib}; use crate::{ cache::delta::{ @@ -43,11 +40,6 @@ mod root { } impl<'a, T: Send> Node<'a, T> { - /// Returns the offset into the pack at which the `Node`s data is located. - pub fn offset(&self) -> u64 { - self.item.offset - } - /// Returns the slice into the data pack at which the pack entry is located. 
pub fn entry_slice(&self) -> crate::data::EntryRange { self.item.offset..self.item.next_offset @@ -58,11 +50,6 @@ mod root { &mut self.item.data } - /// Returns true if this node has children, e.g. is not a leaf in the tree. - pub fn has_children(&self) -> bool { - !self.item.children().is_empty() - } - /// Transform this `Node` into an iterator over its children. /// /// Children are `Node`s referring to pack entries whose base object is this pack entry. @@ -81,38 +68,87 @@ mod root { } } -pub(super) struct State<'items, F, MBFN, T: Send> { - pub delta_bytes: Vec, - pub fully_resolved_delta_bytes: Vec, - pub progress: Box, - pub resolve: F, - pub modify_base: MBFN, - pub child_items: &'items ItemSliceSync<'items, Item>, +/// (delta depth, node to resolve, parent's resolved bytes `None` for level-0 roots) +type WorkItem<'a, T> = ( + u16, + root::Node<'a, T>, + Option)>>, +); + +struct WorkQueue<'a, T: Send> { + items: std::sync::Mutex>>, + cvar: std::sync::Condvar, + /// Indicator that items are still being processed + in_progress: std::sync::atomic::AtomicUsize, +} + +impl<'a, T: Send> WorkQueue<'a, T> { + fn new(nodes: Vec>) -> Self { + WorkQueue { + items: std::sync::Mutex::new(nodes), + cvar: std::sync::Condvar::new(), + in_progress: std::sync::atomic::AtomicUsize::new(0), + } + } + + /// Blocks until an item is available, returns `None` only when all work is done. + fn pop(&self) -> Option> { + let mut guard = self.items.lock().unwrap_or_else(std::sync::PoisonError::into_inner); + loop { + if let Some(v) = guard.pop() { + self.in_progress.fetch_add(1, Ordering::SeqCst); + return Some(v); + } + if self.in_progress.load(Ordering::SeqCst) == 0 { + return None; + } + // Nothing available yet but work is still in-progress — wait for a push or finish. + guard = self.cvar.wait(guard).unwrap_or_else(std::sync::PoisonError::into_inner); + } + } + + /// Push a batch of items and wake waiting workers. 
+ fn push_items(&self, children: Vec>, num_threads: usize) { + if children.is_empty() { + return; + } + let n = children.len(); + let mut guard = self.items.lock().unwrap_or_else(std::sync::PoisonError::into_inner); + guard.extend(children); + drop(guard); + for _ in 0..n.min(num_threads) { + self.cvar.notify_one(); + } + } + + /// Mark one item done. Must be called for every `pop`, including error paths. + fn finish_item(&self) { + let _guard = self.items.lock().unwrap_or_else(std::sync::PoisonError::into_inner); + self.in_progress.fetch_sub(1, Ordering::SeqCst); + self.cvar.notify_all(); + } } +/// Resolve all delta objects in parallel using a work-stealing queue. +/// /// SAFETY: `item.children` must uniquely reference elements in child_items that no other currently alive /// item does. All child_items must also have unique children. -/// /// This safety invariant can be reliably upheld by making sure `item` comes from a Tree and `child_items` /// was constructed using that Tree's child_items. This works since Tree has this invariant as well: all /// child_items are referenced at most once (really, exactly once) by a node in the tree. 
#[allow(clippy::too_many_arguments, unsafe_code)] -#[deny(unsafe_op_in_unsafe_fn)] // this is a big function, require unsafe for the one small unsafe op we have -pub(super) unsafe fn deltas( +#[deny(unsafe_op_in_unsafe_fn)] +pub(super) unsafe fn resolve_all_nodes( + items: &mut [Item], + child_items: &ItemSliceSync<'_, Item>, + thread_limit: Option, objects: gix_features::progress::StepShared, size: gix_features::progress::StepShared, - item: &mut Item, - State { - delta_bytes, - fully_resolved_delta_bytes, - progress, - resolve, - modify_base, - child_items, - }: &mut State<'_, F, MBFN, T>, + progress: &dyn Progress, + resolve: F, resolve_data: &R, + modify_base: MBFN, hash_len: usize, - threads_left: &AtomicIsize, should_interrupt: &AtomicBool, ) -> Result<(), Error> where @@ -122,150 +158,36 @@ where MBFN: FnMut(&mut T, &dyn Progress, Context<'_>) -> Result<(), E> + Send + Clone, E: std::error::Error + Send + Sync + 'static, { - let mut decompressed_bytes_by_pack_offset = BTreeMap::new(); - let mut inflate = zlib::Inflate::default(); - let mut decompress_from_resolver = |slice: EntryRange, out: &mut Vec| -> Result<(data::Entry, u64), Error> { - let bytes = resolve(slice.clone(), resolve_data).ok_or(Error::ResolveFailed { - pack_offset: slice.start, - })?; - let entry = data::Entry::from_bytes(bytes, slice.start, hash_len)?; - let compressed = &bytes[entry.header_size()..]; - let decompressed_len = entry.decompressed_size as usize; - decompress_all_at_once_with(&mut inflate, compressed, decompressed_len, out)?; - Ok((entry, slice.end)) - }; - - // each node is a base, and its children always start out as deltas which become a base after applying them. 
- // These will be pushed onto our stack until all are processed - let root_level = 0; - // SAFETY: This invariant is required from the caller - #[allow(unsafe_code)] - let root_node = unsafe { root::Node::new(item, child_items) }; - let mut nodes: Vec<_> = vec![(root_level, root_node)]; - while let Some((level, mut base)) = nodes.pop() { - if should_interrupt.load(Ordering::Relaxed) { - return Err(Error::Interrupted); - } - let (base_entry, entry_end, base_bytes) = if level == root_level { - let mut buf = Vec::new(); - let (a, b) = decompress_from_resolver(base.entry_slice(), &mut buf)?; - (a, b, buf) - } else { - decompressed_bytes_by_pack_offset - .remove(&base.offset()) - .expect("we store the resolved delta buffer when done") - }; - - // anything done here must be repeated further down for leaf-nodes. - // This way we avoid retaining their decompressed memory longer than needed (they have no children, - // thus their memory can be released right away, using 18% less peak memory on the linux kernel). 
- { - modify_base( - base.data(), - progress, - Context { - entry: &base_entry, - entry_end, - decompressed: &base_bytes, - level, - }, - ) - .map_err(|err| Box::new(err) as Box)?; - objects.fetch_add(1, Ordering::Relaxed); - size.fetch_add(base_bytes.len(), Ordering::Relaxed); - } - - for mut child in base.into_child_iter() { - let (mut child_entry, entry_end) = decompress_from_resolver(child.entry_slice(), delta_bytes)?; - let (base_size, consumed) = data::delta::decode_header_size(delta_bytes)?; - let mut header_ofs = consumed; - assert_eq!( - base_bytes.len(), - base_size as usize, - "recorded base size in delta does match the actual one" - ); - let (result_size, consumed) = data::delta::decode_header_size(&delta_bytes[consumed..])?; - header_ofs += consumed; - - fully_resolved_delta_bytes.resize(result_size as usize, 0); - data::delta::apply(&base_bytes, fully_resolved_delta_bytes, &delta_bytes[header_ofs..])?; - - // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers - // at all - child_entry.header = base_entry.header; // assign the actual object type, instead of 'delta' - if child.has_children() { - decompressed_bytes_by_pack_offset.insert( - child.offset(), - (child_entry, entry_end, std::mem::take(fully_resolved_delta_bytes)), - ); - nodes.push((level + 1, child)); - } else { - modify_base( - child.data(), - &progress, - Context { - entry: &child_entry, - entry_end, - decompressed: fully_resolved_delta_bytes, - level: level + 1, - }, - ) - .map_err(|err| Box::new(err) as Box)?; - objects.fetch_add(1, Ordering::Relaxed); - size.fetch_add(base_bytes.len(), Ordering::Relaxed); - } - } - - // After the first round, see if we can use additional threads, and if so we enter multi-threaded mode. - // In it we will keep using new threads as they become available while using this thread for coordination. 
- // We optimize for a low memory footprint as we are likely to get here if long delta-chains with large objects are involved. - // Try to avoid going into threaded mode if there isn't more than one unit of work anyway. - if nodes.len() > 1 { - if let Ok(initial_threads) = - threads_left.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |threads_available| { - (threads_available > 0).then_some(0) - }) - { - // Assure no memory is held here. - *delta_bytes = Vec::new(); - *fully_resolved_delta_bytes = Vec::new(); - return deltas_mt( - initial_threads, - decompressed_bytes_by_pack_offset, - objects, - size, - &progress, - nodes, - resolve.clone(), - resolve_data, - modify_base.clone(), - hash_len, - threads_left, - should_interrupt, - ); - } - } - } - - Ok(()) + let num_threads = gix_features::parallel::num_threads(thread_limit); + let nodes = items + .iter_mut() + .map(|item| (0u16, unsafe { root::Node::new(item, child_items) }, None)) + .collect(); + deltas_mt( + num_threads, + objects, + size, + progress, + nodes, + resolve, + resolve_data, + modify_base, + hash_len, + should_interrupt, + ) } -/// * `initial_threads` is the threads we may spawn, not accounting for our own thread which is still considered used by the parent -/// system. Since this thread will take a controlling function, we may spawn one more than that. In threaded mode, we will finish -/// all remaining work. 
#[allow(clippy::too_many_arguments)] fn deltas_mt( - mut threads_to_create: isize, - decompressed_bytes_by_pack_offset: BTreeMap)>, + num_threads: usize, objects: gix_features::progress::StepShared, size: gix_features::progress::StepShared, progress: &dyn Progress, - nodes: Vec<(u16, root::Node<'_, T>)>, + nodes: Vec>, resolve: F, resolve_data: &R, modify_base: MBFN, hash_len: usize, - threads_left: &AtomicIsize, should_interrupt: &AtomicBool, ) -> Result<(), Error> where @@ -275,21 +197,15 @@ where MBFN: FnMut(&mut T, &dyn Progress, Context<'_>) -> Result<(), E> + Send + Clone, E: std::error::Error + Send + Sync + 'static, { - let nodes = gix_features::threading::Mutable::new(nodes); - let decompressed_bytes_by_pack_offset = gix_features::threading::Mutable::new(decompressed_bytes_by_pack_offset); - threads_to_create += 1; // ourselves - let mut returned_ourselves = false; + let queue = WorkQueue::new(nodes); gix_features::parallel::threads(|s| -> Result<(), Error> { - let mut threads = Vec::new(); - let poll_interval = std::time::Duration::from_millis(100); - loop { - for tid in 0..threads_to_create { - let thread = gix_features::parallel::build_thread() + let threads = (0..num_threads) + .map(|tid| { + gix_features::parallel::build_thread() .name(format!("gix-pack.traverse_deltas.{tid}")) .spawn_scoped(s, { - let nodes = &nodes; - let decompressed_bytes_by_pack_offset = &decompressed_bytes_by_pack_offset; + let queue = &queue; let resolve = resolve.clone(); let mut modify_base = modify_base.clone(); let objects = &objects; @@ -311,143 +227,86 @@ where Ok((entry, slice.end)) }; - loop { - let (level, mut base) = match threading::lock(nodes).pop() { - Some(v) => v, - None => break, - }; - if should_interrupt.load(Ordering::Relaxed) { - return Err(Error::Interrupted); - } - let (base_entry, entry_end, base_bytes) = if level == 0 { - let mut buf = Vec::new(); - let (a, b) = decompress_from_resolver(base.entry_slice(), &mut buf)?; - (a, b, buf) - } else { - 
threading::lock(decompressed_bytes_by_pack_offset) - .remove(&base.offset()) - .expect("we store the resolved delta buffer when done") - }; + while let Some((level, mut base, parent_arc)) = queue.pop() { + let result: Result<(), Error> = (|| { + if should_interrupt.load(Ordering::Relaxed) { + return Err(Error::Interrupted); + } - // anything done here must be repeated further down for leaf-nodes. - // This way we avoid retaining their decompressed memory longer than needed (they have no children, - // thus their memory can be released right away, using 18% less peak memory on the linux kernel). - { + let base_arc: std::sync::Arc<(data::Entry, u64, Vec)> = + if let Some(parent) = parent_arc { + let (mut entry, entry_end) = + decompress_from_resolver(base.entry_slice(), &mut delta_bytes)?; + let (base_size, consumed) = data::delta::decode_header_size(&delta_bytes)?; + let mut header_ofs = consumed; + assert_eq!( + parent.2.len(), + base_size as usize, + "recorded base size in delta does not match the actual one" + ); + let (result_size, consumed) = + data::delta::decode_header_size(&delta_bytes[consumed..])?; + header_ofs += consumed; + fully_resolved_delta_bytes.resize(result_size as usize, 0); + data::delta::apply( + &parent.2, + &mut fully_resolved_delta_bytes, + &delta_bytes[header_ofs..], + )?; + // FIXME: this actually invalidates the "pack_offset()" computation + entry.header = parent.0.header; // inherit real object type + std::sync::Arc::new(( + entry, + entry_end, + std::mem::take(&mut fully_resolved_delta_bytes), + )) + } else { + let mut buf = Vec::new(); + let (entry, entry_end) = + decompress_from_resolver(base.entry_slice(), &mut buf)?; + std::sync::Arc::new((entry, entry_end, buf)) + }; + + let (base_entry, entry_end, base_bytes) = &*base_arc; modify_base( base.data(), progress, Context { - entry: &base_entry, - entry_end, - decompressed: &base_bytes, + entry: base_entry, + entry_end: *entry_end, + decompressed: base_bytes, level, }, ) .map_err(|err| 
Box::new(err) as Box)?; objects.fetch_add(1, Ordering::Relaxed); size.fetch_add(base_bytes.len(), Ordering::Relaxed); - } - for mut child in base.into_child_iter() { - let (mut child_entry, entry_end) = - decompress_from_resolver(child.entry_slice(), &mut delta_bytes)?; - let (base_size, consumed) = data::delta::decode_header_size(&delta_bytes)?; - let mut header_ofs = consumed; - assert_eq!( - base_bytes.len(), - base_size as usize, - "recorded base size in delta does match the actual one" - ); - let (result_size, consumed) = - data::delta::decode_header_size(&delta_bytes[consumed..])?; - header_ofs += consumed; + let children = base + .into_child_iter() + .map(|child| (level + 1, child, Some(std::sync::Arc::clone(&base_arc)))) + .collect(); + queue.push_items(children, num_threads); - fully_resolved_delta_bytes.resize(result_size as usize, 0); - data::delta::apply( - &base_bytes, - &mut fully_resolved_delta_bytes, - &delta_bytes[header_ofs..], - )?; + Ok(()) + })(); - // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers - // at all - child_entry.header = base_entry.header; // assign the actual object type, instead of 'delta' - if child.has_children() { - threading::lock(decompressed_bytes_by_pack_offset).insert( - child.offset(), - (child_entry, entry_end, std::mem::take(&mut fully_resolved_delta_bytes)), - ); - threading::lock(nodes).push((level + 1, child)); - } else { - modify_base( - child.data(), - progress, - Context { - entry: &child_entry, - entry_end, - decompressed: &fully_resolved_delta_bytes, - level: level + 1, - }, - ) - .map_err(|err| Box::new(err) as Box)?; - objects.fetch_add(1, Ordering::Relaxed); - size.fetch_add(base_bytes.len(), Ordering::Relaxed); - } - } + queue.finish_item(); + result?; } Ok(()) } - })?; - threads.push(thread); - } - if threads_left - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |threads_available: isize| { - (threads_available > 0).then(|| { - threads_to_create = 
threads_available.min(threading::lock(&nodes).len() as isize); - threads_available - threads_to_create }) - }) - .is_err() - { - threads_to_create = 0; - } + }) + .collect::, _>>()?; - // What we really want to do is either wait for one of our threads to go down - // or for another scheduled thread to become available. Unfortunately we can't do that, - // but may instead find a good way to set the polling interval instead of hard-coding it. - std::thread::sleep(poll_interval); - // Get out of threads are already starving or they would be starving soon as no work is left. - // - // Lint: ScopedJoinHandle is not the same depending on active features and is not exposed in some cases. - #[allow(clippy::redundant_closure_for_method_calls)] - if threads.iter().any(|t| t.is_finished()) { - let mut running_threads = Vec::new(); - for thread in threads.drain(..) { - if thread.is_finished() { - match thread.join() { - Ok(Err(err)) => return Err(err), - Ok(Ok(())) => { - if !returned_ourselves { - returned_ourselves = true; - } else { - threads_left.fetch_add(1, Ordering::SeqCst); - } - } - Err(err) => { - std::panic::resume_unwind(err); - } - } - } else { - running_threads.push(thread); - } - } - if running_threads.is_empty() && threading::lock(&nodes).is_empty() { - break; - } - threads = running_threads; + for thread in threads { + match thread.join() { + Ok(Ok(())) => {} + Ok(Err(err)) => return Err(err), + Err(payload) => std::panic::resume_unwind(payload), } } - Ok(()) }) } From 60e081abe2146994a1aba4a71c31a436f071dc49 Mon Sep 17 00:00:00 2001 From: vmandke Date: Sun, 10 May 2026 10:13:29 +0530 Subject: [PATCH 2/2] Address review comment (1) --- gix-pack/src/cache/delta/traverse/resolve.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/gix-pack/src/cache/delta/traverse/resolve.rs b/gix-pack/src/cache/delta/traverse/resolve.rs index e7c226e60b0..beda29ec14b 100644 --- a/gix-pack/src/cache/delta/traverse/resolve.rs +++ 
b/gix-pack/src/cache/delta/traverse/resolve.rs @@ -121,7 +121,6 @@ impl<'a, T: Send> WorkQueue<'a, T> { } } - /// Mark one item done. Must be called for every `pop`, including error paths. fn finish_item(&self) { let _guard = self.items.lock().unwrap_or_else(std::sync::PoisonError::into_inner); self.in_progress.fetch_sub(1, Ordering::SeqCst); @@ -129,6 +128,15 @@ impl<'a, T: Send> WorkQueue<'a, T> { } } +/// RAII guard that calls `finish_item` on drop, ensuring the in-progress counter +/// is decremented even if the worker panics. +struct FinishOnDrop<'q, 'a, T: Send>(&'q WorkQueue<'a, T>); +impl<'q, 'a, T: Send> Drop for FinishOnDrop<'q, 'a, T> { + fn drop(&mut self) { + self.0.finish_item(); + } +} + /// Resolve all delta objects in parallel using a work-stealing queue. /// /// SAFETY: `item.children` must uniquely reference elements in child_items that no other currently alive @@ -228,6 +236,7 @@ where }; while let Some((level, mut base, parent_arc)) = queue.pop() { + let _finish = FinishOnDrop(queue); let result: Result<(), Error> = (|| { if should_interrupt.load(Ordering::Relaxed) { return Err(Error::Interrupted); @@ -291,7 +300,6 @@ where Ok(()) })(); - queue.finish_item(); result?; } Ok(())