diff --git a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/TieredStoreNativeBridge.java b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/TieredStoreNativeBridge.java
index 43baf02e53a77..adb05efe737fc 100644
--- a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/TieredStoreNativeBridge.java
+++ b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/TieredStoreNativeBridge.java
@@ -31,9 +31,13 @@ public interface TieredStoreNativeBridge {
void initLogger();
/**
- * Create a global {@code TieredObjectStore} with no remote stores.
+ * Create a global {@code TieredObjectStore} with an optional Foyer disk page cache.
* Remote stores are added later via {@link #addRemoteStore}.
*
+ *
+ * The Foyer {@code FoyerCacheManager} is created inside the native layer.
+ * Pass {@code diskCacheBytes = 0} (or empty {@code diskCacheDir}) to disable
+ * the page cache entirely.
+ *
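+ * @param diskCacheBytes disk budget in bytes for the Foyer page cache; {@code 0} disables it
+ * @param diskCacheDir local NVMe directory for the Foyer disk files; empty also disables the cache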
* @return {@code long[3]}: {@code [objectStoreDataPtr, objectStoreVtablePtr, registryPtr]}
*/
- long[] createTieredObjectStore();
+ long[] createTieredObjectStore(long diskCacheBytes, String diskCacheDir);
diff --git a/modules/parquet-data-format/src/main/resources/native/macos-aarch64/libparquet_dataformat_jni.dylib b/modules/parquet-data-format/src/main/resources/native/macos-aarch64/libparquet_dataformat_jni.dylib
index 2dc0c63440296..92a92d66a0352 100644
Binary files a/modules/parquet-data-format/src/main/resources/native/macos-aarch64/libparquet_dataformat_jni.dylib and b/modules/parquet-data-format/src/main/resources/native/macos-aarch64/libparquet_dataformat_jni.dylib differ
diff --git a/plugins/engine-datafusion/Cargo.toml b/plugins/engine-datafusion/Cargo.toml
index 9889595c669c2..346a7cacecc1f 100644
--- a/plugins/engine-datafusion/Cargo.toml
+++ b/plugins/engine-datafusion/Cargo.toml
@@ -19,6 +19,8 @@ arrow-array = "57.3.0"
arrow-schema = "57.3.0"
arrow-buffer = "57.3.0"
downcast-rs = "1.2"
+foyer = { version = "=0.11.5" }
+bytes = "1.9"
# JNI dependencies
diff --git a/plugins/engine-datafusion/jni/Cargo.toml b/plugins/engine-datafusion/jni/Cargo.toml
index a2538d76b13d9..50768d86f268d 100644
--- a/plugins/engine-datafusion/jni/Cargo.toml
+++ b/plugins/engine-datafusion/jni/Cargo.toml
@@ -68,6 +68,12 @@ url = { workspace = true }
# Liquid Cache for byte-level caching
liquid-cache-datafusion-local = { workspace = true }
+# Foyer hybrid disk cache for Parquet page caching
+foyer = { workspace = true }
+
+# serde_bytes: efficient Bytes serialization needed for Foyer's StorageValue bound
+serde_bytes = "0.11"
+
# Substrait support
substrait = { workspace = true }
diff --git a/plugins/engine-datafusion/jni/src/cache.rs b/plugins/engine-datafusion/jni/src/cache.rs
index 4172c0926d724..6c8416dd1828c 100644
--- a/plugins/engine-datafusion/jni/src/cache.rs
+++ b/plugins/engine-datafusion/jni/src/cache.rs
@@ -13,6 +13,7 @@ pub const ALL_CACHE_TYPES: &[&str] = &[CACHE_TYPE_METADATA, CACHE_TYPE_STATS];
pub const CACHE_TYPE_METADATA: &str = "METADATA";
pub const CACHE_TYPE_STATS: &str = "STATISTICS";
+
// Helper function to handle cache errors
#[allow(dead_code)]
fn handle_cache_error(env: &mut JNIEnv, operation: &str, error: &str) {
diff --git a/plugins/engine-datafusion/jni/src/tiered/foyer_cache.rs b/plugins/engine-datafusion/jni/src/tiered/foyer_cache.rs
new file mode 100644
index 0000000000000..a0418714b340b
--- /dev/null
+++ b/plugins/engine-datafusion/jni/src/tiered/foyer_cache.rs
@@ -0,0 +1,257 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+//! Foyer-backed **disk-only** page cache for Parquet column chunk byte ranges.
+//!
+//! ## Architecture
+//!
+//! `FoyerDiskPageCache` wraps Foyer's [`HybridCache`] configured with a minimal (1-byte)
+//! memory tier so that **effectively all cached entries go directly to the local NVMe disk store**.
+//! This avoids heap pressure and lets the OS page cache and DataFusion's own memory
+//! management control RAM usage, while still avoiding repeated S3/GCS/Azure fetches
+//! for warm Parquet byte ranges.
+//!
+//! ```text
+//! DataFusion.get_range(file, 4096..8192)
+//! └── TieredObjectStore.get_range()
+//! ├── [FOYER-PAGE-CACHE] check L2-disk → HIT: return bytes (local NVMe I/O)
+//! └── MISS: local NVMe or remote S3/GCS read
+//! └── [FOYER-PAGE-CACHE] PUT → L2-disk (async)
+//! ```
+//!
+//! ## Key format
+//!
+//! Cache key = `"<path>:<start>-<end>"` (a plain String)
+//! Example: `"data/nodes/0/indices/UUID/0/index/parquet/_parquet_0.parquet:4096-8192"`
+//!
+//! ## Log prefix
+//!
+//! All log lines use the `[FOYER-PAGE-CACHE]` prefix so the caching flow can be easily
+//! grepped in the OpenSearch logs.
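+//!
+//! ## Usage sketch
+//!
+//! A minimal illustrative flow (the capacity and directory below are made-up example
+//! values, not defaults):
+//!
+//! ```ignore
+//! let cache = FoyerDiskPageCache::new(10 * 1024 * 1024 * 1024, "/mnt/nvme/foyer");
+//!
+//! // Cold lookup: nothing cached yet for this byte range.
+//! assert!(cache.get_blocking("indices/UUID/_parquet_0.parquet", 0, 4096).is_none());
+//!
+//! // PUT is fire-and-forget; Foyer persists the entry to disk in the background.
+//! cache.put("indices/UUID/_parquet_0.parquet", 0, 4096, bytes::Bytes::from_static(b"page"));
+//!
+//! // A later lookup is served from the disk store once the background write lands.
+//! let warm = cache.get_blocking("indices/UUID/_parquet_0.parquet", 0, 4096);
+//! ```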
+
+use std::path::PathBuf;
+use std::sync::Arc;
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+use foyer::{HybridCache, HybridCacheBuilder, DirectFsDeviceOptionsBuilder, LruConfig};
+use vectorized_exec_spi::{log_debug, log_info, log_error};
+
+// ────────────────────────────────────────────────────────────────────
+// Value wrapper: Bytes does not implement serde, so wrap it.
+// ────────────────────────────────────────────────────────────────────
+
+/// Newtype wrapper around [`Bytes`] that implements `serde::Serialize/Deserialize`
+/// so it satisfies Foyer's `StorageValue` bound for disk persistence.
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct CachedBytes(#[serde(with = "serde_bytes")] Vec<u8>);
+
+impl CachedBytes {
+ pub fn from_bytes(b: Bytes) -> Self {
+ Self(b.to_vec())
+ }
+ pub fn into_bytes(self) -> Bytes {
+ Bytes::from(self.0)
+ }
+}
+
+// ────────────────────────────────────────────────────────────────────
+// Main cache struct
+// ────────────────────────────────────────────────────────────────────
+
+/// Foyer **disk-only** page cache for Parquet byte ranges.
+///
+/// The memory tier is set to Foyer's 1-byte minimum (effectively disabled) — all entries
+/// are stored on the NVMe disk store bounded by `disk_capacity_bytes` in `disk_dir`.
+///
+/// Thread-safe and cheap to clone (inner `HybridCache` is `Arc`-backed).
+///
+/// **Important**: The `tokio::runtime::Runtime` used to build Foyer must stay alive
+/// for the entire lifetime of the `HybridCache`. Foyer spawns background I/O tasks
+/// on that runtime during `build().await`; dropping the runtime cancels those tasks,
+/// which causes `JoinError::Cancelled` panics in `foyer-storage`. We therefore keep
+/// the runtime as an `Arc` field so it is dropped only after the `HybridCache` itself.
+#[derive(Clone)]
+pub struct FoyerDiskPageCache {
+ inner: HybridCache<String, CachedBytes>,
+ /// The Tokio runtime that owns Foyer's background tasks.
+ /// Must outlive `inner` — Arc ensures it is dropped last.
+ _runtime: Arc<tokio::runtime::Runtime>,
+ disk_capacity_bytes: usize,
+ disk_dir: PathBuf,
+}
+
+impl std::fmt::Debug for FoyerDiskPageCache {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "FoyerDiskPageCache(disk={}B, dir={:?})",
+ self.disk_capacity_bytes, self.disk_dir)
+ }
+}
+
+impl FoyerDiskPageCache {
+ /// Build the cache synchronously by blocking a Tokio runtime.
+ ///
+ /// Called once at node startup. The `disk_dir` must be writable; Foyer creates
+ /// it if needed. The memory tier is minimal (1 byte), so entries go straight to disk.
+ ///
+ /// # Arguments
+ /// * `disk_capacity_bytes` — disk budget (e.g. 10 GB)
+ /// * `disk_dir` — directory for Foyer disk files (local NVMe path)
+ pub fn new(disk_capacity_bytes: usize, disk_dir: impl Into<PathBuf>) -> Self {
+ let disk_dir = disk_dir.into();
+
+ log_info!(
+ "[FOYER-PAGE-CACHE] initializing disk-only page cache: disk={}B, dir={}",
+ disk_capacity_bytes, disk_dir.display()
+ );
+
+ // Foyer's HybridCacheBuilder::build() is async — use a temporary Tokio runtime
+ // to block on it. We only do this once at startup so the overhead is acceptable.
+ let rt = tokio::runtime::Runtime::new()
+ .expect("[FOYER-PAGE-CACHE] failed to create bootstrap Tokio runtime");
+
+ let disk_dir_clone = disk_dir.clone();
+ let inner: HybridCache<String, CachedBytes> = rt.block_on(async move {
+ HybridCacheBuilder::new()
+ .with_name("foyer-parquet-page-cache")
+ // Effectively no memory tier: all entries go directly to the disk store.
+ // Foyer requires at least 1 byte for the memory tier internally,
+ // so we use a minimal 1-byte value — in practice nothing lives there.
+ .memory(1)
+ .with_eviction_config(LruConfig { high_priority_pool_ratio: 0.0 })
+ .storage()
+ .with_device_config(
+ DirectFsDeviceOptionsBuilder::new(disk_dir_clone)
+ .with_capacity(disk_capacity_bytes)
+ .build()
+ )
+ .build()
+ .await
+ .expect("[FOYER-PAGE-CACHE] failed to build Foyer HybridCache")
+ });
+
+ log_info!(
+ "[FOYER-PAGE-CACHE] disk-only page cache ready: disk={}B, dir={}",
+ disk_capacity_bytes, disk_dir.display()
+ );
+
+ // CRITICAL: keep `rt` alive as an Arc field.
+ // Foyer spawns background store tasks on this runtime during build().
+ // If `rt` is dropped here, those tasks are cancelled → JoinError::Cancelled panic.
+ let runtime = Arc::new(rt);
+
+ Self { inner, _runtime: runtime, disk_capacity_bytes, disk_dir }
+ }
+
+ // ── Key helpers ────────────────────────────────────────────────
+
+ /// Build the string cache key from a file path and a byte range.
+ /// Format: `"<path>:<start>-<end>"`
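+ /// For example, `make_key("indices/UUID/_parquet_0.parquet", 4096, 8192)` yields
+ /// `"indices/UUID/_parquet_0.parquet:4096-8192"`.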
+ pub fn make_key(path: &str, start: usize, end: usize) -> String {
+ format!("{}:{}-{}", path, start, end)
+ }
+
+ // ── Cache operations ───────────────────────────────────────────
+
+ /// Async lookup. Returns `Some(Bytes)` on disk hit, `None` on miss.
+ pub async fn get(&self, path: &str, start: usize, end: usize) -> Option<Bytes> {
+ let key = Self::make_key(path, start, end);
+ match self.inner.get(&key).await {
+ Ok(Some(entry)) => {
+ log_debug!(
+ "[FOYER-PAGE-CACHE] HIT (disk): path={}, range={}..{}, key={}",
+ path, start, end, key
+ );
+ Some(entry.value().clone().into_bytes())
+ }
+ Ok(None) => {
+ log_debug!(
+ "[FOYER-PAGE-CACHE] MISS: path={}, range={}..{}",
+ path, start, end
+ );
+ None
+ }
+ Err(e) => {
+ log_error!(
+ "[FOYER-PAGE-CACHE] error reading cache: path={}, range={}..{}, err={}",
+ path, start, end, e
+ );
+ None
+ }
+ }
+ }
+
+ /// Synchronous get — blocks on Foyer's own runtime.
+ ///
+ /// Use this from JNI callbacks that cannot be `async`.
+ pub fn get_blocking(&self, path: &str, start: usize, end: usize) -> Option<Bytes> {
+ self._runtime.block_on(self.get(path, start, end))
+ }
+
+ /// Remove a single cached entry by its pre-built Foyer key string.
+ ///
+ /// Used by `TieredObjectStore::evict_file_cache` to perform precise, per-key
+ /// eviction when a file is deleted or tiered out.
+ pub fn remove_key(&self, key: &str) {
+ log_debug!(
+ "[FOYER-PAGE-CACHE] remove_key: key={}",
+ key
+ );
+ self.inner.remove(key);
+ }
+
+ /// Insert a byte range into the disk cache.
+ /// Foyer writes to disk asynchronously in the background.
+ pub fn put(&self, path: impl Into<String>, start: usize, end: usize, value: Bytes) {
+ let path = path.into();
+ let key = Self::make_key(&path, start, end);
+ let size = value.len();
+ log_debug!(
+ "[FOYER-PAGE-CACHE] PUT: path={}, range={}..{}, size={}B, key={}",
+ path, start, end, size, key
+ );
+ self.inner.insert(key, CachedBytes::from_bytes(value));
+ }
+
+ /// Evict all cached byte ranges for a given file path.
+ ///
+ /// Called when a Parquet file is deleted (merged/compacted/tiered out).
+ /// Because Foyer does not support prefix-based removal, this is a no-op that only logs.
+ /// Precise eviction is handled by `TieredObjectStore::evict_file_cache()` which uses
+ /// the per-file key index.
+ pub fn evict_file(&self, path: &str) {
+ log_info!(
+ "[FOYER-PAGE-CACHE] evict_file: path={} — use TieredObjectStore.evict_file_cache() \
+ for precise key-index eviction.",
+ path
+ );
+ }
+
+ /// Clear the entire disk cache.
+ pub async fn clear(&self) {
+ log_info!("[FOYER-PAGE-CACHE] clearing all entries from disk cache");
+ if let Err(e) = self.inner.clear().await {
+ log_error!("[FOYER-PAGE-CACHE] error during clear: {}", e);
+ }
+ }
+
+ /// Synchronous clear variant for JNI.
+ /// Runs on Foyer's own Tokio runtime so the async clear can complete cleanly.
+ pub fn clear_blocking(&self) {
+ self._runtime.block_on(self.clear());
+ }
+
+ /// Returns the configured disk capacity in bytes.
+ pub fn disk_capacity_bytes(&self) -> usize {
+ self.disk_capacity_bytes
+ }
+
+ /// Returns the disk directory.
+ pub fn disk_dir(&self) -> &std::path::Path {
+ &self.disk_dir
+ }
+}
diff --git a/plugins/engine-datafusion/jni/src/tiered/foyer_cache_manager.rs b/plugins/engine-datafusion/jni/src/tiered/foyer_cache_manager.rs
new file mode 100644
index 0000000000000..22058b0328ff2
--- /dev/null
+++ b/plugins/engine-datafusion/jni/src/tiered/foyer_cache_manager.rs
@@ -0,0 +1,142 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+//! [`FoyerCacheManager`] — the single owner of Foyer disk-based page caching for
+//! tiered Parquet reads.
+//!
+//! ## Responsibilities
+//!
+//! 1. **Cache lifecycle** — constructs and holds the [`FoyerDiskPageCache`].
+//! 2. **Key index** — maintains a `DashMap<String, Vec<String>>` so that
+//! all cached byte ranges for a file can be evicted precisely when the file is
+//! deleted or tiered out, without waiting for LRU expiry.
+//! 3. **Unified API** — `get`, `put`, `evict_file`, `clear` are the only entry
+//! points; callers do not need to know about Foyer internals.
+//!
+//! ## Ownership
+//!
+//! `FoyerCacheManager` is constructed once (inside the
+//! `nativeCreateTieredObjectStore` JNI function) and shared as an
+//! `Arc<FoyerCacheManager>` into `TieredObjectStore`.
+//!
+//! ## Log prefix
+//!
+//! All log lines use `[FOYER-PAGE-CACHE]`.
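+//!
+//! ## Usage sketch
+//!
+//! Illustrative only (capacity and paths are made-up example values):
+//!
+//! ```ignore
+//! let mgr = FoyerCacheManager::new(10 * 1024 * 1024 * 1024, "/mnt/nvme/foyer");
+//!
+//! // Each put() also records the Foyer key in the per-file index.
+//! mgr.put("indices/UUID/_parquet_0.parquet", 0, 4096, bytes::Bytes::from_static(b"a"));
+//! mgr.put("indices/UUID/_parquet_0.parquet", 4096, 8192, bytes::Bytes::from_static(b"b"));
+//! assert_eq!(mgr.indexed_file_count(), 1);
+//!
+//! // Precise eviction: both cached ranges for the file are removed at once.
+//! mgr.evict_file("indices/UUID/_parquet_0.parquet");
+//! assert_eq!(mgr.indexed_file_count(), 0);
+//! ```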
+
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use bytes::Bytes;
+use dashmap::DashMap;
+use vectorized_exec_spi::{log_debug, log_info};
+
+use super::foyer_cache::FoyerDiskPageCache;
+
+/// Manages a Foyer disk-only page cache for Parquet byte ranges.
+///
+/// Owns both the [`FoyerDiskPageCache`] and the per-file key index that enables
+/// precise eviction when a file is deleted.
+#[derive(Debug)]
+pub struct FoyerCacheManager {
+ /// The underlying Foyer disk cache.
+ cache: FoyerDiskPageCache,
+
+ /// Maps normalised file path → list of Foyer key strings for that file.
+ ///
+ /// Populated on every `put()`. Drained by `evict_file()` so that all ranges
+ /// for a deleted file are removed from Foyer precisely — no LRU wait.
+ key_index: DashMap<String, Vec<String>>,
+}
+
+impl FoyerCacheManager {
+ /// Create a new `FoyerCacheManager`.
+ ///
+ /// # Arguments
+ /// * `disk_capacity_bytes` — total disk budget for the Foyer cache (e.g. 10 GB)
+ /// * `disk_dir` — local NVMe directory for Foyer data files
+ pub fn new(disk_capacity_bytes: usize, disk_dir: impl Into<PathBuf>) -> Self {
+ let cache = FoyerDiskPageCache::new(disk_capacity_bytes, disk_dir);
+ log_info!(
+ "[FOYER-PAGE-CACHE] FoyerCacheManager created: disk={}B, dir={}",
+ cache.disk_capacity_bytes(),
+ cache.disk_dir().display()
+ );
+ Self {
+ cache,
+ key_index: DashMap::new(),
+ }
+ }
+
+ // ── Read ──────────────────────────────────────────────────────
+
+ /// Async cache lookup. Returns `Some(Bytes)` on disk hit, `None` on miss.
+ pub async fn get(&self, path: &str, start: usize, end: usize) -> Option {
+ self.cache.get(path, start, end).await
+ }
+
+ // ── Write ─────────────────────────────────────────────────────
+
+ /// Insert a byte range into the cache and record its key in the index.
+ pub fn put(&self, path: impl Into<String>, start: usize, end: usize, value: Bytes) {
+ let path = path.into();
+ let foyer_key = FoyerDiskPageCache::make_key(&path, start, end);
+ self.cache.put(path.clone(), start, end, value);
+ // Record in key index for later precise eviction
+ self.key_index
+ .entry(path)
+ .or_default()
+ .push(foyer_key);
+ }
+
+ // ── Eviction ──────────────────────────────────────────────────
+
+ /// Precisely evict all cached byte ranges for `path`.
+ ///
+ /// Looks up the key index for the file, then calls `remove_key` for each
+ /// entry — no LRU wait, no prefix-scan needed.
+ pub fn evict_file(&self, path: &str) {
+ if let Some((_, keys)) = self.key_index.remove(path) {
+ log_info!(
+ "[FOYER-PAGE-CACHE] evict_file: path={}, removing {} cached ranges",
+ path, keys.len()
+ );
+ for key in keys {
+ self.cache.remove_key(&key);
+ }
+ } else {
+ log_debug!(
+ "[FOYER-PAGE-CACHE] evict_file: path={} not in key index (no cached ranges)",
+ path
+ );
+ }
+ }
+
+ /// Clear the entire cache (all files, all ranges).
+ pub fn clear_blocking(&self) {
+ log_info!("[FOYER-PAGE-CACHE] FoyerCacheManager: clearing all entries");
+ self.key_index.clear();
+ self.cache.clear_blocking();
+ }
+
+ // ── Introspection ─────────────────────────────────────────────
+
+ /// Returns the configured disk capacity in bytes.
+ pub fn disk_capacity_bytes(&self) -> usize {
+ self.cache.disk_capacity_bytes()
+ }
+
+ /// Returns the disk directory used by Foyer.
+ pub fn disk_dir(&self) -> &std::path::Path {
+ self.cache.disk_dir()
+ }
+
+ /// Returns the number of files currently tracked in the key index.
+ pub fn indexed_file_count(&self) -> usize {
+ self.key_index.len()
+ }
+}
diff --git a/plugins/engine-datafusion/jni/src/tiered/mod.rs b/plugins/engine-datafusion/jni/src/tiered/mod.rs
index 45f87571a114a..6f2fe00d1b709 100644
--- a/plugins/engine-datafusion/jni/src/tiered/mod.rs
+++ b/plugins/engine-datafusion/jni/src/tiered/mod.rs
@@ -18,10 +18,14 @@
//! `TieredStoreNativeBridge` interface, which dispatches here.
pub mod file_registry;
+pub mod foyer_cache;
+pub mod foyer_cache_manager;
pub mod remote_object_store;
pub mod store_factory;
pub mod tiered_object_store;
+pub use foyer_cache_manager::FoyerCacheManager;
+
use jni::JNIEnv;
use jni::objects::{JClass, JString};
use jni::sys::{jint, jlong, jlongArray};
@@ -53,15 +57,42 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_TieredStoreNativeBridg
vectorized_exec_spi::logger::init_logger_from_env(&env);
}
-/// Create a global TieredObjectStore (no remote stores yet — added via addRemoteStore).
+/// Create a global TieredObjectStore with an optional Foyer page cache.
+/// No remote stores yet — added via addRemoteStore.
/// Returns long[3]: [objectStoreDataPtr, objectStoreVtablePtr, registryPtr].
+///
+/// `disk_cache_bytes == 0` disables the page cache entirely.
#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_jni_TieredStoreNativeBridgeImpl_nativeCreateTieredObjectStore(
mut env: JNIEnv, _class: JClass,
+ disk_cache_bytes: jlong,
+ disk_cache_dir: JString,
) -> jlongArray {
- log_info!("[TieredStoreNativeBridge] createTieredObjectStore (multi-repo)");
+
+ // Build Foyer cache manager inline if cache is configured.
+ let foyer: Option<Arc<FoyerCacheManager>> = if disk_cache_bytes > 0 {
+ let dir: String = match env.get_string(&disk_cache_dir) {
+ Ok(s) => s.into(),
+ Err(e) => {
+ let _ = env.throw_new("java/lang/IllegalArgumentException",
+ format!("Invalid disk_cache_dir: {:?}", e));
+ return std::ptr::null_mut();
+ }
+ };
+ log_info!(
+ "[TieredStoreNativeBridge] Foyer page cache enabled: disk={}B, dir={}",
+ disk_cache_bytes, dir
+ );
+ Some(Arc::new(FoyerCacheManager::new(disk_cache_bytes as usize, dir)))
+ } else {
+ log_info!("[TieredStoreNativeBridge] Foyer page cache disabled (disk_cache_bytes=0)");
+ None
+ };
+
+ log_info!("[TieredStoreNativeBridge] createTieredObjectStore (foyer={})",
+ if foyer.is_some() { "enabled" } else { "disabled" });
- let (tiered_store, registry) = TieredObjectStore::new();
+ let (tiered_store, registry) = TieredObjectStore::new_with_cache(foyer);
let store: Arc<dyn ObjectStore> = Arc::new(tiered_store);
let raw: *const dyn ObjectStore = Arc::into_raw(store);
diff --git a/plugins/engine-datafusion/jni/src/tiered/tiered_object_store.rs b/plugins/engine-datafusion/jni/src/tiered/tiered_object_store.rs
index 9802166a8bade..74e7071002eef 100644
--- a/plugins/engine-datafusion/jni/src/tiered/tiered_object_store.rs
+++ b/plugins/engine-datafusion/jni/src/tiered/tiered_object_store.rs
@@ -6,7 +6,30 @@
* compatible open source license.
*/
+//! [`TieredObjectStore`] — a single `ObjectStore` implementation that combines:
+//!
+//! 1. **Foyer disk cache** (optional, via [`FoyerCacheManager`]): intercepts
+//! `get_range`/`get_ranges` to serve cached byte ranges from local NVMe
+//! before hitting any storage backend.
+//! 2. **Local NVMe dispatch**: files whose [`FileRegistry`] state is `Local` or
+//! `Both` are read from the local filesystem.
+//! 3. **Remote store dispatch**: files whose state is `Remote` or `Both` are
+//! fetched from the appropriate S3/GCS/Azure store via the registry.
+//!
+//! ## Cache ownership
+//!
+//! The cache is owned by a [`FoyerCacheManager`] (which holds both the Foyer
+//! `HybridCache` and the per-file key index for precise eviction). `TieredObjectStore`
+//! holds an `Option<Arc<FoyerCacheManager>>` so cache-disabled deployments have
+//! zero overhead.
+//!
+//! ## Log prefixes
+//!
+//! Page-cache log lines use `[FOYER-PAGE-CACHE]`; routing lines use
+//! `[TieredObjectStore]`.
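+//!
+//! ## Construction sketch
+//!
+//! Illustrative wiring only (capacity and directory are made-up example values):
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//!
+//! // Cache-enabled store: range reads check Foyer before any backend I/O.
+//! let mgr = Arc::new(FoyerCacheManager::new(10 * 1024 * 1024 * 1024, "/mnt/nvme/foyer"));
+//! let (store, registry) = TieredObjectStore::new_with_cache(Some(mgr));
+//!
+//! // Cache-disabled store: behaves exactly like the pre-cache dispatch path.
+//! let (plain_store, _registry) = TieredObjectStore::new();
+//! ```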
+
use async_trait::async_trait;
+use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{
@@ -15,31 +38,44 @@ use object_store::{
local::LocalFileSystem, path::Path as ObjectPath,
};
use std::fmt::{self, Display, Formatter};
+use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use vectorized_exec_spi::log_info;
use super::file_registry::{FileLocation, FileRegistry};
+use super::foyer_cache_manager::FoyerCacheManager;
/// Global tiered object store registered with DataFusion for the `file://` scheme.
///
/// Dispatches reads to local filesystem or the appropriate remote store based on
-/// per-file state in the [`FileRegistry`]. Supports multiple remote repositories —
-/// each file entry knows which repo it belongs to, and the registry holds a map
-/// of `repo_key → Arc`.
+/// per-file state in the [`FileRegistry`]. When a [`FoyerCacheManager`] is provided,
+/// all `get_range`/`get_ranges` calls are intercepted and served from the disk cache
+/// before hitting local NVMe or remote storage.
#[derive(Debug)]
pub struct TieredObjectStore {
local_fs: Arc<LocalFileSystem>,
registry: Arc<FileRegistry>,
remote_reads: AtomicU64,
passthrough_reads: AtomicU64,
+
+ /// Optional Foyer disk page cache manager.
+ /// When `Some`, `get_range`/`get_ranges` check the cache before any I/O.
+ foyer: Option<Arc<FoyerCacheManager>>,
}
impl TieredObjectStore {
- /// Create a new TieredObjectStore. Remote stores are added later via
- /// [`FileRegistry::add_remote_store`] as new repositories are encountered.
+ /// Create a new TieredObjectStore without a page cache.
pub fn new() -> (Self, Arc<FileRegistry>) {
- log_info!("[TieredObjectStore] created");
+ Self::new_with_cache(None)
+ }
+
+ /// Create a new TieredObjectStore with an optional Foyer cache manager.
+ pub fn new_with_cache(foyer: Option<Arc<FoyerCacheManager>>) -> (Self, Arc<FileRegistry>) {
+ log_info!(
+ "[TieredObjectStore] created (foyer_cache={})",
+ if foyer.is_some() { "enabled" } else { "disabled" }
+ );
let registry = Arc::new(FileRegistry::new());
let local_fs = LocalFileSystem::new_with_prefix("/")
.expect("Failed to create LocalFileSystem with root prefix");
@@ -48,30 +84,42 @@ impl TieredObjectStore {
registry: Arc::clone(&registry),
remote_reads: AtomicU64::new(0),
passthrough_reads: AtomicU64::new(0),
+ foyer,
};
(store, registry)
}
+
+ // ── Path normalisation ─────────────────────────────────────────
+
+ /// Strip leading `/` to produce the Foyer cache key prefix.
+ fn cache_path(location: &ObjectPath) -> String {
+ let s = location.as_ref();
+ if s.starts_with('/') { s[1..].to_string() } else { s.to_string() }
+ }
}
impl Display for TieredObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
- f, "TieredObjectStore(remote_reads={}, passthrough={}, tracked_files={}, remote_stores={})",
+ f,
+ "TieredObjectStore(remote_reads={}, passthrough={}, tracked_files={}, remote_stores={}, foyer_cache={})",
self.remote_reads.load(Ordering::Relaxed),
self.passthrough_reads.load(Ordering::Relaxed),
self.registry.file_count(),
self.registry.remote_store_count(),
+ if self.foyer.is_some() { "enabled" } else { "disabled" },
)
}
}
#[async_trait]
impl ObjectStore for TieredObjectStore {
- async fn put_opts(&self, location: &ObjectPath, payload: PutPayload, opts: PutOptions) -> Result<PutResult> {
+ async fn put_opts(
+ &self, location: &ObjectPath, payload: PutPayload, opts: PutOptions,
+ ) -> Result<PutResult> {
let path_str = location.as_ref();
let size = payload.content_length() as u64;
log_info!("[TieredObjectStore] put_opts WRITE_LOCAL: file={}, size={}", path_str, size);
-
let result = self.local_fs.put_opts(location, payload, opts).await;
if result.is_ok() {
self.registry.register_local(path_str, size);
@@ -79,20 +127,17 @@ impl ObjectStore for TieredObjectStore {
result
}
- async fn put_multipart_opts(&self, location: &ObjectPath, opts: PutMultipartOptions) -> Result<Box<dyn MultipartUpload>> {
+ async fn put_multipart_opts(
+ &self, location: &ObjectPath, opts: PutMultipartOptions,
+ ) -> Result<Box<dyn MultipartUpload>> {
log_info!("[TieredObjectStore] put_multipart_opts: file={}", location);
self.local_fs.put_multipart_opts(location, opts).await
}
async fn get_opts(&self, location: &ObjectPath, options: GetOptions) -> Result<GetResult> {
let path_str = location.as_ref();
-
- // Check if this file has a remote path and a corresponding remote store
let remote_info = self.registry.get_remote_path(path_str)
- .and_then(|rp| {
- self.registry.get_remote_store_for_file(path_str)
- .map(|store| (rp, store))
- });
+ .and_then(|rp| self.registry.get_remote_store_for_file(path_str).map(|s| (rp, s)));
if let Some((remote_path, remote_store)) = remote_info {
let count = self.remote_reads.fetch_add(1, Ordering::Relaxed) + 1;
@@ -102,34 +147,104 @@ impl ObjectStore for TieredObjectStore {
"[TieredObjectStore] get_opts #{} DISPATCHING_TO_REMOTE location={} remote_path={} repo={} registry_state={}",
count, location, remote_path, repo_key, loc
);
-
let remote_location = ObjectPath::from(remote_path.clone());
let result = remote_store.get_opts(&remote_location, options).await;
-
match &result {
- Ok(r) => log_info!(
- "[TieredObjectStore] get_opts #{} REMOTE_READ_SUCCESS remote_path={} bytes={}",
- count, remote_path, r.meta.size
- ),
- Err(e) => log_info!(
- "[TieredObjectStore] get_opts #{} REMOTE_READ_FAILED remote_path={} error={}",
- count, remote_path, e
- ),
+ Ok(r) => log_info!("[TieredObjectStore] get_opts #{} REMOTE_READ_SUCCESS remote_path={} bytes={}", count, remote_path, r.meta.size),
+ Err(e) => log_info!("[TieredObjectStore] get_opts #{} REMOTE_READ_FAILED remote_path={} error={}", count, remote_path, e),
}
result
} else {
let count = self.passthrough_reads.fetch_add(1, Ordering::Relaxed) + 1;
+ log_info!("[TieredObjectStore] get_opts #{} READ_FROM_LOCAL: {}", count, location);
+ self.local_fs.get_opts(location, options).await
+ }
+ }
+
+ // ── Range reads: intercepted by FoyerCacheManager (if configured) ─────
+
+ async fn get_range(&self, location: &ObjectPath, range: Range<u64>) -> Result<Bytes> {
+ let path_str = Self::cache_path(location);
+ let start = range.start as usize;
+ let end = range.end as usize;
+
+ if let Some(foyer) = &self.foyer {
+ if let Some(cached) = foyer.get(&path_str, start, end).await {
+ log_info!(
+ "[FOYER-PAGE-CACHE] get_range HIT: path={}, range={}..{}, size={}B",
+ path_str, start, end, cached.len()
+ );
+ return Ok(cached);
+ }
log_info!(
- "[TieredObjectStore] get_opts #{} READ_FROM_LOCAL (no remote path/store in registry): {}",
- count, location
+ "[FOYER-PAGE-CACHE] get_range MISS → backing store: path={}, range={}..{}",
+ path_str, start, end
);
- self.local_fs.get_opts(location, options).await
+ let bytes = self.backing_get_range(location, range).await?;
+ log_info!(
+ "[FOYER-PAGE-CACHE] get_range PUT: path={}, range={}..{}, size={}B",
+ path_str, start, end, bytes.len()
+ );
+ foyer.put(path_str, start, end, bytes.clone());
+ Ok(bytes)
+ } else {
+ self.backing_get_range(location, range).await
+ }
+ }
+
+ async fn get_ranges(&self, location: &ObjectPath, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
+ let path_str = Self::cache_path(location);
+
+ if let Some(foyer) = &self.foyer {
+ let mut results: Vec<Bytes>