diff --git a/core/layers/foyer/src/full.rs b/core/layers/foyer/src/full.rs index 4e2234ed176f..73ed201ac483 100644 --- a/core/layers/foyer/src/full.rs +++ b/core/layers/foyer/src/full.rs @@ -81,6 +81,28 @@ impl FullReader { .contains(key) } + /// Derive the `OpStat` used for cache fill from the reader's `OpRead`, so the + /// stat observes the same conditions (version, if-match, ...) as the request. + fn stat_args(&self) -> OpStat { + let mut op = OpStat::new(); + if let Some(v) = self.args.version() { + op = op.with_version(v); + } + if let Some(v) = self.args.if_match() { + op = op.with_if_match(v); + } + if let Some(v) = self.args.if_none_match() { + op = op.with_if_none_match(v); + } + if let Some(v) = self.args.if_modified_since() { + op = op.with_if_modified_since(v); + } + if let Some(v) = self.args.if_unmodified_since() { + op = op.with_if_unmodified_since(v); + } + op + } + async fn fallback_open( &self, range: BytesRange, @@ -129,11 +151,13 @@ impl FullReader { let inner = self.inner.clone(); let size_limit = self.size_limit.clone(); let path_clone = path_str.clone(); + let stat_args = self.stat_args(); + let read_args = self.args.clone(); async move { // read the metadata first, if it's too large, do not cache let metadata = inner .accessor - .stat(&path_clone, OpStat::default()) + .stat(&path_clone, stat_args) .await .map_err(FetchError::from_error)? .into_metadata(); @@ -146,7 +170,7 @@ impl FullReader { // fetch the ENTIRE object from remote. let (_, reader) = inner .accessor - .read(&path_clone, OpRead::default()) + .read(&path_clone, read_args) .await .map_err(FetchError::from_error)?; let (_, mut stream) = reader diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index ac22cfc2fd06..5c2c02f3cf76 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -295,6 +295,8 @@ mod tests { stat_calls: AtomicUsize, open_calls: AtomicUsize, read_calls: AtomicUsize, + last_stat_args: Mutex>, + last_read_args: Mutex>, } impl MockReadState { @@ -304,6 +306,8 @@ mod tests { stat_calls: AtomicUsize::new(0), open_calls: AtomicUsize::new(0), read_calls: AtomicUsize::new(0), + last_stat_args: Mutex::new(None), + last_read_args: Mutex::new(None), } } @@ -379,12 +383,14 @@ mod tests { am.into() } - async fn stat(&self, _: &str, _: OpStat) -> Result { + async fn stat(&self, _: &str, args: OpStat) -> Result { self.state.stat_calls.fetch_add(1, Ordering::Relaxed); + *self.state.last_stat_args.lock().unwrap() = Some(args); Ok(RpStat::new(self.state.metadata())) } - async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { + async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + *self.state.last_read_args.lock().unwrap() = Some(args); Ok(( self.state.rp_read(), MockReadReader { @@ -445,6 +451,28 @@ mod tests { assert_eq!(state.read_calls.load(Ordering::Relaxed), 0); } + #[tokio::test] + async fn test_cache_fill_preserves_read_args() { + let cache = memory_cache().await; + let accessor = FoyerLayer::new(cache) + .with_size_limit(0..100) + .layer(MockReadAccessor::new("0123456789")); + let state = accessor.inner.accessor.state.clone(); + + let args = OpRead::default().with_version("v1").with_if_match("etag-1"); + let (_, reader) = LayeredAccess::read(&accessor, "test", args).await.unwrap(); + let (_, mut stream) = reader.open(BytesRange::new(0, None)).await.unwrap(); + stream.read_all().await.unwrap(); + + let stat_args = state.last_stat_args.lock().unwrap().clone().unwrap(); + assert_eq!(stat_args.version(), Some("v1")); + assert_eq!(stat_args.if_match(), Some("etag-1")); + + let read_args = state.last_read_args.lock().unwrap().clone().unwrap(); + assert_eq!(read_args.version(), Some("v1")); + assert_eq!(read_args.if_match(), Some("etag-1")); + } + #[tokio::test] async fn test() { let dir = tempfile::tempdir().unwrap();