
Sort the documents according to the indexSort during the refresh time #21468

Open

chaitanya588 wants to merge 2 commits into opensearch-project:main from chaitanya588:flush_with_sorting

Conversation

@chaitanya588
Contributor

Description

  1. Server layer (interfaces & contracts)
  • Writer.flush() → Writer.flush(FlushInput) — the flush signature now accepts a parameter object, so context can flow from the orchestrator to any writer without changing the interface again in the future.
  • FlushInput — new record with an optional sortPermutation field (long[][] where [0] = old row IDs and [1] = new row IDs) and a FlushInput.EMPTY constant for callers that have no context to pass.
  • FileInfos — added a sortPermutation field so the primary writer's flush result can carry the mapping upward to the orchestrator. Existing callers using FileInfos.empty() or the builder are unaffected, since the field defaults to null.
  • DataFormatAwareEngine — updated to pass FlushInput.EMPTY when flushing writers.
  2. Rust layer (sort permutation capture)
  • writer.rs — rewrite_row_ids now captures the old→new row ID mapping before overwriting ___row_id with sequential values. The mapping is stored in a new SORT_PERMUTATION static DashMap keyed by output filename.
  • ffm.rs — new parquet_get_sort_permutation FFM export that returns the cached permutation via out-pointers (count_out, old_row_ids_out, new_row_ids_out) and removes it from the cache (single-use retrieval).
  3. Parquet Java layer (sort permutation retrieval)
  • RustBridge — added a GET_SORT_PERMUTATION method handle and getSortPermutationWithSize(), which allocates buffers based on the known row count and reads the permutation from Rust.
  • NativeParquetWriter — after finalizeWriter, retrieves the sort permutation via RustBridge.getSortPermutationWithSize() and exposes it through getSortPermutation().
  • VSRManager — delegates getSortPermutation() to the underlying NativeParquetWriter.
  • ParquetWriter.flush(FlushInput) — ignores the FlushInput (Parquet is the producer, not the consumer, of the permutation) and populates FileInfos.sortPermutation from vsrManager.getSortPermutation().
  4. Composite engine layer (orchestration)
  • CompositeWriter.flush(FlushInput) — flushes the primary (Parquet) writer first, extracts sortPermutation from the returned FileInfos, then builds a new FlushInput(sortPermutation) and passes it uniformly to every secondary writer. No instanceof checks, no special-casing. A sketch of this contract follows the list.
  5. Lucene engine layer (sorted write)
  • LuceneWriter.flush(FlushInput) — checks flushInput.hasSortPermutation() and delegates to either flushUnsorted() (the original path: force merge, commit, return file infos) or flushSorted() (the new path, sketched after the note below: force merge the unsorted segment, open a DirectoryReader, wrap each CodecReader with RowIdRemappingCodecReader, create a new IndexWriter with a custom RowIdMappingSortField as the index sort, and call addIndexes(CodecReader...), which triggers Lucene's IndexSort to reorder documents by the remapped ___row_id values; finally, return the sorted segment's file infos).
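
For concreteness, a minimal sketch of the flush contract described in items 1 and 4. FlushInput, FlushInput.EMPTY, FileInfos.sortPermutation, and the flush(FlushInput) signature come from this description; the accessor names, writer references, and surrounding method are illustrative assumptions, not the PR's verbatim source.

// Sketch only: the record shape and EMPTY constant follow the description
// above; hasSortPermutation() and the orchestration below are illustrative.
public record FlushInput(long[][] sortPermutation) {
    public static final FlushInput EMPTY = new FlushInput(null);

    public boolean hasSortPermutation() {
        return sortPermutation != null;
    }
}

// CompositeWriter.flush: the primary (Parquet) writer produces the
// permutation; the same FlushInput type then carries it to every secondary.
public FileInfos flush(FlushInput ignored) throws IOException {
    FileInfos primaryInfos = primaryWriter.flush(FlushInput.EMPTY);
    FlushInput secondaryInput = primaryInfos.sortPermutation() == null
        ? FlushInput.EMPTY
        : new FlushInput(primaryInfos.sortPermutation());
    for (Writer secondary : secondaryWriters) {
        secondary.flush(secondaryInput);  // uniform call, no instanceof checks
    }
    return primaryInfos;
}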

Note: the inner classes (RowIdMappingSortField, RemappedNumericDocValues, RowIdRemappingCodecReader, RowIdRemappingDocValuesProducer) will be added, with full testing details, as part of a follow-up LuceneMerge PR.
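
For item 5, a condensed sketch of the flushSorted() path. DirectoryReader.open, IndexWriterConfig.setIndexSort, and IndexWriter.addIndexes(CodecReader...) are standard Lucene APIs; RowIdRemappingCodecReader and RowIdMappingSortField arrive only in the follow-up PR, and sortedDirectory plus buildFileInfos() are hypothetical stand-ins for this writer's plumbing.

FileInfos flushSorted(FlushInput flushInput) throws IOException {
    indexWriter.forceMerge(1, true);   // collapse to a single unsorted segment
    indexWriter.commit();
    indexWriter.close();

    try (DirectoryReader reader = DirectoryReader.open(directory)) {
        // Wrap each segment reader so the ___row_id doc values reflect the
        // Parquet sort order instead of the original insertion order.
        CodecReader[] remapped = new CodecReader[reader.leaves().size()];
        for (int i = 0; i < remapped.length; i++) {
            CodecReader leaf = (CodecReader) reader.leaves().get(i).reader();
            remapped[i] = new RowIdRemappingCodecReader(leaf, flushInput.sortPermutation());
        }
        // addIndexes honors the configured index sort, so Lucene physically
        // reorders the documents by the remapped row IDs while copying them.
        IndexWriterConfig cfg = new IndexWriterConfig().setIndexSort(new Sort(new RowIdMappingSortField()));
        try (IndexWriter sortedWriter = new IndexWriter(sortedDirectory, cfg)) {
            sortedWriter.addIndexes(remapped);
            sortedWriter.commit();
        }
    }
    return buildFileInfos();
}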

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Contributor

github-actions Bot commented May 4, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit c933031.

Path | Line | Severity | Description
sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/RustBridge.java | 281 | medium | Unchecked raw pointer dereference via MemorySegment.ofAddress(permAddr) with size derived entirely from untrusted Rust output. If the native code returns a crafted address or length, this could cause out-of-bounds memory reads or JVM crashes. The finally block does free the memory, but there is no bounds validation on permLen before the reinterpret call.
sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java | 267 | medium | rewriteSegmentInfoWithSort manually deletes and rewrites Lucene .si segment metadata files outside of the IndexWriter lifecycle after the writer is already closed. This bypasses Lucene's internal integrity checks (checksums, atomic rename) and could silently produce a corrupt or undetectable-as-modified index if combined with a tampered permutation.
sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs | 544 | medium | rewrite_row_ids emits multiple log_info! calls tagged [RUST] that print full schema column names, batch sizes, and whether the sensitive __row_id__ column is present. Verbose schema disclosure in production logs could aid an attacker with log access in mapping internal data layout. This is unusual for production code and appears to be debug instrumentation left in deliberately.
sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriterCodec.java | 112 | low | An unused field nextDocId = 0 is declared in the anonymous DocValuesConsumer but never incremented or used. While benign here, leaving dead tracking state in a security-sensitive codec intercept class (which rewrites row ID values) is an anomaly worth noting.
server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java | 125 | low | Test still uses the old row ID field name '_row_id' on line 125 while the rest of the diff migrates to '__row_id__'. Inconsistency in test coverage around a field whose values are silently rewritten during sort-on-flush may mask incorrect behavior in the permutation path.

The table above displays up to the 10 most important findings.

Total: 5 | Critical: 0 | High: 0 | Medium: 3 | Low: 2
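
For the RustBridge finding, the guard the analyzer asks for might look like the following. The method and MAX_PERMUTATION_ENTRIES cap are hypothetical; only the MemorySegment.ofAddress/reinterpret usage mirrors the finding.

// Hypothetical bounds check before trusting a native-returned address/length.
static final long MAX_PERMUTATION_ENTRIES = 1L << 31;  // illustrative cap

static long[] readPermutation(long permAddr, long permLen) {
    if (permAddr == 0 || permLen <= 0 || permLen > MAX_PERMUTATION_ENTRIES) {
        throw new IllegalStateException(
            "Suspicious permutation from native code: addr=" + permAddr + ", len=" + permLen);
    }
    return MemorySegment.ofAddress(permAddr)
        .reinterpret(permLen * ValueLayout.JAVA_LONG.byteSize())
        .toArray(ValueLayout.JAVA_LONG);
}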


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@chaitanya588 chaitanya588 reopened this May 7, 2026
@chaitanya588 chaitanya588 force-pushed the flush_with_sorting branch 4 times, most recently from 29ca47a to 6a19587 Compare May 8, 2026 14:12
*/
default int newToOld(int newDocId) {
    return newDocId;
}
Member

Would it be better to have a single signature only?
We can convert RowIdMapping in the merge flow to Map<Generation(Long), RowIdMapping>.

Member

Also, let's not implement these methods in the interface. We should implement them in each class.

Contributor Author

Changed the interface to meet the merge-reorder use case (i.e., including both newToOld and oldToNew). Also changed RowIdMapping to capture the mapping details for only one generation.
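
The revised interface presumably looks something like this; the method names are assembled from this thread and the bot suggestions below, while the javadoc is illustrative.

// Sketch of the single-generation mapping interface after this change.
public interface RowIdMapping {
    /** Number of row IDs covered by this generation's mapping. */
    long size();

    /** Old row ID -> new position after the Parquet sort. */
    long getNewRowId(long oldId);

    /** New position -> old row ID; needed by the merge-reorder flow. */
    long newToOld(long newId);
}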

@chaitanya588 chaitanya588 force-pushed the flush_with_sorting branch from 6a19587 to 384f605 Compare May 9, 2026 12:18

import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
Member

Seems unused

Contributor Author

done

// Force merge to exactly 1 segment to maintain 1:1 mapping with other formats.
// If sort permutation is provided, configure the reorder merge policy
if (flushInput.hasRowIdMapping()) {
    configureSortedMerge(flushInput.rowIdMapping());
Member

It is possible that there are multiple segments before this step [as Lucene may internally decide to flush]. Should we ensure that there is only one segment before this kicks in? Or would reorder take care of the same?

Contributor Author

The current reorder logic takes care of cases where more than one segment is present within this flow. I added a test case to cover this (testSortedFlushWithMultipleInternalSegments).

if (rewriteRowIds == false) {
    return delegate.docValuesFormat();
}
DocValuesFormat delegateFormat = delegate.docValuesFormat();
Member

Can we move this into a separate class?

Contributor Author

Refactored the changes.

@github-actions
Contributor

github-actions Bot commented May 10, 2026

PR Code Suggestions ✨

Latest suggestions up to 3f08261

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Validate row ID bounds strictly

The bounds check old_row_id < num_rows silently skips invalid row IDs, which could
hide data corruption or logic errors. If an old_row_id is out of bounds, it
indicates a serious problem that should be reported rather than ignored. Consider
returning an error or logging a warning when this condition occurs.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [574-579]

 for new_pos in 0..num_rows {
     let old_row_id = old_row_ids.value(new_pos) as usize;
-    if old_row_id < num_rows {
-        mapping[old_row_id] = new_pos as i64;
+    if old_row_id >= num_rows {
+        return Err(format!("Invalid row ID {} at position {} (num_rows={})", old_row_id, new_pos, num_rows).into());
     }
+    mapping[old_row_id] = new_pos as i64;
 }
Suggestion importance[1-10]: 8

Why: The suggestion correctly identifies that silently skipping out-of-bounds row IDs could hide data corruption. Returning an error when old_row_id >= num_rows would make the code more robust and help catch bugs early. This is a valuable improvement for data integrity.

Medium
Verify IndexSort matches expected sort

The condition checks if IndexSort is null before rewriting, but this may not
correctly handle cases where the segment already has a different IndexSort. If the
segment has an IndexSort that doesn't match the expected sort, this code will skip
rewriting, potentially causing inconsistencies. Consider verifying that the existing
IndexSort matches the expected sort before skipping the rewrite.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [218-220]

-if (segmentInfo.info.getIndexSort() == null) {
+Sort expectedSort = new Sort(new SortedNumericSortField(LuceneDocumentInput.ROW_ID_FIELD, SortField.Type.LONG));
+if (segmentInfo.info.getIndexSort() == null || !segmentInfo.info.getIndexSort().equals(expectedSort)) {
     rewriteSegmentInfoWithSort(segmentInfos, segmentInfo);
 }
Suggestion importance[1-10]: 7

Why: The suggestion correctly identifies that checking only for null IndexSort may not handle cases where a segment has a different IndexSort. However, the current code path is designed for child writers that operate without IndexSort, so the null check is appropriate. The suggestion adds defensive validation that could prevent subtle bugs if the code is modified in the future.

Medium
Verify element type matches cast

The cast to *mut i64 assumes the permutation vector contains i64 elements, but this
may not match the actual element type. Verify that row_id_mapping is indeed Vec<i64> to
prevent undefined behavior from type mismatches.

sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs [152-156]

 if let Some(perm) = result.row_id_mapping {
     let len = perm.len();
-    let boxed = perm.into_boxed_slice();
+    let boxed: Box<[i64]> = perm.into_boxed_slice();
     *sort_perm_len_out = len as i64;
     *sort_perm_ptr_out = Box::into_raw(boxed) as *mut i64 as i64;
 }
Suggestion importance[1-10]: 7

Why: The suggestion correctly identifies a potential type safety issue where the cast to *mut i64 should match the actual element type of row_id_mapping. Adding an explicit type annotation helps ensure type correctness and prevents undefined behavior.

Medium
General
Return consistent error value

Returning the input newId when it's out of bounds is inconsistent with getNewRowId
which returns -1 for invalid inputs. This inconsistency could lead to subtle bugs
where callers don't realize they received an invalid mapping. Consider returning -1
or throwing an exception for out-of-bounds access to maintain consistency.

server/src/main/java/org/opensearch/index/engine/dataformat/PackedRowIdMapping.java [94-103]

 public long newToOld(long newId) {
     if (newToOld == null) {
         throw new UnsupportedOperationException("Reverse mapping (newToOld) is not supported for this instance");
     }
     int idx = (int) newId;
     if (idx < 0 || idx >= size) {
-        return newId;
+        return -1L;
     }
     return newToOld.get(idx);
 }
Suggestion importance[1-10]: 7

Why: The suggestion correctly points out an inconsistency: getNewRowId returns -1 for out-of-bounds inputs, while newToOld returns the input value. Returning -1 for consistency would make the API more predictable and reduce the risk of bugs where callers don't realize they received an invalid mapping.

Medium
Add validation for pointer safety

The function silently ignores invalid inputs but doesn't validate that mapping_ptr
and mapping_len match an actual allocation. Consider adding debug assertions or
documentation to clarify caller responsibilities for passing valid pointers.

sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs [508-515]

 pub unsafe extern "C" fn parquet_free_row_id_mapping(
     mapping_ptr: i64,
     mapping_len: i64,
 ) {
+    debug_assert!(mapping_ptr != 0 && mapping_len > 0, "Invalid mapping pointer or length");
     if mapping_ptr != 0 && mapping_len > 0 {
         let _ = Box::from_raw(slice::from_raw_parts_mut(mapping_ptr as *mut i64, mapping_len as usize));
     }
 }
Suggestion importance[1-10]: 5

Why: Adding debug_assert! provides useful validation during development, but the function already checks for invalid inputs before proceeding. The suggestion improves debugging capabilities without changing runtime behavior in release builds.

Low

Previous suggestions

Suggestions up to commit 3f08261
Category | Suggestion | Impact
Possible issue
Fix memory deallocation for boxed slice

The reconstruction of the Box from raw parts is incorrect. You should reconstruct a
Box<[i64]> directly from the fat pointer, not from a slice reference. Use
Box::from_raw(slice::from_raw_parts_mut(...) as *mut [i64]) to properly deallocate
the memory.

sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs [512-514]

 pub unsafe extern "C" fn parquet_free_row_id_mapping(
     mapping_ptr: i64,
     mapping_len: i64,
 ) {
     if mapping_ptr != 0 && mapping_len > 0 {
-        let _ = Box::from_raw(slice::from_raw_parts_mut(mapping_ptr as *mut i64, mapping_len as usize));
+        let slice_ptr = slice::from_raw_parts_mut(mapping_ptr as *mut i64, mapping_len as usize) as *mut [i64];
+        let _ = Box::from_raw(slice_ptr);
     }
 }
Suggestion importance[1-10]: 9

Why: This is a critical memory safety issue. The current code incorrectly reconstructs a Box from a slice reference rather than from the fat pointer itself. The suggested fix properly reconstructs Box<[i64]> from the raw pointer, which is essential for correct memory deallocation and preventing undefined behavior.

High
Use atomic file operations for metadata

Deleting the segment info file before rewriting creates a window where the segment
metadata is missing. If the write fails after deletion, the segment becomes
unreadable. Use atomic file operations or write to a temporary file first, then
rename.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [265-292]

 private void rewriteSegmentInfoWithSort(SegmentInfos segmentInfos, SegmentCommitInfo segmentCommitInfo) throws IOException {
-    ...
+    SegmentInfo originalInfo = segmentCommitInfo.info;
+    Sort sort = new Sort(new SortedNumericSortField(LuceneDocumentInput.ROW_ID_FIELD, SortField.Type.LONG));
+
+    SegmentInfo sortedInfo = new SegmentInfo(...);
+    sortedInfo.setFiles(originalInfo.files());
+
+    String siFileName = originalInfo.name + ".si";
+    String tempFileName = siFileName + ".tmp";
+    
+    originalInfo.getCodec().segmentInfoFormat().write(directory, sortedInfo, IOContext.DEFAULT);
     directory.deleteFile(siFileName);
-    originalInfo.getCodec().segmentInfoFormat().write(directory, sortedInfo, IOContext.DEFAULT);
+    directory.rename(tempFileName, siFileName);
+    
     ...
 }
Suggestion importance[1-10]: 8

Why: Critical observation about the non-atomic file operation at line 289. Deleting the .si file before writing creates a window where segment metadata is missing. If the write fails, the segment becomes unreadable. The suggestion to use atomic operations (write to temp, then rename) is a standard pattern for preventing data loss. This is a significant reliability issue.

Medium
Validate row ID bounds in mapping

The permutation mapping construction silently ignores out-of-bounds row IDs without
logging or error handling. This can lead to incorrect mappings when row IDs are not
sequential or contain duplicates. Add validation to ensure all row IDs are within
bounds and unique.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [574-580]

 for new_pos in 0..num_rows {
     let old_row_id = old_row_ids.value(new_pos) as usize;
-    if old_row_id < num_rows {
-        mapping[old_row_id] = new_pos as i64;
+    if old_row_id >= num_rows {
+        return Err(format!("Invalid row ID {} at position {} (num_rows={})", old_row_id, new_pos, num_rows).into());
     }
+    mapping[old_row_id] = new_pos as i64;
 }
Suggestion importance[1-10]: 7

Why: Valid concern about silently ignoring out-of-bounds row IDs. The current code only checks if old_row_id < num_rows but doesn't handle the error case. Adding explicit error handling would prevent silent data corruption when row IDs are malformed. However, this is error handling rather than a critical bug, so the score is moderate.

Medium
Verify pointer type casting safety

The cast from *mut [i64] to *mut i64 may be unsafe if the slice element type doesn't
match the expected pointer type. Verify that perm contains i64 elements, or use
as_mut_ptr() on the boxed slice before converting to a raw pointer to ensure proper
pointer arithmetic.

sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs [152-156]

 if let Some(perm) = result.row_id_mapping {
     let len = perm.len();
     let boxed = perm.into_boxed_slice();
     *sort_perm_len_out = len as i64;
-    *sort_perm_ptr_out = Box::into_raw(boxed) as *mut i64 as i64;
+    let ptr = Box::into_raw(boxed) as *mut i64;
+    *sort_perm_ptr_out = ptr as i64;
 }
Suggestion importance[1-10]: 3

Why: The suggestion adds an intermediate variable for clarity but doesn't fundamentally change the pointer casting logic. The original code's casting from Box<[i64]> to *mut i64 to i64 is already correct for FFI purposes, and the suggested change provides minimal improvement.

Low
General
Validate mapping presence for generation

When rowIdMappings is null or the generation is not found, the code passes null
mapping to the codec reader. This causes sequential row ID assignment using
nextRowIdOffset, which may not match the expected behavior when merging segments
with existing row IDs. Verify this is intentional or add explicit handling.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/merge/RowIdRemappingOneMerge.java [47-55]

 public CodecReader wrapForMerge(CodecReader reader) throws IOException {
     CodecReader wrapped = super.wrapForMerge(reader);
     long generation = resolveGeneration(wrapped);
-    RowIdMapping mapping = rowIdMappings != null ? rowIdMappings.get(generation) : null;
+    RowIdMapping mapping = null;
+    if (rowIdMappings != null) {
+        mapping = rowIdMappings.get(generation);
+        if (mapping == null) {
+            throw new IllegalStateException("No row ID mapping found for generation " + generation);
+        }
+    }
     int offset = nextRowIdOffset;
     nextRowIdOffset += wrapped.maxDoc();
     return new RowIdRemappingCodecReader(wrapped, mapping, offset);
 }
Suggestion importance[1-10]: 5

Why: The suggestion raises a valid question about the null mapping behavior. However, the code at line 54 shows that null mapping is intentionally supported (sequential assignment via offset). The suggestion to throw an exception would break this fallback behavior. The concern is valid but the proposed fix is incorrect, as it would reject legitimate cases where sequential assignment is appropriate.

Low
Suggestions up to commit 9294e20
Category | Suggestion | Impact
Possible issue
Validate slice bounds before access

The slice constructor does not validate that offset + length does not exceed
sourceArray.length, which could cause an ArrayIndexOutOfBoundsException. Add bounds
checking to ensure the slice parameters are valid before accessing the array.

server/src/main/java/org/opensearch/index/engine/dataformat/PackedRowIdMapping.java [55-64]

 public PackedRowIdMapping(long[] sourceArray, int offset, int length, boolean reverseSupported) {
     Objects.requireNonNull(sourceArray, "sourceArray cannot be null");
+    if (offset < 0 || length < 0 || offset + length > sourceArray.length) {
+        throw new IllegalArgumentException("Invalid slice parameters: offset=" + offset + ", length=" + length + ", arrayLength=" + sourceArray.length);
+    }
     this.size = length;
     this.reverseSupported = reverseSupported;
 
     PackedLongValues.Builder forwardBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
     for (int i = offset; i < offset + length; i++) {
         forwardBuilder.add(sourceArray[i]);
     }
Suggestion importance[1-10]: 8

Why: The suggestion correctly identifies a missing bounds check that could lead to an ArrayIndexOutOfBoundsException. Adding validation for offset and length before accessing the array is a critical improvement to prevent runtime errors. The improved code is accurate and directly addresses the issue.

Medium
Validate row ID mapping reconstruction

The permutation reconstruction logic in sort_large_file may produce incorrect
mappings when original_row_id values are not sequential or when chunks contain
duplicate row IDs. The code assumes original_row_id is a valid index into
flat_mapping, but this may not hold if row IDs are non-sequential. Consider
validating that original_row_id values are within bounds and unique, or use a
HashMap to map original row IDs to their new positions.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [462-477]

+use std::collections::HashMap;
+
 let mut flat_mapping = vec![0i64; total_rows];
-for i in 0..total_rows {
-    flat_mapping[i] = i as i64;
-}
+let mut row_id_to_new_pos: HashMap<i64, i64> = HashMap::new();
 
 let mut pos = 0usize;
 for chunk_ids in &chunk_row_ids {
     for &original_row_id in chunk_ids {
-        let orig_idx = original_row_id as usize;
-        if orig_idx < total_rows && pos < total_rows {
-            flat_mapping[orig_idx] = merge_output.mapping[pos];
+        if pos < total_rows {
+            row_id_to_new_pos.insert(original_row_id, merge_output.mapping[pos]);
         }
         pos += 1;
     }
 }
 
+for i in 0..total_rows {
+    flat_mapping[i] = *row_id_to_new_pos.get(&(i as i64)).unwrap_or(&(i as i64));
+}
+
Suggestion importance[1-10]: 7

Why: The suggestion correctly identifies a potential issue where original_row_id values may not be sequential or unique, which could lead to incorrect mappings. Using a HashMap is a valid approach to handle non-sequential row IDs. However, the current code does include bounds checking (if orig_idx < total_rows), which mitigates some risk. The suggestion improves robustness but is not critical if row IDs are guaranteed to be sequential.

Medium
Prevent segment metadata corruption window

Deleting the .si file before rewriting it creates a window where the segment
metadata is missing. If the process crashes or the write fails after deletion, the
segment becomes unreadable. Consider writing to a temporary file first, then
atomically renaming it to replace the original, or verify that Lucene's write method
handles this atomically.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [286-292]

-private void rewriteSegmentInfoWithSort(SegmentInfos segmentInfos, SegmentCommitInfo segmentCommitInfo) throws IOException {
-    ...
-    String siFileName = originalInfo.name + ".si";
-    directory.deleteFile(siFileName);
+String siFileName = originalInfo.name + ".si";
+String tempSiFileName = siFileName + ".tmp";
 
-    // Rewrite the .si file with sort metadata
-    originalInfo.getCodec().segmentInfoFormat().write(directory, sortedInfo, IOContext.DEFAULT);
+// Write to temporary file first
+originalInfo.getCodec().segmentInfoFormat().write(directory, sortedInfo, IOContext.DEFAULT);
 
+// Atomically replace the original file
+directory.deleteFile(siFileName);
+directory.rename(tempSiFileName, siFileName);
+
Suggestion importance[1-10]: 6

__

Why: The suggestion raises a valid concern about the window where the .si file is deleted before being rewritten. However, the improved code is incorrect because Lucene's write method does not write to a temporary file by default, and the suggested rename operation is not part of the Lucene API. A better approach would be to verify if Lucene's write method handles atomicity internally or to use a try-finally block to ensure cleanup on failure.

Low
Ensure writer flush before reading

Closing the IndexWriter before reading SegmentInfos may cause a race condition if
the writer's background threads have not finished flushing all data to disk. This
could result in reading incomplete or stale segment metadata. Consider calling
indexWriter.commit() and ensuring all pending operations complete before closing, or
verify that close() guarantees all data is flushed.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [199-209]

+indexWriter.commit();
 indexWriter.close();
 
 // Verify the invariant: exactly 1 segment with docCount documents
 SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(directory);
Suggestion importance[1-10]: 3

Why: The suggestion is incorrect. The code already calls indexWriter.commit() at line 200 before closing the writer at line 205. The commit() call ensures all data is flushed to disk before reading SegmentInfos. Adding another commit() call after close() would be redundant and incorrect, as the writer is already closed.

Low
Suggestions up to commit ef53768
Category | Suggestion | Impact
Possible issue
Add bounds validation for array slice

Missing bounds validation for offset and length parameters can cause
ArrayIndexOutOfBoundsException. Validate that offset >= 0, length >= 0, and offset +
length <= sourceArray.length before accessing the array.

server/src/main/java/org/opensearch/index/engine/dataformat/PackedRowIdMapping.java [55-63]

 public PackedRowIdMapping(long[] sourceArray, int offset, int length, boolean reverseSupported) {
     Objects.requireNonNull(sourceArray, "sourceArray cannot be null");
+    if (offset < 0 || length < 0 || offset + length > sourceArray.length) {
+        throw new IllegalArgumentException("Invalid offset/length: offset=" + offset + ", length=" + length + ", arrayLength=" + sourceArray.length);
+    }
     this.size = length;
     this.reverseSupported = reverseSupported;
 
     PackedLongValues.Builder forwardBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
     for (int i = offset; i < offset + length; i++) {
         forwardBuilder.add(sourceArray[i]);
     }
Suggestion importance[1-10]: 7

Why: Valid suggestion. The constructor at lines 55-82 lacks bounds validation for offset and length parameters, which could lead to ArrayIndexOutOfBoundsException. Adding validation as suggested would improve robustness and provide clearer error messages.

Medium
Validate long-to-int cast for row IDs

Casting long to int in oldToNew can cause data loss or incorrect mappings if row IDs
exceed Integer.MAX_VALUE. Validate the range before casting or use a different
approach that supports long values.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [369-371]

-static class ReorderingOneMerge extends MergePolicy.OneMerge {
-    private final RowIdMapping mapping;
+@Override
+public int oldToNew(int docID) {
+    long newRowId = mapping.getNewRowId(docID);
+    if (newRowId < 0 || newRowId > Integer.MAX_VALUE) {
+        throw new IllegalStateException("Row ID out of int range: " + newRowId);
+    }
+    return (int) newRowId;
+}
 
-    ReorderingOneMerge(List<SegmentCommitInfo> segments, RowIdMapping mapping) {
-        super(segments);
-        this.mapping = mapping;
-    }
-
-    @Override
-    public Sorter.DocMap reorder(CodecReader reader, Directory dir, Executor executor) throws IOException {
-        return new Sorter.DocMap() {
-            @Override
-            public int oldToNew(int docID) {
-                return (int) mapping.getNewRowId(docID);
-            }
-
Suggestion importance[1-10]: 5

Why: The suggestion correctly identifies a potential issue with casting long to int at line 370. However, in the context of Lucene segments, document IDs are inherently limited to int range (Lucene's maxDoc is an int), so this is unlikely to be a practical issue. Still, adding validation would make the code more defensive and explicit about assumptions.

Low
Use atomic file operations for metadata

Deleting the .si file before rewriting creates a window where the segment metadata
is incomplete. If the write fails after deletion, the segment becomes corrupted. Use
atomic file operations or write to a temporary file first, then rename.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [289-293]

 private void rewriteSegmentInfoWithSort(SegmentInfos segmentInfos, SegmentCommitInfo segmentCommitInfo) throws IOException {
     SegmentInfo originalInfo = segmentCommitInfo.info;
     Sort sort = new Sort(new SortedNumericSortField(LuceneDocumentInput.ROW_ID_FIELD, SortField.Type.LONG));
 
-    // Reconstruct SegmentInfo with the IndexSort declared
     SegmentInfo sortedInfo = new SegmentInfo(
         originalInfo.dir,
         ...
     );
     sortedInfo.setFiles(originalInfo.files());
 
-    // Delete the existing .si file before rewriting
+    // Write to temporary file first, then replace atomically
     String siFileName = originalInfo.name + ".si";
+    String tempFileName = siFileName + ".tmp";
+    originalInfo.getCodec().segmentInfoFormat().write(directory, sortedInfo, IOContext.DEFAULT);
     directory.deleteFile(siFileName);
+    directory.rename(tempFileName, siFileName);
Suggestion importance[1-10]: 3

Why: The suggestion correctly identifies a potential issue where deleting the .si file before rewriting creates a window for corruption. However, the proposed solution using rename is incomplete and may not work as intended with Lucene's Directory API. A more robust approach would involve writing to a temporary name first, but the suggestion's implementation details are not fully correct.

Low
Suggestions up to commit b5ea63e
CategorySuggestion                                                                                                                                    Impact
Possible issue
Remove duplicate forceMerge call

The code calls forceMerge(1, true) twice consecutively. The first call completes the
merge before the reorder policy is configured, making the second merge ineffective.
Remove the first forceMerge call so the reorder policy is configured before merging.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [184-193]

-indexWriter.forceMerge(1, true);
-
-
 // If sort permutation is provided, configure the reorder merge policy
 if (flushInput.hasRowIdMapping()) {
     configureSortedMerge(flushInput.rowIdMapping());
 }
 
 // Common path: forceMerge to 1 segment, commit, build FileInfos
 indexWriter.forceMerge(1, true);
Suggestion importance[1-10]: 10

Why: The code calls forceMerge(1, true) twice consecutively at lines 184 and 193. The first call completes the merge before the reorder policy is configured (lines 188-190), making the second merge ineffective. This is a critical bug that prevents the sort permutation from being applied correctly.

High
Fix hardcoded writer generation attribute

The setMergeInfo method hardcodes the writer generation to 0 instead of using the
actual generation from the merge context. This breaks generation tracking for sorted
segments. Store the generation in ReorderingOneMerge constructor and use it here.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [380-385]

+private final long writerGeneration;
+
+ReorderingOneMerge(List<SegmentCommitInfo> segments, RowIdMapping mapping, long writerGeneration) {
+    super(segments);
+    this.mapping = mapping;
+    this.writerGeneration = writerGeneration;
+}
+
 @Override
 public void setMergeInfo(SegmentCommitInfo info) {
     super.setMergeInfo(info);
     if (info != null) {
-        info.info.putAttribute(WRITER_GENERATION_ATTRIBUTE, String.valueOf(0));
+        info.info.putAttribute(WRITER_GENERATION_ATTRIBUTE, String.valueOf(writerGeneration));
     }
 }
Suggestion importance[1-10]: 9

Why: The setMergeInfo method hardcodes the writer generation to 0 instead of using the actual generation. This breaks generation tracking for sorted segments, which is critical for correlating segments with their source data.

High
Remove reorderDone flag blocking subsequent merges

The reorderDone flag prevents subsequent merges after the first one completes. If
forceMerge is called multiple times on the same writer instance, the second call
will skip merging entirely. Remove the flag or reset it after each merge completes
to allow multiple flush operations.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [308-327]

 static class ReorderingMergePolicy extends MergePolicy {
     private final RowIdMapping mapping;
-    private volatile boolean reorderDone = false;
-    ...
+
+    ReorderingMergePolicy(RowIdMapping mapping) {
+        this.mapping = mapping;
+    }
+
     @Override
     public MergeSpecification findForcedMerges(...) {
-        if (reorderDone) {
-            return null; // already reordered, stop the loop
+        List<SegmentCommitInfo> segments = new ArrayList<>();
+        for (int i = 0; i < segmentInfos.size(); i++) {
+            segments.add(segmentInfos.info(i));
         }
-        reorderDone = true;
-        ...
+        if (segments.isEmpty()) {
+            return null;
+        }
+        MergeSpecification spec = new MergeSpecification();
+        spec.add(new ReorderingOneMerge(segments, mapping));
+        return spec;
     }
+    ...
 }
Suggestion importance[1-10]: 8

Why: The reorderDone flag at line 310 prevents subsequent merges after the first one completes. If forceMerge is called multiple times on the same writer instance, the second call will skip merging entirely, which could cause data loss or incorrect behavior.

Medium
Fix incorrect permutation reconstruction logic

The permutation reconstruction logic assumes chunk_row_ids contains original row IDs
in sorted order, but it actually contains the row IDs as they appear in each chunk
after sorting. This produces incorrect mappings. Use merge_output.mapping directly
as the permutation since it already maps old positions to new positions.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [458-481]

-let mut pos = 0usize;
-for chunk_ids in &chunk_row_ids {
-    for &original_row_id in chunk_ids {
-        let orig_idx = original_row_id as usize;
-        if orig_idx < total_rows && pos < total_rows {
-            flat_mapping[orig_idx] = merge_output.mapping[pos];
-        }
-        pos += 1;
-    }
-}
+let row_id_mapping = if !merge_output.mapping.is_empty() {
+    log_info!("sort_large_file: produced {} permutation entries for {}", merge_output.mapping.len(), output_filename);
+    Some(merge_output.mapping)
+} else {
+    None
+};
Suggestion importance[1-10]: 7

Why: The permutation reconstruction logic assumes chunk_row_ids contains original row IDs in sorted order, but the logic at lines 466-474 appears to build the mapping incorrectly. The suggestion to use merge_output.mapping directly may be more correct, though this requires verification of what merge_output.mapping actually contains.

Medium

@github-actions
Contributor

❌ Gradle check result for b5ea63e: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

❌ Gradle check result for ef53768: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

github-actions Bot commented May 10, 2026

PR Reviewer Guide 🔍

(Review updated until commit 3f08261)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The IndexWriter is closed before rewriting segment metadata. If rewriteSegmentInfoWithSort fails after line 205, the writer remains closed but the method continues execution. This could leave the segment in an inconsistent state (partially rewritten metadata) with no way to recover since the writer is already closed. Consider moving the close call after all metadata operations complete, or wrap the rewrite in try-catch to handle failures appropriately.

indexWriter.close();

// Verify the invariant: exactly 1 segment with docCount documents
SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(directory);
assert segmentInfos.size() == 1 : "Expected exactly 1 segment after force merge, got " + segmentInfos.size();

SegmentCommitInfo segmentInfo = segmentInfos.info(0);
assert segmentInfo.info.maxDoc() == docCount : "Expected " + docCount + " docs in segment, got " + segmentInfo.info.maxDoc();

// Stamp the IndexSort on the segment metadata post-commit so that
// addIndexes(Directory...) on the shared writer sees matching sort.
// The segment is always sorted by __row_id__ — either naturally (docs
// written sequentially) or via OneMerge.reorder() + row ID rewrite.
if (segmentInfo.info.getIndexSort() == null) {
    rewriteSegmentInfoWithSort(segmentInfos, segmentInfo);
}
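
A sketch of the suggested hardening, assuming the surrounding method can propagate IOException; the wrapping message is illustrative:

try {
    if (segmentInfo.info.getIndexSort() == null) {
        rewriteSegmentInfoWithSort(segmentInfos, segmentInfo);
    }
} catch (IOException e) {
    // Fail loudly rather than continue with half-rewritten segment metadata.
    throw new IOException("Failed to stamp IndexSort on segment " + segmentInfo.info.name, e);
}
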
Possible Issue

The code deletes the .si file before rewriting it. If the rewrite operation fails after deletion (e.g., codec.segmentInfoFormat().write throws), the original .si file is lost and the segment becomes unreadable. This creates a window where segment corruption can occur. Consider writing to a temporary file first, then atomically replacing the original, or ensure the write succeeds before deleting.

String siFileName = originalInfo.name + ".si";
directory.deleteFile(siFileName);

// Rewrite the .si file with sort metadata
originalInfo.getCodec().segmentInfoFormat().write(directory, sortedInfo, IOContext.DEFAULT);
Possible Issue

When building the reverse mapping, if sourceArray contains duplicate values (multiple old positions mapping to the same new position), only the last old position is stored in newToOldArray. This silently discards earlier mappings. If the input represents a valid permutation this won't occur, but if the input is malformed or represents a many-to-one mapping, the reverse lookup will be incorrect. Consider validating that sourceArray is a valid permutation or documenting this assumption clearly.

long[] newToOldArray = new long[length];
for (int i = 0; i < length; i++) {
    int newPos = (int) sourceArray[offset + i];
    if (newPos >= 0 && newPos < length) {
        newToOldArray[newPos] = i;
    }
}
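
One way to enforce that invariant up front is an explicit permutation check; this helper is hypothetical, not code from the PR:

// Rejects any input slice that is not a bijection over [0, length).
static void validatePermutation(long[] sourceArray, int offset, int length) {
    boolean[] seen = new boolean[length];
    for (int i = 0; i < length; i++) {
        long v = sourceArray[offset + i];
        if (v < 0 || v >= length || seen[(int) v]) {
            throw new IllegalArgumentException("Not a permutation: value " + v + " at index " + i);
        }
        seen[(int) v] = true;
    }
}
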
Resource Leak

If an exception is thrown between reading permAddr/permLen and the finally block that calls FREE_ROW_ID_MAPPING, the native memory is leaked. The try block starts at line 243 but the finally at line 285 only executes if no exception escapes before that point. If new PackedRowIdMapping or toArray throws, the native memory remains allocated. Wrap the entire block in try-finally to ensure cleanup.

long permLen = sortPermLenOut.get(ValueLayout.JAVA_LONG, 0);
RowIdMapping rowIdMapping = null;
if (permAddr != 0 && permLen > 0) {
    try {
        long[] mappingArray = MemorySegment.ofAddress(permAddr)
            .reinterpret(permLen * ValueLayout.JAVA_LONG.byteSize())
            .toArray(ValueLayout.JAVA_LONG);
        rowIdMapping = new PackedRowIdMapping(mappingArray, true);
    } finally {
        NativeCall.invokeVoid(FREE_ROW_ID_MAPPING, permAddr, permLen);
    }

@github-actions
Contributor

✅ Gradle check result for 9294e20: SUCCESS

@codecov

codecov Bot commented May 10, 2026

Codecov Report

❌ Patch coverage is 66.66667% with 18 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.54%. Comparing base (36809cc) to head (9294e20).
⚠️ Report is 5 commits behind head on main.

Files with missing lines | Patch % | Lines
...ch/index/engine/dataformat/PackedRowIdMapping.java | 63.15% | 12 Missing and 2 partials ⚠️
.../opensearch/index/engine/dataformat/FileInfos.java | 60.00% | 2 Missing ⚠️
...opensearch/index/engine/dataformat/MergeInput.java | 60.00% | 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21468      +/-   ##
============================================
+ Coverage     73.50%   73.54%   +0.04%     
- Complexity    74644    74674      +30     
============================================
  Files          5980     5980              
  Lines        338777   338797      +20     
  Branches      48848    48853       +5     
============================================
+ Hits         249011   249162     +151     
+ Misses        69946    69816     -130     
+ Partials      19820    19819       -1     

☔ View full report in Codecov by Sentry.

Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
@github-actions
Contributor

Persistent review updated to latest commit 3f08261

@github-actions
Contributor

❌ Gradle check result for 3f08261: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Persistent review updated to latest commit 3f08261

@github-actions
Contributor

❌ Gradle check result for 3f08261: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>