Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions gix-odb/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,22 @@ impl<S> Cache<S> {
}
}

impl<S> Cache<crate::store::Handle<S>>
where
S: Deref<Target = crate::Store> + Clone,
{
/// Find an object and return its decoded bytes as a stream.
pub fn try_find_stream(
&self,
id: &gix_hash::oid,
) -> Result<Option<(crate::find::Stream, Option<gix_pack::data::entry::Location>)>, gix_object::find::Error> {
match self.pack_cache.as_ref().map(RefCell::borrow_mut) {
Some(mut pack_cache) => self.inner.try_find_stream(id, pack_cache.deref_mut()),
None => self.inner.try_find_stream(id, &mut gix_pack::cache::Never),
}
}
}

impl<S> From<S> for Cache<S>
where
S: gix_pack::Find,
Expand Down
70 changes: 70 additions & 0 deletions gix-odb/src/find.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,73 @@
use std::io::{self, Cursor, Read};

/// A streaming view over an object's decoded bytes.
pub struct Stream {
kind: gix_object::Kind,
size: u64,
inner: StreamInner,
}

enum StreamInner {
InMemory(Cursor<Vec<u8>>),
File(std::fs::File),
Loose(crate::store_impls::loose::find::StreamReader),
}

impl Stream {
/// Return the kind of the object yielded by this stream.
pub fn kind(&self) -> gix_object::Kind {
self.kind
}

/// Return the decoded object size in bytes.
pub fn size(&self) -> u64 {
self.size
}

/// Return an empty blob stream.
pub fn empty_blob() -> Self {
Self::from_bytes(gix_object::Kind::Blob, Vec::new())
}

pub(crate) fn from_bytes(kind: gix_object::Kind, data: Vec<u8>) -> Self {
Self {
kind,
size: data.len() as u64,
inner: StreamInner::InMemory(Cursor::new(data)),
}
}

pub(crate) fn from_file(kind: gix_object::Kind, size: u64, file: std::fs::File) -> Self {
Self {
kind,
size,
inner: StreamInner::File(file),
}
}

pub(crate) fn from_loose(
kind: gix_object::Kind,
size: u64,
reader: crate::store_impls::loose::find::StreamReader,
) -> Self {
Self {
kind,
size,
inner: StreamInner::Loose(reader),
}
}
}

impl Read for Stream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match &mut self.inner {
StreamInner::InMemory(cursor) => cursor.read(buf),
StreamInner::File(file) => file.read(buf),
StreamInner::Loose(reader) => reader.read(buf),
}
}
}

/// An object header informing about object properties, without it being fully decoded in the process.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum Header {
Expand Down
20 changes: 20 additions & 0 deletions gix-odb/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,26 @@ impl<T> Proxy<T> {
}
}

impl<S> Proxy<crate::Cache<crate::store::Handle<S>>>
where
S: Deref<Target = crate::Store> + Clone,
{
/// Find an object and return its decoded bytes as a stream.
pub fn try_find_stream(
&self,
id: &gix_hash::oid,
) -> Result<Option<(crate::find::Stream, Option<crate::pack::data::entry::Location>)>, gix_object::find::Error>
{
if let Some(map) = self.memory.as_ref() {
let map = map.borrow();
if let Some((kind, data)) = map.get(id) {
return Ok(Some((crate::find::Stream::from_bytes(*kind, data.clone()), None)));
}
}
self.inner.try_find_stream(id)
}
}

impl<T> Clone for Proxy<T>
where
T: Clone,
Expand Down
2 changes: 1 addition & 1 deletion gix-odb/src/store_impls/dynamic/find.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<S> super::Handle<S>
where
S: Deref<Target = super::Store> + Clone,
{
fn try_find_cached_inner<'a, 'b>(
pub(crate) fn try_find_cached_inner<'a, 'b>(
&'b self,
mut id: &'b gix_hash::oid,
buffer: &'a mut Vec<u8>,
Expand Down
1 change: 1 addition & 0 deletions gix-odb/src/store_impls/dynamic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub mod find;
pub mod prefix;

mod header;
mod stream;

///
pub mod iter;
Expand Down
230 changes: 230 additions & 0 deletions gix-odb/src/store_impls/dynamic/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use std::{io::Seek, ops::Deref};

use gix_features::zlib;
use gix_hash::oid;
use gix_pack::cache::DecodeEntry;

use super::find::Error;
use crate::store::{find::error::DeltaBaseRecursion, handle, load_index};

impl<S> super::Handle<S>
where
S: Deref<Target = super::Store> + Clone,
{
pub(crate) fn try_find_stream_inner<'b>(
&'b self,
mut id: &'b gix_hash::oid,
inflate: &mut zlib::Inflate,
pack_cache: &mut dyn DecodeEntry,
snapshot: &mut load_index::Snapshot,
recursion: Option<DeltaBaseRecursion<'_>>,
) -> Result<Option<(crate::find::Stream, Option<gix_pack::data::entry::Location>)>, Error> {
if let Some(r) = recursion {
if r.depth >= self.max_recursion_depth {
return Err(Error::DeltaBaseRecursionLimit {
max_depth: self.max_recursion_depth,
id: r.original_id.to_owned(),
});
}
} else if !self.ignore_replacements {
if let Ok(pos) = self
.store
.replacements
.binary_search_by(|(map_this, _)| map_this.as_ref().cmp(id))
{
id = self.store.replacements[pos].1.as_ref();
}
}

'outer: loop {
{
let marker = snapshot.marker;
for (idx, index) in snapshot.indices.iter_mut().enumerate() {
if let Some(handle::index_lookup::Outcome {
object_index: handle::IndexForObjectInPack { pack_id, pack_offset },
index_file,
pack: possibly_pack,
}) = index.lookup(id)
{
let pack = match possibly_pack {
Some(pack) => pack,
None => match self.store.load_pack(pack_id, marker)? {
Some(pack) => {
*possibly_pack = Some(pack);
possibly_pack.as_deref().expect("just put it in")
}
None => match self.store.load_one_index(self.refresh, snapshot.marker)? {
Some(new_snapshot) => {
*snapshot = new_snapshot;
self.clear_cache();
continue 'outer;
}
None => return Ok(None),
},
},
};
let resolved_pack_id = pack.id;
let entry = pack.entry(pack_offset)?;
let header_size = entry.header_size();
let result = {
let mut scratch = Vec::new();
let mut temp = tempfile::tempfile()?;
let result = match pack.decode_entry_to_write(
entry,
&mut scratch,
inflate,
&mut temp,
&|id, _out| {
index_file.pack_offset_by_id(id).and_then(|pack_offset| {
pack.entry(pack_offset)
.ok()
.map(gix_pack::data::decode::entry::ResolvedBase::InPack)
})
},
pack_cache,
) {
Ok(outcome) => Ok((outcome, temp)),
Err(gix_pack::data::decode::Error::DeltaBaseUnresolved(base_id)) => {
let mut buf = Vec::new();
let obj_kind = self
.try_find_cached_inner(
&base_id,
&mut buf,
inflate,
pack_cache,
snapshot,
recursion
.map(DeltaBaseRecursion::inc_depth)
.or_else(|| DeltaBaseRecursion::new(id).into()),
)
.map_err(|err| Error::DeltaBaseLookup {
err: Box::new(err),
base_id,
id: id.to_owned(),
})?
.ok_or_else(|| Error::DeltaBaseMissing {
base_id,
id: id.to_owned(),
})?
.0
.kind;
let handle::index_lookup::Outcome {
object_index:
handle::IndexForObjectInPack {
pack_id: _,
pack_offset,
},
index_file,
pack: possibly_pack,
} = match snapshot.indices[idx].lookup(id) {
Some(res) => res,
None => {
let mut out = None;
for index in &mut snapshot.indices {
out = index.lookup(id);
if out.is_some() {
break;
}
}

out.unwrap_or_else(|| {
panic!(
"could not find object {id} in any index after looking up one of its base objects {base_id}"
)
})
}
};
let pack = possibly_pack
.as_ref()
.expect("pack to still be available like just now");
let entry = pack.entry(pack_offset)?;
let mut scratch = Vec::new();
let mut temp = tempfile::tempfile()?;
pack.decode_entry_to_write(
entry,
&mut scratch,
inflate,
&mut temp,
&|id, out| {
index_file
.pack_offset_by_id(id)
.and_then(|pack_offset| {
pack.entry(pack_offset)
.ok()
.map(gix_pack::data::decode::entry::ResolvedBase::InPack)
})
.or_else(|| {
(id == base_id).then(|| {
out.resize(buf.len(), 0);
out.copy_from_slice(buf.as_slice());
gix_pack::data::decode::entry::ResolvedBase::OutOfPack {
kind: obj_kind,
end: out.len(),
}
})
})
},
pack_cache,
)
.map(|outcome| (outcome, temp))
}
Err(err) => Err(err),
}?;
result
};
let (outcome, mut temp) = result;
temp.rewind()?;
let res = (
crate::find::Stream::from_file(outcome.kind, outcome.object_size, temp),
Some(gix_pack::data::entry::Location {
pack_id: resolved_pack_id,
pack_offset,
entry_size: outcome.compressed_size + header_size,
Comment on lines +180 to +182
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Recompute pack location after delta-base retry

When decode_entry_to_write() falls into the DeltaBaseUnresolved retry path, the object is looked up and decoded again after recursive base resolution, and that recursive lookup can reorder or refresh snapshot.indices. In that case, the successful retry may come from a different pack entry than the initial lookup, but the returned location still uses the originally captured pack_id/header_size, so callers can receive a mismatched Location (pack_id, pack_offset, entry_size) for the bytes they actually streamed.

Useful? React with 👍 / 👎.

}),
);

if idx != 0 {
snapshot.indices.swap(0, idx);
}
return Ok(Some(res));
}
}
}

for lodb in snapshot.loose_dbs.iter() {
if lodb.contains(id) {
return lodb
.try_find_stream(id)
.map(|obj| obj.map(|obj| (obj, None)))
.map_err(Into::into);
}
}

match self.store.load_one_index(self.refresh, snapshot.marker)? {
Some(new_snapshot) => {
*snapshot = new_snapshot;
self.clear_cache();
}
None => return Ok(None),
}
}
}
}

impl<S> super::Handle<S>
where
S: Deref<Target = super::Store> + Clone,
{
/// Try to find the object identified by `id` in any backing store and return it as a readable stream,
/// along with its pack location if it came from a pack.
pub fn try_find_stream(
&self,
id: &oid,
pack_cache: &mut dyn DecodeEntry,
) -> Result<Option<(crate::find::Stream, Option<gix_pack::data::entry::Location>)>, gix_object::find::Error> {
let mut snapshot = self.snapshot.borrow_mut();
let mut inflate = self.inflate.borrow_mut();
self.try_find_stream_inner(id, &mut inflate, pack_cache, &mut snapshot, None)
.map_err(|err| Box::new(err) as _)
}
}
Loading
Loading