Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions core/layers/foyer/src/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,28 @@ impl<A: Access> FullReader<A> {
.contains(key)
}

/// Derive the `OpStat` used for cache fill from the reader's `OpRead`, so the
/// stat observes the same conditions (version, if-match, ...) as the request.
fn stat_args(&self) -> OpStat {
let mut op = OpStat::new();
if let Some(v) = self.args.version() {
op = op.with_version(v);
}
if let Some(v) = self.args.if_match() {
op = op.with_if_match(v);
}
if let Some(v) = self.args.if_none_match() {
op = op.with_if_none_match(v);
}
if let Some(v) = self.args.if_modified_since() {
op = op.with_if_modified_since(v);
}
if let Some(v) = self.args.if_unmodified_since() {
op = op.with_if_unmodified_since(v);
}
op
}

async fn fallback_open(
&self,
range: BytesRange,
Expand Down Expand Up @@ -129,11 +151,13 @@ impl<A: Access> FullReader<A> {
let inner = self.inner.clone();
let size_limit = self.size_limit.clone();
let path_clone = path_str.clone();
let stat_args = self.stat_args();
let read_args = self.args.clone();
async move {
// read the metadata first, if it's too large, do not cache
let metadata = inner
.accessor
.stat(&path_clone, OpStat::default())
.stat(&path_clone, stat_args)
.await
.map_err(FetchError::from_error)?
.into_metadata();
Expand All @@ -146,7 +170,7 @@ impl<A: Access> FullReader<A> {
// fetch the ENTIRE object from remote.
let (_, reader) = inner
.accessor
.read(&path_clone, OpRead::default())
.read(&path_clone, read_args)
.await
.map_err(FetchError::from_error)?;
let (_, mut stream) = reader
Expand Down
32 changes: 30 additions & 2 deletions core/layers/foyer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ mod tests {
stat_calls: AtomicUsize,
open_calls: AtomicUsize,
read_calls: AtomicUsize,
last_stat_args: Mutex<Option<OpStat>>,
last_read_args: Mutex<Option<OpRead>>,
}

impl MockReadState {
Expand All @@ -304,6 +306,8 @@ mod tests {
stat_calls: AtomicUsize::new(0),
open_calls: AtomicUsize::new(0),
read_calls: AtomicUsize::new(0),
last_stat_args: Mutex::new(None),
last_read_args: Mutex::new(None),
}
}

Expand Down Expand Up @@ -379,12 +383,14 @@ mod tests {
am.into()
}

async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
async fn stat(&self, _: &str, args: OpStat) -> Result<RpStat> {
self.state.stat_calls.fetch_add(1, Ordering::Relaxed);
*self.state.last_stat_args.lock().unwrap() = Some(args);
Ok(RpStat::new(self.state.metadata()))
}

async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
*self.state.last_read_args.lock().unwrap() = Some(args);
Ok((
self.state.rp_read(),
MockReadReader {
Expand Down Expand Up @@ -445,6 +451,28 @@ mod tests {
assert_eq!(state.read_calls.load(Ordering::Relaxed), 0);
}

#[tokio::test]
async fn test_cache_fill_preserves_read_args() {
let cache = memory_cache().await;
let accessor = FoyerLayer::new(cache)
.with_size_limit(0..100)
.layer(MockReadAccessor::new("0123456789"));
let state = accessor.inner.accessor.state.clone();

let args = OpRead::default().with_version("v1").with_if_match("etag-1");
let (_, reader) = LayeredAccess::read(&accessor, "test", args).await.unwrap();
let (_, mut stream) = reader.open(BytesRange::new(0, None)).await.unwrap();
stream.read_all().await.unwrap();

let stat_args = state.last_stat_args.lock().unwrap().clone().unwrap();
assert_eq!(stat_args.version(), Some("v1"));
assert_eq!(stat_args.if_match(), Some("etag-1"));

let read_args = state.last_read_args.lock().unwrap().clone().unwrap();
assert_eq!(read_args.version(), Some("v1"));
assert_eq!(read_args.if_match(), Some("etag-1"));
}

#[tokio::test]
async fn test() {
let dir = tempfile::tempdir().unwrap();
Expand Down
Loading