
Fix: use NativeCacheManagerHandle instead of raw cache manager pointer #21577

Open
HarishNarasimhanK wants to merge 1 commit into opensearch-project:main from HarishNarasimhanK:main

Conversation

@HarishNarasimhanK

@HarishNarasimhanK HarishNarasimhanK commented May 9, 2026

Description

Follow-up to #21225. Addresses review comments.

Changes

  • Combined memory_tracker + total_memory into single MemoryState mutex in statistics_cache.rs
  • Reuse RuntimeManager.io_runtime handle instead of Runtime::new() per file in metadata_cache_put
  • Validate eviction type setting to only accept LRU/LFU
  • Consolidate CACHE_SETTINGS into single list
  • Make CacheManager.runtimeHandle private final
  • Wrap cacheManagerPtr in NativeCacheManagerHandle (extends ConsumableNativeHandle)
  • Fix missing cache_manager_ptr argument in local_exec_test.rs
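
The last two bullets describe wrapping the raw pointer in a consumable handle. Below is a hypothetical sketch of that pattern, not the PR's actual implementation: the class and method names (`ConsumableNativeHandle`, `markConsumed`, `getPointer`) follow the PR description, but the native free call is stubbed out for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: a handle whose ownership can be transferred to native code.
// Once consumed, close() becomes a no-op, which prevents double-free.
abstract class ConsumableNativeHandle implements AutoCloseable {
    private final long pointer;
    private final AtomicBoolean consumed = new AtomicBoolean(false);

    protected ConsumableNativeHandle(long pointer) { this.pointer = pointer; }

    public long getPointer() { return pointer; }

    /** Mark ownership as transferred to native code. */
    public void markConsumed() { consumed.set(true); }

    public boolean isConsumed() { return consumed.get(); }

    @Override
    public void close() {
        // Free only if ownership was never transferred; compareAndSet
        // also makes repeated close() calls idempotent.
        if (consumed.compareAndSet(false, true)) {
            freeNative(pointer);
        }
    }

    protected abstract void freeNative(long pointer);
}

class NativeCacheManagerHandle extends ConsumableNativeHandle {
    static int freeCalls = 0; // test hook for this sketch only

    NativeCacheManagerHandle(long pointer) { super(pointer); }

    @Override
    protected void freeNative(long pointer) { freeCalls++; }
}

public class HandleSketch {
    public static void main(String[] args) {
        NativeCacheManagerHandle transferred = new NativeCacheManagerHandle(42L);
        transferred.markConsumed();
        transferred.close(); // no-op: ownership already transferred

        NativeCacheManagerHandle owned = new NativeCacheManagerHandle(43L);
        owned.close();       // frees exactly once
        owned.close();       // idempotent
    }
}
```

The key property is that `close()` and `markConsumed()` race safely: whichever wins, the underlying pointer is freed at most once.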

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions

github-actions Bot commented May 9, 2026

PR Reviewer Guide 🔍

(Review updated until commit a8a33af)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Resource Leak

If NativeBridge.createGlobalRuntime throws an exception, rootAllocator is never initialized but doStop() may later attempt to close it, causing a NullPointerException. The allocator should be created before the runtime or properly guarded in cleanup.

this.rootAllocator = new RootAllocator(memoryPoolLimit);
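
One way to address this, sketched below, is to create the allocator before the fallible runtime call and null-guard the cleanup path. This is an illustrative stub under assumed semantics: the names `RootAllocator`, `doStart`, and `doStop` mirror the review text, but the bodies are placeholders, not the service's real code.

```java
public class InitOrderSketch {
    // Stand-in for the real allocator; only the lifecycle matters here.
    static class RootAllocator implements AutoCloseable {
        RootAllocator(long limit) {}
        @Override public void close() {}
    }

    RootAllocator rootAllocator;
    long runtimePtr;

    void doStart(long memoryPoolLimit, boolean failRuntime) {
        // Allocator first: if runtime creation throws, cleanup sees a
        // fully initialized allocator instead of null.
        this.rootAllocator = new RootAllocator(memoryPoolLimit);
        if (failRuntime) {
            throw new IllegalStateException("createGlobalRuntime failed");
        }
        this.runtimePtr = 1L;
    }

    void doStop() {
        // Null guard makes cleanup safe even if doStart never ran.
        if (rootAllocator != null) {
            rootAllocator.close();
            rootAllocator = null;
        }
    }

    public static void main(String[] args) {
        InitOrderSketch svc = new InitOrderSketch();
        try {
            svc.doStart(1024L, true);
        } catch (IllegalStateException e) {
            svc.doStop(); // safe: guard handles both initialized and null
        }
    }
}
```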
Possible Deadlock

In put(), memory_state.lock() is acquired at line 357, then policy.lock() at line 366. In evict_entries(), policy.lock() is acquired at line 252, then memory_state.lock() at line 260. If two threads call these methods concurrently, they can deadlock by acquiring locks in opposite order.

let current_size = self.memory_state.lock()
    .map(|s| s.total)
    .unwrap_or(0);

let eviction_candidates = {
    let size_limit = self.size_limit.load(Ordering::Relaxed);
    let threshold = (size_limit as f64 * self.eviction_threshold) as usize;
    if current_size + memory_size > threshold {
        let target_eviction = (current_size + memory_size) - (size_limit as f64 * 0.6) as usize;
        if let Ok(policy_guard) = self.policy.lock() {
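
The standard fix for this class of deadlock is a single global lock order. The PR code is Rust; the generic Java sketch below just illustrates the technique: both the insertion and eviction paths acquire `memoryState` before `policy`, so a circular wait cannot form.

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockOrderSketch {
    private final ReentrantLock memoryState = new ReentrantLock();
    private final ReentrantLock policy = new ReentrantLock();
    long total = 0; // package-private so the sketch is easy to inspect

    void put(long size) {
        memoryState.lock();      // rule: memoryState always first
        try {
            policy.lock();       // rule: policy always second
            try {
                total += size;
            } finally {
                policy.unlock();
            }
        } finally {
            memoryState.unlock();
        }
    }

    void evictEntries(long size) {
        memoryState.lock();      // same order as put(): no circular wait
        try {
            policy.lock();
            try {
                total = Math.max(0L, total - size);
            } finally {
                policy.unlock();
            }
        } finally {
            memoryState.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockOrderSketch cache = new LockOrderSketch();
        Thread writer  = new Thread(() -> { for (int i = 0; i < 1000; i++) cache.put(2); });
        Thread evictor = new Thread(() -> { for (int i = 0; i < 1000; i++) cache.evictEntries(1); });
        writer.start();
        evictor.start();
        writer.join();  // with opposite acquisition orders, this join
        evictor.join(); // could hang forever; with one order it cannot
    }
}
```

The same idea carries over to the Rust code: pick one order (for example `memory_state` before `policy`) and make every method follow it, or compute eviction candidates without holding both guards at once.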

@github-actions

github-actions Bot commented May 9, 2026

PR Code Suggestions ✨

Latest suggestions up to a8a33af

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Validate runtime pointer before marking consumed

The exception handling should verify that createGlobalRuntime succeeded before
marking the handle as consumed. If createGlobalRuntime returns 0 or throws an
exception, the cache handle should be cleaned up without marking it consumed, as
ownership was never transferred.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java [84-95]

 try {
     long ptr = NativeBridge.createGlobalRuntime(memoryPoolLimit, cacheManagerPtr, spillDirectory, spillMemoryLimit);
+    if (ptr == 0) {
+        throw new IllegalStateException("Failed to create global runtime");
+    }
     if (cacheHandle != null) {
         cacheHandle.markConsumed();
     }
     this.runtimeHandle = new NativeRuntimeHandle(ptr);
 } catch (Exception e) {
     if (cacheHandle != null) {
         cacheHandle.close();
     }
     throw e;
 }
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies that validating the runtime pointer before marking the handle as consumed would improve robustness. However, the existing exception handling already ensures cleanup occurs if createGlobalRuntime fails, so this is a defensive improvement rather than fixing a critical bug.

Medium
General
Propagate lock errors in cache insertion

Lock acquisition failures should be handled explicitly rather than defaulting to 0.
A poisoned lock indicates corrupted state, and proceeding with a default value could
allow cache insertions that violate memory limits or cause incorrect eviction
decisions.

sandbox/plugins/analytics-backend-datafusion/rust/src/statistics_cache.rs [357-359]

 let current_size = self.memory_state.lock()
-    .map(|s| s.total)
-    .unwrap_or(0);
+    .map_err(|e| CacheError::PolicyLockError {
+        reason: format!("Failed to acquire memory_state lock: {}", e),
+    })?
+    .total;
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies that lock poisoning should be handled more explicitly. However, the put method signature doesn't return a Result, so propagating the error with ? would require changing the method signature, which the suggestion doesn't address. The impact is moderate as it improves error handling consistency.

Low

Previous suggestions

Suggestions up to commit e346378
Category | Suggestion | Impact
General
Prevent potential runtime deadlock

Using block_on on a runtime handle can cause deadlock if called from within an async
context on the same runtime. Consider verifying the calling context or using
spawn_blocking to avoid potential blocking issues.

sandbox/plugins/analytics-backend-datafusion/rust/src/custom_cache_manager.rs [374-380]

-fn metadata_cache_put(&self, file_path: &str, rt_handle: &tokio::runtime::Handle) -> Result<bool, String> {
-    if !file_path.to_lowercase().ends_with(".parquet") {
-        return Ok(false);
-    }
-    ...
-    let _parquet_metadata = rt_handle.block_on(async {
+let _parquet_metadata = tokio::task::block_in_place(|| {
+    rt_handle.block_on(async {
         let df_metadata = DFParquetMetadata::new(store.as_ref(), object_meta)
             .with_file_metadata_cache(Some(metadata_cache));
 
         df_metadata.fetch_metadata().await
             .map_err(|e| format!("Failed to fetch metadata: {}", e))
-    })?;
+    })
+})?;
Suggestion importance[1-10]: 7


Why: The suggestion to use block_in_place is important for preventing potential deadlocks when block_on is called from an async context. This addresses a real concurrency issue that could cause runtime hangs, though the severity depends on the calling context which isn't fully visible in the diff.

Medium
Handle poisoned mutex explicitly

The lock acquisition should use map_err or expect to handle potential poisoned mutex
errors explicitly. Silent fallback to 0 could mask critical synchronization failures
that should be surfaced.

sandbox/plugins/analytics-backend-datafusion/rust/src/statistics_cache.rs [319-322]

 let memory_size = {
-    let state = self.memory_state.lock();
-    state.map(|s| s.tracker.get(&key).copied().unwrap_or(0)).unwrap_or(0)
+    let state = self.memory_state.lock()
+        .expect("memory_state lock poisoned");
+    state.tracker.get(&key).copied().unwrap_or(0)
 };
Suggestion importance[1-10]: 5


Why: The suggestion to use expect instead of silently falling back to 0 is valid for better error visibility. However, poisoned mutexes are rare in practice and the current fallback behavior prevents crashes. The improvement is moderate as it aids debugging but doesn't fix a functional issue.

Low
Check consumption state before cleanup

The cacheHandle.markConsumed() call should occur before assigning this.runtimeHandle
to prevent a potential race condition. If an exception occurs during
NativeRuntimeHandle construction, the cache handle will be marked as consumed but
the runtime handle won't be set, leading to inconsistent state.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java [84-95]

 try {
     long ptr = NativeBridge.createGlobalRuntime(memoryPoolLimit, cacheManagerPtr, spillDirectory, spillMemoryLimit);
     if (cacheHandle != null) {
         cacheHandle.markConsumed();
     }
     this.runtimeHandle = new NativeRuntimeHandle(ptr);
 } catch (Exception e) {
-    if (cacheHandle != null) {
+    if (cacheHandle != null && !cacheHandle.isConsumed()) {
         cacheHandle.close();
     }
     throw e;
 }
Suggestion importance[1-10]: 3


Why: The suggestion to check isConsumed() before cleanup is reasonable but has limited impact. The current code already handles the case correctly by closing the handle only if an exception occurs before markConsumed() is called. The suggested check adds defensive programming but doesn't fix a critical bug.

Low
Suggestions up to commit b516d60
Category | Suggestion | Impact
Possible issue
Prevent potential double-free of cache handle

The cacheHandle.close() call in the catch block may attempt to free a pointer that
was already transferred to the native runtime if createGlobalRuntime succeeded but a
subsequent operation failed. Consider checking if the runtime was successfully
created before closing the cache handle to avoid double-free issues.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java [84-95]

+long ptr = 0;
 try {
-    long ptr = NativeBridge.createGlobalRuntime(memoryPoolLimit, cacheManagerPtr, spillDirectory, spillMemoryLimit);
+    ptr = NativeBridge.createGlobalRuntime(memoryPoolLimit, cacheManagerPtr, spillDirectory, spillMemoryLimit);
     if (cacheHandle != null) {
         cacheHandle.markConsumed();
     }
     this.runtimeHandle = new NativeRuntimeHandle(ptr);
 } catch (Exception e) {
-    if (cacheHandle != null) {
+    if (cacheHandle != null && ptr == 0) {
         cacheHandle.close();
     }
     throw e;
 }
Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a critical issue where cacheHandle.close() could be called after ownership has been transferred to the native runtime via createGlobalRuntime, potentially causing a double-free. The proposed fix of checking ptr == 0 before closing ensures the handle is only freed if runtime creation failed.

Medium
Prevent unbounded cache growth on lock failure

If the memory_state lock is poisoned, this returns 0, which could allow unbounded
cache growth since the eviction logic would think the cache is empty. This could
lead to memory exhaustion.

sandbox/plugins/analytics-backend-datafusion/rust/src/statistics_cache.rs [357-359]

 let current_size = self.memory_state.lock()
     .map(|s| s.total)
-    .unwrap_or(0);
+    .map_err(|e| {
+        // Consider propagating error or using last known size
+        self.size_limit.load(Ordering::Relaxed)
+    })
+    .unwrap_or_else(|fallback| fallback);
Suggestion importance[1-10]: 6


Why: The suggestion identifies a valid concern about poisoned locks leading to incorrect memory tracking. However, the proposed solution of using size_limit as a fallback is questionable - it would trigger aggressive eviction even when the cache might be nearly empty. A better approach would be to propagate the error or use a more conservative fallback.

Low
General
Handle poisoned lock in memory tracking

The lock acquisition result is not properly handled with ? or explicit error
handling. If the lock is poisoned, this silently returns 0, which could mask
concurrency issues and lead to incorrect memory tracking.

sandbox/plugins/analytics-backend-datafusion/rust/src/statistics_cache.rs [319-322]

 let memory_size = {
-    let state = self.memory_state.lock();
-    state.map(|s| s.tracker.get(&key).copied().unwrap_or(0)).unwrap_or(0)
+    let state = self.memory_state.lock().map_err(|e| {
+        // Log the error appropriately
+        0
+    }).unwrap_or(0);
+    state.tracker.get(&key).copied().unwrap_or(0)
 };
Suggestion importance[1-10]: 3


Why: While the suggestion addresses lock poisoning, the proposed improved_code is syntactically incorrect and doesn't properly handle the error. The current code's approach of returning 0 on lock failure is a reasonable fallback for memory size queries, though better error logging could be beneficial.

Low
Suggestions up to commit fc4bab5
Category | Suggestion | Impact
Possible issue
Prevent potential double-free on exception

The cacheHandle.close() call in the catch block may cause a double-free if
createGlobalRuntime partially succeeds but throws an exception after consuming the
pointer. Consider checking if the handle was already consumed before closing it in
the exception handler.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java [84-95]

 try {
     long ptr = NativeBridge.createGlobalRuntime(memoryPoolLimit, cacheManagerPtr, spillDirectory, spillMemoryLimit);
     if (cacheHandle != null) {
         cacheHandle.markConsumed();
     }
     this.runtimeHandle = new NativeRuntimeHandle(ptr);
 } catch (Exception e) {
-    if (cacheHandle != null) {
+    if (cacheHandle != null && !cacheHandle.isConsumed()) {
         cacheHandle.close();
     }
     throw e;
 }
Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a potential double-free issue. If createGlobalRuntime consumes the pointer but then throws an exception, the catch block would attempt to close an already-consumed handle. The suggested check for isConsumed() prevents this critical resource management bug.

Medium
Fail fast on poisoned lock

If the memory_state lock is poisoned, this returns 0, potentially allowing unbounded
cache growth. This could lead to memory exhaustion as the eviction logic would never
trigger when it should.

sandbox/plugins/analytics-backend-datafusion/rust/src/statistics_cache.rs [357-359]

 let current_size = self.memory_state.lock()
     .map(|s| s.total)
-    .unwrap_or(0);
+    .expect("memory_state lock poisoned - cache integrity compromised");
Suggestion importance[1-10]: 5


Why: The suggestion raises a valid concern about poisoned locks potentially allowing unbounded cache growth. However, using expect may be too aggressive for production code. A better approach would be to return an error from the put method rather than panicking, allowing the caller to handle the failure gracefully.

Low
General
Handle poisoned lock explicitly

The lock acquisition result is not properly handled with ? or explicit error
handling. If the lock is poisoned, this silently returns 0, which could mask
critical synchronization issues and lead to incorrect memory tracking.

sandbox/plugins/analytics-backend-datafusion/rust/src/statistics_cache.rs [319-322]

 let memory_size = {
-    let state = self.memory_state.lock();
-    state.map(|s| s.tracker.get(&key).copied().unwrap_or(0)).unwrap_or(0)
+    let state = self.memory_state.lock().map_err(|e| {
+        // Log the error appropriately
+        0
+    }).unwrap_or(0);
+    state.tracker.get(&key).copied().unwrap_or(0)
 };
Suggestion importance[1-10]: 3


Why: While the suggestion addresses lock poisoning, the proposed code is incorrect (it doesn't compile as written). The current implementation's silent fallback to 0 is a reasonable defensive approach for a cache hit scenario, though explicit logging would improve observability.

Low
Suggestions up to commit c160cc8
Category | Suggestion | Impact
Possible issue
Prevent race condition in eviction logic

The memory_state lock is acquired and immediately released before calculating
eviction candidates. This creates a race condition where total could change between
reading it and selecting eviction candidates. Hold the lock throughout the eviction
decision logic to ensure consistency.

sandbox/plugins/analytics-backend-datafusion/rust/src/statistics_cache.rs [357-370]

-let current_size = self.memory_state.lock()
-    .map(|s| s.total)
-    .unwrap_or(0);
-
 let eviction_candidates = {
+    let state = self.memory_state.lock().ok();
+    let current_size = state.as_ref().map(|s| s.total).unwrap_or(0);
     let size_limit = self.size_limit.load(Ordering::Relaxed);
-    ...
+    let threshold = (size_limit as f64 * self.eviction_threshold) as usize;
+    if current_size + memory_size > threshold {
+        ...
+    } else { vec![] }
 };
Suggestion importance[1-10]: 7


Why: Valid concern about the race condition between reading total and selecting eviction candidates. Holding the memory_state lock throughout the eviction decision would ensure consistency, though it increases lock contention. This is a meaningful correctness improvement.

Medium
Mark handle consumed immediately after extraction

The cacheHandle.getPointer() call occurs before markConsumed(), creating a window
where the handle could be closed prematurely if an exception occurs between these
calls. Move the markConsumed() call immediately after getPointer() to ensure the
handle remains valid until the runtime takes ownership.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java [77-87]

 long cacheManagerPtr = 0L;
 NativeCacheManagerHandle cacheHandle = null;
 if (clusterSettings != null) {
     cacheHandle = CacheUtils.createCacheConfig(clusterSettings);
     cacheManagerPtr = cacheHandle.getPointer();
+    cacheHandle.markConsumed();
 }
 
 long ptr = NativeBridge.createGlobalRuntime(memoryPoolLimit, cacheManagerPtr, spillDirectory, spillMemoryLimit);
-if (cacheHandle != null) {
-    cacheHandle.markConsumed();
-}
Suggestion importance[1-10]: 3


Why: While moving markConsumed() immediately after getPointer() could reduce the window for exceptions, the current code already handles cleanup properly with the null check. The improvement is marginal since createGlobalRuntime is the ownership transfer point.

Low
General
Handle lock result properly

The lock() call returns a LockResult which should be handled with map_err or ? to
properly propagate lock poisoning errors. Using map().unwrap_or(0) silently ignores
lock failures, which could mask critical synchronization issues.

sandbox/plugins/analytics-backend-datafusion/rust/src/statistics_cache.rs [319-322]

 let memory_size = {
-    let state = self.memory_state.lock();
-    state.map(|s| s.tracker.get(&key).copied().unwrap_or(0)).unwrap_or(0)
+    let state = self.memory_state.lock().ok();
+    state.and_then(|s| s.tracker.get(&key).copied()).unwrap_or(0)
 };
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that lock poisoning should be handled more explicitly. Using .ok() and and_then() is cleaner than chaining map().unwrap_or(), though the functional outcome is similar since both return 0 on failure.

Low

@github-actions

github-actions Bot commented May 9, 2026

Persistent review updated to latest commit fc4bab5

@github-actions

github-actions Bot commented May 9, 2026

❌ Gradle check result for fc4bab5: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

github-actions Bot commented May 9, 2026

Persistent review updated to latest commit b516d60

@github-actions

github-actions Bot commented May 9, 2026

❌ Gradle check result for b516d60: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@HarishNarasimhanK HarishNarasimhanK marked this pull request as ready for review May 9, 2026 16:47
@HarishNarasimhanK HarishNarasimhanK requested a review from a team as a code owner May 9, 2026 16:47
@github-actions

github-actions Bot commented May 9, 2026

Persistent review updated to latest commit e346378

@github-actions

github-actions Bot commented May 9, 2026

✅ Gradle check result for e346378: SUCCESS

@codecov

codecov Bot commented May 9, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.53%. Comparing base (36809cc) to head (a8a33af).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21577      +/-   ##
============================================
+ Coverage     73.50%   73.53%   +0.03%     
+ Complexity    74644    74640       -4     
============================================
  Files          5980     5980              
  Lines        338777   338777              
  Branches      48848    48848              
============================================
+ Hits         249011   249114     +103     
+ Misses        69946    69812     -134     
- Partials      19820    19851      +31     

☔ View full report in Codecov by Sentry.
Signed-off-by: Harish Narasimhan <hxarishk@amazon.com>
@github-actions

Persistent review updated to latest commit a8a33af

@github-actions

✅ Gradle check result for a8a33af: SUCCESS
