From 62c5a33d026879a1f8264567c36f9f46a80dd0d8 Mon Sep 17 00:00:00 2001 From: Vishwas Garg Date: Thu, 9 Apr 2026 12:42:40 +0530 Subject: [PATCH] Enable Foyer disk based cache for Parquet reads --- .../jni/TieredStoreNativeBridge.java | 6 +- .../libparquet_dataformat_jni.dylib | Bin 13518111 -> 13518110 bytes plugins/engine-datafusion/Cargo.toml | 2 + plugins/engine-datafusion/jni/Cargo.toml | 6 + plugins/engine-datafusion/jni/src/cache.rs | 1 + .../jni/src/tiered/foyer_cache.rs | 257 ++++++++++++++++++ .../jni/src/tiered/foyer_cache_manager.rs | 142 ++++++++++ .../engine-datafusion/jni/src/tiered/mod.rs | 37 ++- .../jni/src/tiered/tiered_object_store.rs | 240 +++++++++++++--- .../datafusion/DataFusionPlugin.java | 4 +- .../datafusion/core/DataFusionRuntimeEnv.java | 12 + .../jni/TieredStoreNativeBridgeImpl.java | 66 ++++- .../search/cache/TieredCacheSettings.java | 84 ++++++ 13 files changed, 811 insertions(+), 46 deletions(-) create mode 100644 plugins/engine-datafusion/jni/src/tiered/foyer_cache.rs create mode 100644 plugins/engine-datafusion/jni/src/tiered/foyer_cache_manager.rs create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/TieredCacheSettings.java diff --git a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/TieredStoreNativeBridge.java b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/TieredStoreNativeBridge.java index 43baf02e53a77..adb05efe737fc 100644 --- a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/TieredStoreNativeBridge.java +++ b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/TieredStoreNativeBridge.java @@ -31,9 +31,13 @@ public interface TieredStoreNativeBridge { void initLogger(); /** - * Create a global {@code TieredObjectStore} with no remote stores. + * Create a global {@code TieredObjectStore} with an optional Foyer disk page cache. * Remote stores are added later via {@link #addRemoteStore}. * + *

<p>The Foyer {@code FoyerCacheManager} is created inside the native layer.
+ * Pass {@code diskCacheBytes = 0} (or an empty {@code diskCacheDir}) to disable
+ * the page cache entirely.
+ *
  * @return {@code long[3]}: {@code [objectStoreDataPtr, objectStoreVtablePtr, registryPtr]}
  */
 long[] createTieredObjectStore();
diff --git a/modules/parquet-data-format/src/main/resources/native/macos-aarch64/libparquet_dataformat_jni.dylib b/modules/parquet-data-format/src/main/resources/native/macos-aarch64/libparquet_dataformat_jni.dylib
index 2dc0c63440296128b0a4e0fd1781187a5322dc01..92a92d66a0352feb2ca3f494a194da7cb94068a8 100644
GIT binary patch
delta 651
[base85 delta payload omitted]

delta 653
[base85 delta payload omitted]

diff --git a/plugins/engine-datafusion/jni/src/tiered/foyer_cache.rs b/plugins/engine-datafusion/jni/src/tiered/foyer_cache.rs
new file mode 100644
--- /dev/null
+++ b/plugins/engine-datafusion/jni/src/tiered/foyer_cache.rs
@@ -0,0 +1,257 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+//! [`FoyerDiskPageCache`] — disk-only Foyer page cache for Parquet byte ranges.
+//!
+//! ## Key format
+//!
+//! Keys are `"<file>:<start>-<end>"` (a plain String).
+//! Example: `"data/nodes/0/indices/UUID/0/index/parquet/_parquet_0.parquet:4096-8192"`
+//!
+//! ## Log prefix
+//!
+//! All log lines use the `[FOYER-PAGE-CACHE]` prefix so the caching flow can be easily
+//! grepped in the OpenSearch logs.
+
+use std::path::PathBuf;
+use std::sync::Arc;
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+use foyer::{HybridCache, HybridCacheBuilder, DirectFsDeviceOptionsBuilder, LruConfig};
+use vectorized_exec_spi::{log_debug, log_info, log_error};
+
+// ────────────────────────────────────────────────────────────────────
+// Value wrapper: Bytes does not implement serde, so wrap it.
+// ────────────────────────────────────────────────────────────────────
+
+/// Newtype wrapper around [`Bytes`] that implements `serde::Serialize`/`Deserialize`
+/// so it satisfies Foyer's `StorageValue` bound for disk persistence.
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct CachedBytes(#[serde(with = "serde_bytes")] Vec<u8>);
+
+impl CachedBytes {
+    pub fn from_bytes(b: Bytes) -> Self {
+        Self(b.to_vec())
+    }
+    pub fn into_bytes(self) -> Bytes {
+        Bytes::from(self.0)
+    }
+}
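To make the key scheme concrete, here is a minimal, self-contained sketch (not part of the patch) that mirrors the `make_key` helper defined further down and reproduces the documented example key:

```rust
// Standalone illustration of the page-cache key scheme described above.
fn make_key(path: &str, start: usize, end: usize) -> String {
    format!("{}:{}-{}", path, start, end)
}

fn main() {
    let key = make_key(
        "data/nodes/0/indices/UUID/0/index/parquet/_parquet_0.parquet",
        4096,
        8192,
    );
    assert_eq!(
        key,
        "data/nodes/0/indices/UUID/0/index/parquet/_parquet_0.parquet:4096-8192"
    );
}
```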
+// ────────────────────────────────────────────────────────────────────
+// Main cache struct
+// ────────────────────────────────────────────────────────────────────
+
+/// Foyer **disk-only** page cache for Parquet byte ranges.
+///
+/// The memory tier is shrunk to a token 1 byte (effectively disabled) — all entries
+/// live in the NVMe disk store bounded by `disk_capacity_bytes` in `disk_dir`.
+///
+/// Thread-safe and cheap to clone (inner `HybridCache` is `Arc`-backed).
+///
+/// **Important**: The `tokio::runtime::Runtime` used to build Foyer must stay alive
+/// for the entire lifetime of the `HybridCache`. Foyer spawns background I/O tasks
+/// on that runtime during `build().await`; dropping the runtime cancels those tasks,
+/// which causes `JoinError::Cancelled` panics in `foyer-storage`. We therefore keep
+/// the runtime as an `Arc` field so it is dropped only after the `HybridCache` itself.
+#[derive(Clone)]
+pub struct FoyerDiskPageCache {
+    inner: HybridCache<String, CachedBytes>,
+    /// The Tokio runtime that owns Foyer's background tasks.
+    /// Must outlive `inner` — Arc ensures it is dropped last.
+    _runtime: Arc<tokio::runtime::Runtime>,
+    disk_capacity_bytes: usize,
+    disk_dir: PathBuf,
+}
+
+impl std::fmt::Debug for FoyerDiskPageCache {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "FoyerDiskPageCache(disk={}B, dir={:?})",
+            self.disk_capacity_bytes, self.disk_dir)
+    }
+}
+
+impl FoyerDiskPageCache {
+    /// Build the cache synchronously by blocking a Tokio runtime.
+    ///
+    /// Called once at node startup. The `disk_dir` must be writable; Foyer creates
+    /// it if needed. The memory tier is minimal (1 byte) — all entries go straight to disk.
+    ///
+    /// # Arguments
+    /// * `disk_capacity_bytes` — disk budget (e.g. 10 GB)
+    /// * `disk_dir` — directory for Foyer disk files (local NVMe path)
+    pub fn new(disk_capacity_bytes: usize, disk_dir: impl Into<PathBuf>) -> Self {
+        let disk_dir = disk_dir.into();
+
+        log_info!(
+            "[FOYER-PAGE-CACHE] initializing disk-only page cache: disk={}B, dir={}",
+            disk_capacity_bytes, disk_dir.display()
+        );
+
+        // Foyer's HybridCacheBuilder::build() is async — use a temporary Tokio runtime
+        // to block on it. We only do this once at startup so the overhead is acceptable.
+        let rt = tokio::runtime::Runtime::new()
+            .expect("[FOYER-PAGE-CACHE] failed to create bootstrap Tokio runtime");
+
+        let disk_dir_clone = disk_dir.clone();
+        let inner: HybridCache<String, CachedBytes> = rt.block_on(async move {
+            HybridCacheBuilder::new()
+                .with_name("foyer-parquet-page-cache")
+                // Memory tier ≈ 0: all entries go directly to the disk store.
+                // Foyer requires at least 1 byte for the memory tier internally,
+                // so we use a minimal 1-byte tier — in practice nothing lives there.
+                .memory(1)
+                .with_eviction_config(LruConfig { high_priority_pool_ratio: 0.0 })
+                .storage()
+                .with_device_config(
+                    DirectFsDeviceOptionsBuilder::new(disk_dir_clone)
+                        .with_capacity(disk_capacity_bytes)
+                        .build()
+                )
+                .build()
+                .await
+                .expect("[FOYER-PAGE-CACHE] failed to build Foyer HybridCache")
+        });
+
+        log_info!(
+            "[FOYER-PAGE-CACHE] disk-only page cache ready: disk={}B, dir={}",
+            disk_capacity_bytes, disk_dir.display()
+        );
+
+        // CRITICAL: keep `rt` alive as an Arc field.
+        // Foyer spawns background store tasks on this runtime during build().
+        // If `rt` is dropped here, those tasks are cancelled → JoinError::Cancelled panic.
+        let runtime = Arc::new(rt);
+
+        Self { inner, _runtime: runtime, disk_capacity_bytes, disk_dir }
+    }
+
+    // ── Key helpers ────────────────────────────────────────────────
+
+    /// Build the string cache key from a file path and a byte range.
+    /// Format: `"<file>:<start>-<end>"`
+    pub fn make_key(path: &str, start: usize, end: usize) -> String {
+        format!("{}:{}-{}", path, start, end)
+    }
+
+    // ── Cache operations ───────────────────────────────────────────
+
+    /// Async lookup. Returns `Some(Bytes)` on disk hit, `None` on miss.
+    pub async fn get(&self, path: &str, start: usize, end: usize) -> Option<Bytes> {
+        let key = Self::make_key(path, start, end);
+        match self.inner.get(&key).await {
+            Ok(Some(entry)) => {
+                log_debug!(
+                    "[FOYER-PAGE-CACHE] HIT (disk): path={}, range={}..{}, key={}",
+                    path, start, end, key
+                );
+                Some(entry.value().clone().into_bytes())
+            }
+            Ok(None) => {
+                log_debug!(
+                    "[FOYER-PAGE-CACHE] MISS: path={}, range={}..{}",
+                    path, start, end
+                );
+                None
+            }
+            Err(e) => {
+                log_error!(
+                    "[FOYER-PAGE-CACHE] error reading cache: path={}, range={}..{}, err={}",
+                    path, start, end, e
+                );
+                None
+            }
+        }
+    }
+
+    /// Synchronous get — blocks on Foyer's own runtime.
+    ///
+    /// Use this from JNI callbacks that cannot be `async`.
+    pub fn get_blocking(&self, path: &str, start: usize, end: usize) -> Option<Bytes> {
+        self._runtime.block_on(self.get(path, start, end))
+    }
+
+    /// Remove a single cached entry by its pre-built Foyer key string.
+    ///
+    /// Used by `TieredObjectStore::evict_file_cache` to perform precise, per-key
+    /// eviction when a file is deleted or tiered out.
+    pub fn remove_key(&self, key: &str) {
+        log_debug!(
+            "[FOYER-PAGE-CACHE] remove_key: key={}",
+            key
+        );
+        self.inner.remove(key);
+    }
+
+    /// Insert a byte range into the disk cache.
+    /// Foyer writes to disk asynchronously in the background.
+    pub fn put(&self, path: impl Into<String>, start: usize, end: usize, value: Bytes) {
+        let path = path.into();
+        let key = Self::make_key(&path, start, end);
+        let size = value.len();
+        log_debug!(
+            "[FOYER-PAGE-CACHE] PUT: path={}, range={}..{}, size={}B, key={}",
+            path, start, end, size, key
+        );
+        self.inner.insert(key, CachedBytes::from_bytes(value));
+    }
+
+    /// Evict all cached byte ranges for a given file path.
+    ///
+    /// Called when a Parquet file is deleted (merged/compacted/tiered out).
+    /// Because Foyer does not support prefix-based removal, this is a no-op with a log line.
+    /// Precise eviction is handled by `TieredObjectStore::evict_file_cache()`, which uses
+    /// the per-file key index.
+    pub fn evict_file(&self, path: &str) {
+        log_info!(
+            "[FOYER-PAGE-CACHE] evict_file: path={} — use TieredObjectStore.evict_file_cache() \
+             for precise key-index eviction.",
+            path
+        );
+    }
+
+    /// Clear the entire disk cache.
+    pub async fn clear(&self) {
+        log_info!("[FOYER-PAGE-CACHE] clearing all entries from disk cache");
+        if let Err(e) = self.inner.clear().await {
+            log_error!("[FOYER-PAGE-CACHE] error during clear: {}", e);
+        }
+    }
+
+    /// Synchronous clear variant for JNI.
+    /// Runs on Foyer's own Tokio runtime so the async clear can complete cleanly.
+    pub fn clear_blocking(&self) {
+        self._runtime.block_on(self.clear());
+    }
+
+    /// Returns the configured disk capacity in bytes.
+    pub fn disk_capacity_bytes(&self) -> usize {
+        self.disk_capacity_bytes
+    }
+
+    /// Returns the disk directory.
+    pub fn disk_dir(&self) -> &std::path::Path {
+        &self.disk_dir
+    }
+}
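The runtime-lifetime rule called out in the struct documentation is worth seeing in isolation. The following is a hedged, self-contained sketch, assuming `tokio` with the `rt-multi-thread` feature; `HeldCache` is a hypothetical stand-in, not a type from this patch. It relies on the fact that Rust drops struct fields in declaration order, which is what makes the runtime-declared-last layout safe:

```rust
use std::sync::Arc;

// Hypothetical stand-in for a resource whose background tasks live on a runtime.
struct HeldCache {
    cache: Vec<u8>,                         // declared first: dropped first
    _runtime: Arc<tokio::runtime::Runtime>, // declared last: dropped last
}

fn build() -> HeldCache {
    let rt = tokio::runtime::Runtime::new().expect("bootstrap runtime");
    // Anything spawned during this block_on keeps running on `rt` afterwards.
    let cache = rt.block_on(async { vec![0u8; 4] });
    // Returning `rt` inside the struct keeps those tasks alive; letting `rt`
    // drop here would cancel them (the JoinError::Cancelled failure mode).
    HeldCache { cache, _runtime: Arc::new(rt) }
}

fn main() {
    assert_eq!(build().cache.len(), 4);
}
```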
diff --git a/plugins/engine-datafusion/jni/src/tiered/foyer_cache_manager.rs b/plugins/engine-datafusion/jni/src/tiered/foyer_cache_manager.rs
new file mode 100644
index 0000000000000..22058b0328ff2
--- /dev/null
+++ b/plugins/engine-datafusion/jni/src/tiered/foyer_cache_manager.rs
@@ -0,0 +1,142 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+//! [`FoyerCacheManager`] — the single owner of Foyer disk-based page caching for
+//! tiered Parquet reads.
+//!
+//! ## Responsibilities
+//!
+//! 1. **Cache lifecycle** — constructs and holds the [`FoyerDiskPageCache`].
+//! 2. **Key index** — maintains a `DashMap<String, Vec<String>>` so that
+//!    all cached byte ranges for a file can be evicted precisely when the file is
+//!    deleted or tiered out, without waiting for LRU expiry.
+//! 3. **Unified API** — `get`, `put`, `evict_file`, `clear` are the only entry
+//!    points; callers do not need to know about Foyer internals.
+//!
+//! ## Ownership
+//!
+//! `FoyerCacheManager` is constructed once (inside the
+//! `nativeCreateTieredObjectStore` JNI function) and shared as an
+//! `Arc<FoyerCacheManager>` into `TieredObjectStore`.
+//!
+//! ## Log prefix
+//!
+//! All log lines use `[FOYER-PAGE-CACHE]`.
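Foyer has no prefix scan, so per-file eviction hinges entirely on this key index. A minimal sketch of the bookkeeping, using the same `dashmap` crate as the patch (the cache call itself is stubbed out with a print):

```rust
use dashmap::DashMap;

// Sketch of the per-file key index: path -> every cache key written for it.
fn main() {
    let index: DashMap<String, Vec<String>> = DashMap::new();

    // put(): record each key as it is written.
    index.entry("f.parquet".to_string())
        .or_default()
        .push("f.parquet:0-4096".to_string());
    index.entry("f.parquet".to_string())
        .or_default()
        .push("f.parquet:4096-8192".to_string());

    // evict_file(): drain the list and remove each key individually.
    if let Some((_, keys)) = index.remove("f.parquet") {
        for key in keys {
            println!("evict {}", key); // cache.remove_key(&key) in the real code
        }
    }
    assert!(index.get("f.parquet").is_none());
}
```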

+use std::path::PathBuf;
+use std::sync::Arc;
+
+use bytes::Bytes;
+use dashmap::DashMap;
+use vectorized_exec_spi::{log_debug, log_info};
+
+use super::foyer_cache::FoyerDiskPageCache;
+
+/// Manages a Foyer disk-only page cache for Parquet byte ranges.
+///
+/// Owns both the [`FoyerDiskPageCache`] and the per-file key index that enables
+/// precise eviction when a file is deleted.
+#[derive(Debug)]
+pub struct FoyerCacheManager {
+    /// The underlying Foyer disk cache.
+    cache: FoyerDiskPageCache,
+
+    /// Maps normalised file path → list of Foyer key strings for that file.
+    ///
+    /// Populated on every `put()`. Drained by `evict_file()` so that all ranges
+    /// for a deleted file are removed from Foyer precisely — no LRU wait.
+    key_index: DashMap<String, Vec<String>>,
+}
+
+impl FoyerCacheManager {
+    /// Create a new `FoyerCacheManager`.
+    ///
+    /// # Arguments
+    /// * `disk_capacity_bytes` — total disk budget for the Foyer cache (e.g. 10 GB)
+    /// * `disk_dir` — local NVMe directory for Foyer data files
+    pub fn new(disk_capacity_bytes: usize, disk_dir: impl Into<PathBuf>) -> Self {
+        let cache = FoyerDiskPageCache::new(disk_capacity_bytes, disk_dir);
+        log_info!(
+            "[FOYER-PAGE-CACHE] FoyerCacheManager created: disk={}B, dir={}",
+            cache.disk_capacity_bytes(),
+            cache.disk_dir().display()
+        );
+        Self {
+            cache,
+            key_index: DashMap::new(),
+        }
+    }
+
+    // ── Read ──────────────────────────────────────────────────────
+
+    /// Async cache lookup. Returns `Some(Bytes)` on disk hit, `None` on miss.
+    pub async fn get(&self, path: &str, start: usize, end: usize) -> Option<Bytes> {
+        self.cache.get(path, start, end).await
+    }
+
+    // ── Write ─────────────────────────────────────────────────────
+
+    /// Insert a byte range into the cache and record its key in the index.
+    pub fn put(&self, path: impl Into<String>, start: usize, end: usize, value: Bytes) {
+        let path = path.into();
+        let foyer_key = FoyerDiskPageCache::make_key(&path, start, end);
+        self.cache.put(path.clone(), start, end, value);
+        // Record in key index for later precise eviction
+        self.key_index
+            .entry(path)
+            .or_default()
+            .push(foyer_key);
+    }
+
+    // ── Eviction ──────────────────────────────────────────────────
+
+    /// Precisely evict all cached byte ranges for `path`.
+    ///
+    /// Looks up the key index for the file, then calls `remove_key` for each
+    /// entry — no LRU wait, no prefix scan needed.
+    pub fn evict_file(&self, path: &str) {
+        if let Some((_, keys)) = self.key_index.remove(path) {
+            log_info!(
+                "[FOYER-PAGE-CACHE] evict_file: path={}, removing {} cached ranges",
+                path, keys.len()
+            );
+            for key in keys {
+                self.cache.remove_key(&key);
+            }
+        } else {
+            log_debug!(
+                "[FOYER-PAGE-CACHE] evict_file: path={} not in key index (no cached ranges)",
+                path
+            );
+        }
+    }
+
+    /// Clear the entire cache (all files, all ranges).
+    pub fn clear_blocking(&self) {
+        log_info!("[FOYER-PAGE-CACHE] FoyerCacheManager: clearing all entries");
+        self.key_index.clear();
+        self.cache.clear_blocking();
+    }
+
+    // ── Introspection ─────────────────────────────────────────────
+
+    /// Returns the configured disk capacity in bytes.
+    pub fn disk_capacity_bytes(&self) -> usize {
+        self.cache.disk_capacity_bytes()
+    }
+
+    /// Returns the disk directory used by Foyer.
+    pub fn disk_dir(&self) -> &std::path::Path {
+        self.cache.disk_dir()
+    }
+
+    /// Returns the number of files currently tracked in the key index.
+    pub fn indexed_file_count(&self) -> usize {
+        self.key_index.len()
+    }
+}
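A hedged usage sketch of the manager API added in this patch (assumes a tokio context for the async lookup; whether the immediate `get` hits depends on Foyer's asynchronous disk flush, so no result is asserted):

```rust
// Hypothetical caller of FoyerCacheManager from this patch.
async fn demo(mgr: &FoyerCacheManager) {
    let page = bytes::Bytes::from_static(b"page");
    // Fire-and-forget write: Foyer persists to disk in the background.
    mgr.put("idx/_parquet_0.parquet", 0, 4, page);
    // Later reads consult the disk store.
    if let Some(hit) = mgr.get("idx/_parquet_0.parquet", 0, 4).await {
        println!("cache hit: {} bytes", hit.len());
    }
    // On file deletion, drop every cached range for the path at once.
    mgr.evict_file("idx/_parquet_0.parquet");
    assert_eq!(mgr.indexed_file_count(), 0);
}
```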
diff --git a/plugins/engine-datafusion/jni/src/tiered/mod.rs b/plugins/engine-datafusion/jni/src/tiered/mod.rs
index 45f87571a114a..6f2fe00d1b709 100644
--- a/plugins/engine-datafusion/jni/src/tiered/mod.rs
+++ b/plugins/engine-datafusion/jni/src/tiered/mod.rs
@@ -18,10 +18,14 @@
 //! `TieredStoreNativeBridge` interface, which dispatches here.
 
 pub mod file_registry;
+pub mod foyer_cache;
+pub mod foyer_cache_manager;
 pub mod remote_object_store;
 pub mod store_factory;
 pub mod tiered_object_store;
 
+pub use foyer_cache_manager::FoyerCacheManager;
+
 use jni::JNIEnv;
 use jni::objects::{JClass, JString};
 use jni::sys::{jint, jlong, jlongArray};
@@ -53,15 +57,42 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_TieredStoreNativeBridg
     vectorized_exec_spi::logger::init_logger_from_env(&env);
 }
 
-/// Create a global TieredObjectStore (no remote stores yet — added via addRemoteStore).
+/// Create a global TieredObjectStore with an optional Foyer page cache.
+/// No remote stores yet — added via addRemoteStore.
 /// Returns long[3]: [objectStoreDataPtr, objectStoreVtablePtr, registryPtr].
+///
+/// `disk_cache_bytes == 0` disables the page cache entirely.
 #[no_mangle]
 pub extern "system" fn Java_org_opensearch_datafusion_jni_TieredStoreNativeBridgeImpl_nativeCreateTieredObjectStore(
     mut env: JNIEnv,
     _class: JClass,
+    disk_cache_bytes: jlong,
+    disk_cache_dir: JString,
 ) -> jlongArray {
-    log_info!("[TieredStoreNativeBridge] createTieredObjectStore (multi-repo)");
+
+    // Build Foyer cache manager inline if cache is configured.
+    let foyer: Option<Arc<FoyerCacheManager>> = if disk_cache_bytes > 0 {
+        let dir: String = match env.get_string(&disk_cache_dir) {
+            Ok(s) => s.into(),
+            Err(e) => {
+                let _ = env.throw_new("java/lang/IllegalArgumentException",
+                    format!("Invalid disk_cache_dir: {:?}", e));
+                return std::ptr::null_mut();
+            }
+        };
+        log_info!(
+            "[TieredStoreNativeBridge] Foyer page cache enabled: disk={}B, dir={}",
+            disk_cache_bytes, dir
+        );
+        Some(Arc::new(FoyerCacheManager::new(disk_cache_bytes as usize, dir)))
+    } else {
+        log_info!("[TieredStoreNativeBridge] Foyer page cache disabled (disk_cache_bytes=0)");
+        None
+    };
+
+    log_info!("[TieredStoreNativeBridge] createTieredObjectStore (foyer={})",
+        if foyer.is_some() { "enabled" } else { "disabled" });
 
-    let (tiered_store, registry) = TieredObjectStore::new();
+    let (tiered_store, registry) = TieredObjectStore::new_with_cache(foyer);
     let store: Arc<dyn ObjectStore> = Arc::new(tiered_store);
     let raw: *const dyn ObjectStore = Arc::into_raw(store);
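The zero-means-disabled contract on the JNI boundary is easy to check in isolation. A simplified sketch of the gate (types reduced to plain Rust; the string stands in for the `FoyerCacheManager` and the dir-emptiness check mirrors the documented contract rather than the exact native code):

```rust
use std::sync::Arc;

// Simplified gate mirroring nativeCreateTieredObjectStore's cache setup.
fn build_cache(disk_cache_bytes: i64, dir: &str) -> Option<Arc<String>> {
    if disk_cache_bytes > 0 && !dir.is_empty() {
        Some(Arc::new(format!("cache@{dir}:{disk_cache_bytes}B")))
    } else {
        None // page cache disabled
    }
}

fn main() {
    assert!(build_cache(0, "/tmp/foyer").is_none());
    assert!(build_cache(10 << 30, "").is_none());
    assert!(build_cache(10 << 30, "/tmp/foyer").is_some());
}
```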
diff --git a/plugins/engine-datafusion/jni/src/tiered/tiered_object_store.rs b/plugins/engine-datafusion/jni/src/tiered/tiered_object_store.rs
index 9802166a8bade..74e7071002eef 100644
--- a/plugins/engine-datafusion/jni/src/tiered/tiered_object_store.rs
+++ b/plugins/engine-datafusion/jni/src/tiered/tiered_object_store.rs
@@ -6,7 +6,30 @@
  * compatible open source license.
  */
 
+//! [`TieredObjectStore`] — a single `ObjectStore` implementation that combines:
+//!
+//! 1. **Foyer disk cache** (optional, via [`FoyerCacheManager`]): intercepts
+//!    `get_range`/`get_ranges` to serve cached byte ranges from local NVMe
+//!    before hitting any storage backend.
+//! 2. **Local NVMe dispatch**: files whose [`FileRegistry`] state is `Local` or
+//!    `Both` are read from the local filesystem.
+//! 3. **Remote store dispatch**: files whose state is `Remote` or `Both` are
+//!    fetched from the appropriate S3/GCS/Azure store via the registry.
+//!
+//! ## Cache ownership
+//!
+//! The cache is owned by a [`FoyerCacheManager`] (which holds both the Foyer
+//! `HybridCache` and the per-file key index for precise eviction). `TieredObjectStore`
+//! holds an `Option<Arc<FoyerCacheManager>>` so cache-disabled deployments have
+//! zero overhead.
+//!
+//! ## Log prefixes
+//!
+//! Page-cache log lines use `[FOYER-PAGE-CACHE]`; routing lines use
+//! `[TieredObjectStore]`.
+
 use async_trait::async_trait;
+use bytes::Bytes;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use object_store::{
@@ -15,31 +38,44 @@ use object_store::{
     local::LocalFileSystem, path::Path as ObjectPath,
 };
 use std::fmt::{self, Display, Formatter};
+use std::ops::Range;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU64, Ordering};
 use vectorized_exec_spi::log_info;
 
 use super::file_registry::{FileLocation, FileRegistry};
+use super::foyer_cache_manager::FoyerCacheManager;
 
 /// Global tiered object store registered with DataFusion for the `file://` scheme.
 ///
 /// Dispatches reads to local filesystem or the appropriate remote store based on
-/// per-file state in the [`FileRegistry`]. Supports multiple remote repositories —
-/// each file entry knows which repo it belongs to, and the registry holds a map
-/// of `repo_key → Arc<dyn ObjectStore>`.
+/// per-file state in the [`FileRegistry`]. When a [`FoyerCacheManager`] is provided,
+/// all `get_range`/`get_ranges` calls are intercepted and served from the disk cache
+/// before hitting local NVMe or remote storage.
 #[derive(Debug)]
 pub struct TieredObjectStore {
     local_fs: Arc<LocalFileSystem>,
     registry: Arc<FileRegistry>,
     remote_reads: AtomicU64,
     passthrough_reads: AtomicU64,
+
+    /// Optional Foyer disk page cache manager.
+    /// When `Some`, `get_range`/`get_ranges` check the cache before any I/O.
+    foyer: Option<Arc<FoyerCacheManager>>,
 }
 
 impl TieredObjectStore {
-    /// Create a new TieredObjectStore. Remote stores are added later via
-    /// [`FileRegistry::add_remote_store`] as new repositories are encountered.
+    /// Create a new TieredObjectStore without a page cache.
     pub fn new() -> (Self, Arc<FileRegistry>) {
-        log_info!("[TieredObjectStore] created");
+        Self::new_with_cache(None)
+    }
+
+    /// Create a new TieredObjectStore with an optional Foyer cache manager.
+    pub fn new_with_cache(foyer: Option<Arc<FoyerCacheManager>>) -> (Self, Arc<FileRegistry>) {
+        log_info!(
+            "[TieredObjectStore] created (foyer_cache={})",
+            if foyer.is_some() { "enabled" } else { "disabled" }
+        );
         let registry = Arc::new(FileRegistry::new());
         let local_fs = LocalFileSystem::new_with_prefix("/")
            .expect("Failed to create LocalFileSystem with root prefix");
@@ -48,30 +84,42 @@ impl TieredObjectStore {
             registry: Arc::clone(&registry),
             remote_reads: AtomicU64::new(0),
             passthrough_reads: AtomicU64::new(0),
+            foyer,
         };
         (store, registry)
     }
+
+    // ── Path normalisation ─────────────────────────────────────────
+
+    /// Strip the leading `/` to produce the Foyer cache key prefix.
+    fn cache_path(location: &ObjectPath) -> String {
+        let s = location.as_ref();
+        if s.starts_with('/') { s[1..].to_string() } else { s.to_string() }
+    }
 }
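Key normalisation matters because the same file must map to the same cache key whether its path arrives with or without a leading slash. A standalone check of the helper's behaviour (logic copied from `cache_path` above, as a free function over `&str`):

```rust
// Mirror of cache_path's normalisation, for illustration only.
fn cache_path(s: &str) -> String {
    if s.starts_with('/') { s[1..].to_string() } else { s.to_string() }
}

fn main() {
    assert_eq!(cache_path("/data/idx/_parquet_0.parquet"), "data/idx/_parquet_0.parquet");
    assert_eq!(cache_path("data/idx/_parquet_0.parquet"), "data/idx/_parquet_0.parquet");
}
```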
 
 impl Display for TieredObjectStore {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         write!(
-            f, "TieredObjectStore(remote_reads={}, passthrough={}, tracked_files={}, remote_stores={})",
+            f,
+            "TieredObjectStore(remote_reads={}, passthrough={}, tracked_files={}, remote_stores={}, foyer_cache={})",
             self.remote_reads.load(Ordering::Relaxed),
             self.passthrough_reads.load(Ordering::Relaxed),
             self.registry.file_count(),
             self.registry.remote_store_count(),
+            if self.foyer.is_some() { "enabled" } else { "disabled" },
         )
     }
 }
 
 #[async_trait]
 impl ObjectStore for TieredObjectStore {
-    async fn put_opts(&self, location: &ObjectPath, payload: PutPayload, opts: PutOptions) -> Result<PutResult> {
+    async fn put_opts(
+        &self, location: &ObjectPath, payload: PutPayload, opts: PutOptions,
+    ) -> Result<PutResult> {
         let path_str = location.as_ref();
         let size = payload.content_length() as u64;
         log_info!("[TieredObjectStore] put_opts WRITE_LOCAL: file={}, size={}", path_str, size);
-
         let result = self.local_fs.put_opts(location, payload, opts).await;
         if result.is_ok() {
             self.registry.register_local(path_str, size);
@@ -79,20 +127,17 @@
         result
     }
 
-    async fn put_multipart_opts(&self, location: &ObjectPath, opts: PutMultipartOptions) -> Result<Box<dyn MultipartUpload>> {
+    async fn put_multipart_opts(
+        &self, location: &ObjectPath, opts: PutMultipartOptions,
+    ) -> Result<Box<dyn MultipartUpload>> {
         log_info!("[TieredObjectStore] put_multipart_opts: file={}", location);
         self.local_fs.put_multipart_opts(location, opts).await
     }
 
     async fn get_opts(&self, location: &ObjectPath, options: GetOptions) -> Result<GetResult> {
         let path_str = location.as_ref();
-
-        // Check if this file has a remote path and a corresponding remote store
         let remote_info = self.registry.get_remote_path(path_str)
-            .and_then(|rp| {
-                self.registry.get_remote_store_for_file(path_str)
-                    .map(|store| (rp, store))
-            });
+            .and_then(|rp| self.registry.get_remote_store_for_file(path_str).map(|s| (rp, s)));
 
         if let Some((remote_path, remote_store)) = remote_info {
             let count = self.remote_reads.fetch_add(1, Ordering::Relaxed) + 1;
@@ -102,34 +147,104 @@
                 "[TieredObjectStore] get_opts #{} DISPATCHING_TO_REMOTE location={} remote_path={} repo={} registry_state={}",
                 count, location, remote_path, repo_key, loc
             );
-
             let remote_location = ObjectPath::from(remote_path.clone());
             let result = remote_store.get_opts(&remote_location, options).await;
-
             match &result {
-                Ok(r) => log_info!(
-                    "[TieredObjectStore] get_opts #{} REMOTE_READ_SUCCESS remote_path={} bytes={}",
-                    count, remote_path, r.meta.size
-                ),
-                Err(e) => log_info!(
-                    "[TieredObjectStore] get_opts #{} REMOTE_READ_FAILED remote_path={} error={}",
-                    count, remote_path, e
-                ),
+                Ok(r) => log_info!("[TieredObjectStore] get_opts #{} REMOTE_READ_SUCCESS remote_path={} bytes={}", count, remote_path, r.meta.size),
+                Err(e) => log_info!("[TieredObjectStore] get_opts #{} REMOTE_READ_FAILED remote_path={} error={}", count, remote_path, e),
             }
             result
         } else {
             let count = self.passthrough_reads.fetch_add(1, Ordering::Relaxed) + 1;
+            log_info!("[TieredObjectStore] get_opts #{} READ_FROM_LOCAL: {}", count, location);
+            self.local_fs.get_opts(location, options).await
+        }
+    }
+
+    // ── Range reads: intercepted by FoyerCacheManager (if configured) ─────
+
+    async fn get_range(&self, location: &ObjectPath, range: Range<u64>) -> Result<Bytes> {
+        let path_str = Self::cache_path(location);
+        let start = range.start as usize;
+        let end = range.end as usize;
+
+        if let Some(foyer) = &self.foyer {
+            if let Some(cached) = foyer.get(&path_str, start, end).await {
+                log_info!(
+                    "[FOYER-PAGE-CACHE] get_range HIT: path={}, range={}..{}, size={}B",
+                    path_str, start, end, cached.len()
+                );
+                return Ok(cached);
+            }
             log_info!(
-                "[TieredObjectStore] get_opts #{} READ_FROM_LOCAL (no remote path/store in registry): {}",
-                count, location
+                "[FOYER-PAGE-CACHE] get_range MISS → backing store: path={}, range={}..{}",
+                path_str, start, end
             );
-            self.local_fs.get_opts(location, options).await
+            let bytes = self.backing_get_range(location, range).await?;
+            log_info!(
+                "[FOYER-PAGE-CACHE] get_range PUT: path={}, range={}..{}, size={}B",
+                path_str, start, end, bytes.len()
+            );
+            foyer.put(path_str, start, end, bytes.clone());
+            Ok(bytes)
+        } else {
+            self.backing_get_range(location, range).await
         }
     }
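`get_range` is a classic read-through cache: consult, fetch on miss, populate, return. A library-free sketch of that shape, with a `HashMap` standing in for Foyer and an infallible fetch closure for brevity:

```rust
use std::collections::HashMap;

// Read-through shape of get_range, with a HashMap standing in for Foyer.
fn read_through(
    cache: &mut HashMap<String, Vec<u8>>,
    key: &str,
    fetch: impl FnOnce() -> Vec<u8>,
) -> Vec<u8> {
    if let Some(hit) = cache.get(key) {
        return hit.clone();          // HIT: no backing-store I/O
    }
    let bytes = fetch();             // MISS: read local NVMe or remote store
    cache.insert(key.to_string(), bytes.clone()); // populate for next time
    bytes
}

fn main() {
    let mut cache = HashMap::new();
    let a = read_through(&mut cache, "f.parquet:0-4", || b"page".to_vec());
    let b = read_through(&mut cache, "f.parquet:0-4", || unreachable!("must hit"));
    assert_eq!(a, b);
}
```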
+
+    async fn get_ranges(&self, location: &ObjectPath, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
+        let path_str = Self::cache_path(location);
+
+        if let Some(foyer) = &self.foyer {
+            let mut results: Vec<Option<Bytes>> = vec![None; ranges.len()];
+            let mut miss_indices: Vec<usize> = Vec::new();
+
+            for (i, range) in ranges.iter().enumerate() {
+                let start = range.start as usize;
+                let end = range.end as usize;
+                if let Some(cached) = foyer.get(&path_str, start, end).await {
+                    log_info!(
+                        "[FOYER-PAGE-CACHE] get_ranges HIT [{}/{}]: path={}, range={}..{}, size={}B",
+                        i + 1, ranges.len(), path_str, start, end, cached.len()
+                    );
+                    results[i] = Some(cached);
+                } else {
+                    miss_indices.push(i);
+                }
+            }
+
+            if miss_indices.is_empty() {
+                log_info!("[FOYER-PAGE-CACHE] get_ranges ALL HIT: path={}, {} ranges", path_str, ranges.len());
+                return Ok(results.into_iter().map(|b| b.unwrap()).collect());
+            }
+
+            log_info!(
+                "[FOYER-PAGE-CACHE] get_ranges PARTIAL MISS: path={}, {}/{} ranges need fetch",
+                path_str, miss_indices.len(), ranges.len()
+            );
+            let miss_ranges: Vec<Range<u64>> = miss_indices.iter().map(|&i| ranges[i].clone()).collect();
+            let fetched = self.backing_get_ranges(location, &miss_ranges).await?;
+
+            for (miss_idx, fetched_bytes) in miss_indices.iter().zip(fetched.into_iter()) {
+                let range = &ranges[*miss_idx];
+                let start = range.start as usize;
+                let end = range.end as usize;
+                log_info!(
+                    "[FOYER-PAGE-CACHE] get_ranges PUT: path={}, range={}..{}, size={}B",
+                    path_str, start, end, fetched_bytes.len()
+                );
+                foyer.put(path_str.clone(), start, end, fetched_bytes.clone());
+                results[*miss_idx] = Some(fetched_bytes);
+            }
+
+            Ok(results.into_iter().map(|b| b.unwrap()).collect())
+        } else {
+            self.backing_get_ranges(location, ranges).await
+        }
+    }
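The partial-miss merge is the subtle part of `get_ranges`: hits fill `results` immediately, misses are fetched as one batch and scattered back by their original index. The same logic worked through on plain strings:

```rust
// Worked example of the miss-index scatter/gather used by get_ranges.
fn main() {
    let ranges = ["0-4", "4-8", "8-12"];
    // Pretend range 1 ("4-8") is already cached.
    let cached = |i: usize| (i == 1).then(|| format!("hit:{}", ranges[i]));

    let mut results: Vec<Option<String>> = (0..ranges.len()).map(cached).collect();
    let miss_indices: Vec<usize> =
        (0..ranges.len()).filter(|i| results[*i].is_none()).collect();

    // One batched fetch for the misses, then scatter back by original index.
    let fetched: Vec<String> =
        miss_indices.iter().map(|&i| format!("fetch:{}", ranges[i])).collect();
    for (&i, bytes) in miss_indices.iter().zip(fetched) {
        results[i] = Some(bytes);
    }

    let out: Vec<String> = results.into_iter().map(Option::unwrap).collect();
    assert_eq!(out, ["fetch:0-4", "hit:4-8", "fetch:8-12"]);
}
```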
 
     async fn head(&self, location: &ObjectPath) -> Result<ObjectMeta> {
         let path_str = location.as_ref();
-
         match self.local_fs.head(location).await {
             Ok(meta) => {
                 self.registry.register(path_str, FileLocation::Local, meta.size as u64);
@@ -138,13 +253,8 @@
                 Ok(meta)
             }
             Err(local_err) => {
-                // Try remote if we have a remote store for this file
                 let remote_info = self.registry.get_remote_path(path_str)
-                    .and_then(|rp| {
-                        self.registry.get_remote_store_for_file(path_str)
-                            .map(|store| (rp, store))
-                    });
-
+                    .and_then(|rp| self.registry.get_remote_store_for_file(path_str).map(|s| (rp, s)));
                 if let Some((remote_path, remote_store)) = remote_info {
                     let remote_loc = ObjectPath::from(remote_path);
                     match remote_store.head(&remote_loc).await {
@@ -169,6 +279,13 @@
 
     async fn delete(&self, location: &ObjectPath) -> Result<()> {
         let path_str = location.as_ref();
+        let cache_path = Self::cache_path(location);
+
+        // Evict Foyer cache entries for this file before deleting from disk.
+        if let Some(foyer) = &self.foyer {
+            foyer.evict_file(&cache_path);
+        }
+
         if self.registry.can_delete_local(path_str) {
             log_info!("[TieredObjectStore] delete: {} (on remote, no active reads — safe to delete local)", path_str);
             let result = self.local_fs.delete(location).await;
@@ -187,11 +304,9 @@
 
     fn list(&self, prefix: Option<&ObjectPath>) -> BoxStream<'static, Result<ObjectMeta>> {
         log_info!("[TieredObjectStore] list: prefix={:?}", prefix);
-
         let local_fs = self.local_fs.clone();
         let registry = self.registry.clone();
         let prefix_owned = prefix.cloned();
-
         let stream = async_stream::stream! {
             let mut local_stream = local_fs.list(prefix_owned.as_ref());
             while let Some(result) = local_stream.next().await {
@@ -208,7 +323,6 @@
             }
             registry.log_summary();
         };
-
         Box::pin(stream)
     }
 
@@ -224,3 +338,51 @@
         self.local_fs.copy_if_not_exists(from, to).await
     }
 }
+
+// ── Private backing-store helpers (bypass cache) ──────────────────────────────
+
+impl TieredObjectStore {
+    async fn backing_get_range(&self, location: &ObjectPath, range: Range<u64>) -> Result<Bytes> {
+        let path_str = location.as_ref();
+        let remote_info = self.registry.get_remote_path(path_str)
+            .and_then(|rp| self.registry.get_remote_store_for_file(path_str).map(|s| (rp, s)));
+
+        if let Some((remote_path, remote_store)) = remote_info {
+            let count = self.remote_reads.fetch_add(1, Ordering::Relaxed) + 1;
+            log_info!(
+                "[TieredObjectStore] get_range #{} REMOTE: path={}, remote={}, range={}..{}",
+                count, path_str, remote_path, range.start, range.end
+            );
+            remote_store.get_range(&ObjectPath::from(remote_path), range).await
+        } else {
+            let count = self.passthrough_reads.fetch_add(1, Ordering::Relaxed) + 1;
+            log_info!(
+                "[TieredObjectStore] get_range #{} LOCAL: path={}, range={}..{}",
+                count, path_str, range.start, range.end
+            );
+            self.local_fs.get_range(location, range).await
+        }
+    }
+
+    async fn backing_get_ranges(&self, location: &ObjectPath, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
+        let path_str = location.as_ref();
+        let remote_info = self.registry.get_remote_path(path_str)
+            .and_then(|rp| self.registry.get_remote_store_for_file(path_str).map(|s| (rp, s)));
+
+        if let Some((remote_path, remote_store)) = remote_info {
+            let count = self.remote_reads.fetch_add(1, Ordering::Relaxed) + 1;
+            log_info!(
+                "[TieredObjectStore] get_ranges #{} REMOTE: path={}, remote={}, {} ranges",
+                count, path_str, remote_path, ranges.len()
+            );
+            remote_store.get_ranges(&ObjectPath::from(remote_path), ranges).await
+        } else {
+            let count = self.passthrough_reads.fetch_add(1, Ordering::Relaxed) + 1;
+            log_info!(
+                "[TieredObjectStore] get_ranges #{} LOCAL: path={}, {} ranges",
+                count, path_str, ranges.len()
+            );
+            self.local_fs.get_ranges(location, ranges).await
+        }
+    }
+}
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java
index d1be71c6b9267..6f1fac27c4933 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java
@@ -29,6 +29,7 @@
 import org.opensearch.datafusion.search.DatafusionReaderManager;
 import org.opensearch.datafusion.search.DatafusionSearcher;
 import org.opensearch.datafusion.search.cache.CacheSettings;
+import org.opensearch.datafusion.search.cache.TieredCacheSettings;
 import org.opensearch.env.Environment;
 import org.opensearch.env.NodeEnvironment;
 import org.opensearch.index.IndexSettings;
@@ -208,7 +209,8 @@ public List<Setting<?>> getSettings() {
         settingList.add(DATAFUSION_SPILL_MEMORY_LIMIT_CONFIGURATION);
         settingList.addAll(Stream.of(
             CacheSettings.CACHE_SETTINGS,
-            CacheSettings.CACHE_ENABLED)
+            CacheSettings.CACHE_ENABLED,
+            TieredCacheSettings.TIERED_CACHE_SETTINGS)
             .flatMap(x -> x.stream()).collect(Collectors.toList()));
 
         return settingList;
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/DataFusionRuntimeEnv.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/DataFusionRuntimeEnv.java
index ae0e9fb6f3910..3d5e44f0febc1 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/DataFusionRuntimeEnv.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/DataFusionRuntimeEnv.java
@@ -14,9 +14,12 @@
 import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.datafusion.jni.NativeBridge;
+import org.opensearch.datafusion.jni.TieredStoreNativeBridgeImpl;
 import org.opensearch.datafusion.jni.handle.GlobalRuntimeHandle;
 import org.opensearch.datafusion.search.cache.CacheManager;
 import org.opensearch.datafusion.search.cache.CacheUtils;
+import org.opensearch.vectorized.execution.jni.SharedNativeLibrary;
+import org.opensearch.vectorized.execution.jni.TieredStoreNativeBridge;
 
 /**
  * DataFusion runtime environment manager.
@@ -55,6 +58,15 @@ public DataFusionRuntimeEnv(ClusterService clusterService, String spill_dir) {
         long memoryLimit = clusterService.getClusterSettings().get(DATAFUSION_MEMORY_POOL_CONFIGURATION).getBytes();
         long spillLimit = clusterService.getClusterSettings().get(DATAFUSION_SPILL_MEMORY_LIMIT_CONFIGURATION).getBytes();
         long cacheManagerConfigPtr = CacheUtils.createCacheConfig(clusterService.getClusterSettings());
+
+        // Inject ClusterService into TieredStoreNativeBridgeImpl so it can read
+        // TieredCacheSettings when createTieredObjectStore() is called later.
+        TieredStoreNativeBridge bridge = SharedNativeLibrary.get(
+            TieredStoreNativeBridge.REGISTRY_KEY, TieredStoreNativeBridge.class);
+        if (bridge instanceof TieredStoreNativeBridgeImpl) {
+            ((TieredStoreNativeBridgeImpl) bridge).setClusterService(clusterService);
+        }
+
         NativeBridge.initTokioRuntimeManager(Runtime.getRuntime().availableProcessors());
         NativeBridge.startTokioRuntimeMonitoring();
         // TODO : do we need this control in java ?
this.runtimeHandle = new GlobalRuntimeHandle(memoryLimit, cacheManagerConfigPtr, spill_dir, spillLimit); diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/TieredStoreNativeBridgeImpl.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/TieredStoreNativeBridgeImpl.java index 001d818b8f874..1be070414647d 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/TieredStoreNativeBridgeImpl.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/TieredStoreNativeBridgeImpl.java @@ -8,6 +8,10 @@ package org.opensearch.datafusion.jni; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.datafusion.search.cache.TieredCacheSettings; import org.opensearch.vectorized.execution.jni.TieredStoreNativeBridge; /** @@ -17,17 +21,74 @@ * classloader, which owns the {@code .so}. The tiered-storage module * calls through the {@link TieredStoreNativeBridge} interface, which * dispatches to these native methods. + *

+ * {@link #setClusterService(ClusterService)} must be called once after the + * static initialiser registers this instance, so that {@link #createTieredObjectStore()} + * can read the Foyer page-cache settings ({@link TieredCacheSettings}). */ public final class TieredStoreNativeBridgeImpl implements TieredStoreNativeBridge { + private static final Logger logger = LogManager.getLogger(TieredStoreNativeBridgeImpl.class); + + /** + * Set after construction by {@code DataFusionRuntimeEnv} once the + * {@code ClusterService} is available. Volatile for safe publication. + */ + private volatile ClusterService clusterService; + + /** + * Inject the {@link ClusterService} so that {@link #createTieredObjectStore()} + * can read {@link TieredCacheSettings} at store-creation time. + * Called once from {@code DataFusionRuntimeEnv} constructor. + */ + public void setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + } + @Override public void initLogger() { nativeInitLogger(); } + /** + * Creates a {@code TieredObjectStore} with an optional Foyer disk page cache. + * Reads {@link TieredCacheSettings} from {@code ClusterSettings} internally + * and passes {@code diskCacheBytes} / {@code diskCacheDir} to the native layer. + * Callers in {@code modules/tiered-storage} see only the no-arg interface. + */ @Override public long[] createTieredObjectStore() { - return nativeCreateTieredObjectStore(); + long diskCacheBytes = 0L; + String diskCacheDir = ""; + + if (clusterService != null) { + try { + var settings = clusterService.getClusterSettings(); + boolean cacheEnabled = settings.get(TieredCacheSettings.PAGE_CACHE_ENABLED); + if (cacheEnabled) { + diskCacheBytes = settings.get(TieredCacheSettings.PAGE_CACHE_DISK_CAPACITY).getBytes(); + diskCacheDir = settings.get(TieredCacheSettings.PAGE_CACHE_DIR); + } + logger.info( + "[TieredStoreNativeBridgeImpl] createTieredObjectStore: " + + "cacheEnabled={}, diskBytes={}, diskDir={}", + cacheEnabled, diskCacheBytes, diskCacheDir + ); + } catch (Exception e) { + logger.warn( + "[TieredStoreNativeBridgeImpl] Failed to read TieredCacheSettings — " + + "page cache disabled: {}", + e.getMessage() + ); + } + } else { + logger.warn( + "[TieredStoreNativeBridgeImpl] ClusterService not set — " + + "page cache disabled. Call setClusterService() before createTieredObjectStore()." 
+ ); + } + + return nativeCreateTieredObjectStore(diskCacheBytes, diskCacheDir); } @Override @@ -118,7 +179,8 @@ public int registryPendingDeleteCount(long registryPtr) { // --- Native method declarations (resolved from DataFusion's .so) --- private static native void nativeInitLogger(); - private static native long[] nativeCreateTieredObjectStore(); + /** Rust: nativeCreateTieredObjectStore(diskCacheBytes, diskCacheDir) — params read from ClusterSettings above */ + private static native long[] nativeCreateTieredObjectStore(long diskCacheBytes, String diskCacheDir); private static native void nativeAddRemoteStore(long registryPtr, String repoKey, String storeType, String configJson); private static native void nativeDestroyTieredObjectStore(long dataPtr, long vtablePtr); private static native void nativeDestroyFileRegistry(long registryPtr); diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/TieredCacheSettings.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/TieredCacheSettings.java new file mode 100644 index 0000000000000..9945ba8eb271e --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/TieredCacheSettings.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.search.cache; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import org.opensearch.common.settings.Setting; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; + +/** + * Settings for the Foyer disk-backed page cache that is embedded inside + * {@code TieredObjectStore}. + * + *

These settings are deliberately separated from {@link CacheSettings} + * (which owns Parquet metadata / statistics caches) because the page cache + * belongs to the tiered-storage read path, not to the DataFusion generic + * cache manager. + * + *

<p>The values are read once at {@code TieredObjectStore} construction time
+ * and passed directly to the native
+ * {@code nativeCreateTieredObjectStore(diskCacheBytes, diskCacheDir)} JNI call.
+ * No Java-side cache handle is held.
+ */
+public final class TieredCacheSettings {
+
+    private TieredCacheSettings() {}
+
+    // ── Enabled flag ────────────────────────────────────────────────────────
+
+    public static final String PAGE_CACHE_ENABLED_KEY = "datafusion.tiered.page.cache.enabled";
+
+    /**
+     * Master switch for the Foyer page cache inside {@code TieredObjectStore}.
+     * When {@code false} the store performs all reads without caching.
+     */
+    public static final Setting<Boolean> PAGE_CACHE_ENABLED =
+        Setting.boolSetting(PAGE_CACHE_ENABLED_KEY, true,
+            Setting.Property.NodeScope, Setting.Property.Dynamic);
+
+    // ── Disk budget ─────────────────────────────────────────────────────────
+
+    public static final String PAGE_CACHE_DISK_CAPACITY_KEY = "datafusion.tiered.page.cache.disk.capacity";
+
+    /**
+     * Total disk space allocated to the Foyer page cache (e.g. {@code 10gb}).
+     * Foyer manages eviction within this budget automatically.
+     */
+    public static final Setting<ByteSizeValue> PAGE_CACHE_DISK_CAPACITY =
+        new Setting<>(PAGE_CACHE_DISK_CAPACITY_KEY, "10gb",
+            (s) -> ByteSizeValue.parseBytesSizeValue(
+                s, new ByteSizeValue(1, ByteSizeUnit.GB), PAGE_CACHE_DISK_CAPACITY_KEY),
+            Setting.Property.NodeScope, Setting.Property.Dynamic);
+
+    // ── Disk directory ──────────────────────────────────────────────────────
+
+    public static final String PAGE_CACHE_DIR_KEY = "datafusion.tiered.page.cache.dir";
+
+    /**
+     * Local NVMe directory where Foyer stores its cache data files.
+     * The directory must be writable by the OpenSearch process.
+     */
+    public static final Setting<String> PAGE_CACHE_DIR = new Setting<>(
+        PAGE_CACHE_DIR_KEY,
+        "/tmp/foyer-page-cache",
+        Function.identity(),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic);
+
+    // ── Convenience list for Plugin.getSettings() ───────────────────────────
+
+    public static final List<Setting<?>> TIERED_CACHE_SETTINGS = Arrays.asList(
+        PAGE_CACHE_ENABLED,
+        PAGE_CACHE_DISK_CAPACITY,
+        PAGE_CACHE_DIR
+    );
+}
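Putting the pieces together: the three node-level knobs above collapse into the two-argument JNI call, with a disabled cache encoded as (0, ""). A hedged end-to-end sketch of that mapping (field names and the encoding mirror TieredStoreNativeBridgeImpl.createTieredObjectStore(); the config struct itself is hypothetical):

```rust
// Sketch of the node-level knobs and how they reach the native layer.
struct PageCacheConfig {
    enabled: bool, // datafusion.tiered.page.cache.enabled        (default: true)
    capacity: u64, // datafusion.tiered.page.cache.disk.capacity  (default: 10gb)
    dir: String,   // datafusion.tiered.page.cache.dir            (default: /tmp/foyer-page-cache)
}

// Mirrors the Java bridge: a disabled cache becomes (0, "") on the JNI boundary.
fn jni_args(cfg: &PageCacheConfig) -> (u64, String) {
    if cfg.enabled { (cfg.capacity, cfg.dir.clone()) } else { (0, String::new()) }
}

fn main() {
    let on = PageCacheConfig {
        enabled: true,
        capacity: 10 << 30, // 10 GiB
        dir: "/tmp/foyer-page-cache".into(),
    };
    assert_eq!(jni_args(&on).0, 10 << 30);

    let off = PageCacheConfig { enabled: false, ..on };
    assert_eq!(jni_args(&off), (0, String::new()));
}
```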