
Avoiding repeated encoding and compression operation for the sort column included writes #21464

Merged
mgodwan merged 1 commit into opensearch-project:main from chaitanya588:merge_parquet_op_final on May 9, 2026

Conversation

@chaitanya588 chaitanya588 commented May 4, 2026

Summary

When sort columns are configured, the writer now stages incoming batches as Arrow IPC instead of Parquet, avoiding redundant Parquet encoding and compression before the sort-on-close step. For large files, batches are individually sorted and written as Parquet chunks, then merged via the existing streaming k-way merge. The per-batch sort uses Arrow's RowConverter for compact byte-comparable row encoding.

Details

Arrow IPC staging for sorted writes

Previously, all data was written as Parquet regardless of whether sort columns were configured. On finalize, the Parquet file was read back (decoded), sorted, and re-written as Parquet — a full encode-decode-encode cycle. Now, when sort columns are present, a WriterVariant::Ipc path writes batches as Arrow IPC (raw in-memory Arrow buffers with minimal framing). On finalize, the IPC file is read back with near-zero deserialization cost, sorted, and written as the final Parquet file — a single encode step.

When no sort columns are configured, the WriterVariant::Parquet path is used as before, and finalize simply renames the temp file to the final path.
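
A minimal sketch of the two-path dispatch, assuming the WriterVariant and IpcFileWriter names from the diff; the surrounding writer state, locking strategy, and error handling are simplified:

use std::fs::File;
use std::sync::{Arc, Mutex};

use arrow_array::RecordBatch;
use arrow_ipc::writer::FileWriter as IpcFileWriter;
use parquet::arrow::ArrowWriter;

enum WriterVariant {
    // No sort columns: encode straight to Parquet during the write, as before.
    Parquet(Arc<Mutex<ArrowWriter<File>>>),
    // Sort columns configured: stage raw Arrow buffers as IPC; Parquet
    // encoding is deferred to the single sort-on-close step.
    Ipc(Arc<Mutex<IpcFileWriter<File>>>),
}

impl WriterVariant {
    fn write(&self, batch: &RecordBatch) -> Result<(), Box<dyn std::error::Error>> {
        match self {
            WriterVariant::Parquet(w) => w.lock().unwrap().write(batch)?,
            WriterVariant::Ipc(w) => w.lock().unwrap().write(batch)?,
        }
        Ok(())
    }
}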

Small and large file sort strategies

The sort-on-close path (sort_and_rewrite_parquet) uses a size-based threshold (sort_in_memory_threshold_bytes, default 32 MB) to choose between two strategies:

  • Small files (sort_small_file): All IPC batches are read into memory, concatenated, sorted, row IDs rewritten, and written as a single Parquet file.
  • Large files (sort_large_file): Each IPC batch is sliced into chunks of at most sort_batch_size rows (to bound memory, since IPC batches can be arbitrarily large), each chunk is sorted individually and written as a temporary Parquet chunk file, then the existing streaming k-way merge produces the final globally-sorted Parquet output. Chunk files are cleaned up after the merge. The slicing step is sketched after this list.
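
A hedged sketch of that slicing step; chunk_staged_batches is an illustrative helper name, and SORT_BATCH_SIZE stands in for the sort_batch_size setting:

use std::fs::File;

use arrow_array::RecordBatch;
use arrow_ipc::reader::FileReader;

// Hypothetical bound on rows per chunk, standing in for sort_batch_size.
const SORT_BATCH_SIZE: usize = 8192;

// Re-read the IPC staging file and slice each batch into bounded chunks,
// since staged IPC batches can be arbitrarily large.
fn chunk_staged_batches(ipc_path: &str) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    let reader = FileReader::try_new(File::open(ipc_path)?, None)?;
    let mut chunks = Vec::new();
    for batch in reader {
        let batch = batch?;
        let mut offset = 0;
        while offset < batch.num_rows() {
            let len = SORT_BATCH_SIZE.min(batch.num_rows() - offset);
            // slice is zero-copy; each chunk is then sorted and written as a
            // temporary Parquet chunk file ahead of the k-way merge.
            chunks.push(batch.slice(offset, len));
            offset += len;
        }
    }
    Ok(chunks)
}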

RowConverter-based per-batch sort

The sort_batch function now uses Arrow's RowConverter instead of lexsort_to_indices. RowConverter converts sort columns into compact byte-comparable rows where sort direction and null ordering are baked into the encoding. Sorting then reduces to comparing opaque byte sequences via sort_unstable_by, followed by a take to reorder all columns. This approach handles all Arrow data types uniformly without a per-type dispatch in the sort comparator.
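
A sketch of this approach using the arrow-rs row format; the sort-column lookup and error handling are simplified relative to the real sort_batch, so treat the signature as illustrative:

use arrow_array::{Array, ArrayRef, RecordBatch, UInt32Array};
use arrow_row::{RowConverter, SortField};
use arrow_schema::SortOptions;
use arrow_select::take::take;

fn sort_batch(
    batch: &RecordBatch,
    sort_columns: &[ArrayRef],
    descending: &[bool],
    nulls_first: &[bool],
) -> Result<RecordBatch, Box<dyn std::error::Error>> {
    // Bake sort direction and null ordering into the byte-comparable encoding.
    let fields: Vec<SortField> = sort_columns
        .iter()
        .zip(descending.iter().zip(nulls_first))
        .map(|(col, (&desc, &nf))| {
            SortField::new_with_options(
                col.data_type().clone(),
                SortOptions { descending: desc, nulls_first: nf },
            )
        })
        .collect();
    let converter = RowConverter::new(fields)?;
    let rows = converter.convert_columns(sort_columns)?;

    // Sorting reduces to comparing opaque byte sequences.
    let mut indices: Vec<u32> = (0..batch.num_rows() as u32).collect();
    indices.sort_unstable_by(|&a, &b| rows.row(a as usize).cmp(&rows.row(b as usize)));

    // One take per column reorders the whole batch.
    let idx = UInt32Array::from(indices);
    let columns = batch
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &idx, None).map_err(Into::into))
        .collect::<Result<Vec<ArrayRef>, Box<dyn std::error::Error>>>()?;
    Ok(RecordBatch::try_new(batch.schema(), columns)?)
}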

CRC32 integration

The IPC staging path integrates with the existing CrcWriter-based CRC32 computation. Since IPC batches are not Parquet-encoded, the CRC is computed during the final Parquet write step (write_final_file for small files, merge_sorted for large files) rather than during the initial staging write. The WriterState.crc_handle field is an Option<CrcHandle>: Some for the Parquet variant, None for IPC.
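
An illustrative CRC-accumulating writer in the spirit of the CrcWriter mentioned above; crc32fast is an assumed stand-in for the actual CRC32 implementation:

use std::io::{self, Write};

struct CrcWriter<W: Write> {
    inner: W,
    hasher: crc32fast::Hasher,
}

impl<W: Write> CrcWriter<W> {
    fn new(inner: W) -> Self {
        Self { inner, hasher: crc32fast::Hasher::new() }
    }

    fn crc32(&self) -> u32 {
        // finalize() consumes the hasher, so snapshot via clone.
        self.hasher.clone().finalize()
    }
}

impl<W: Write> Write for CrcWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.inner.write(buf)?;
        // Hash exactly the bytes that reached the inner writer.
        self.hasher.update(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

Wrapping the final Parquet output file in such a writer lets the CRC accumulate during write_final_file or merge_sorted without a second pass over the data.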

Testing

Existing tests cover both sorted (IPC path) and unsorted (Parquet path) writer lifecycles, including concurrent writers, empty data, multi-batch sorting, ascending/descending sort, and mixed IPC+Parquet coexistence. New integration tests validate the IPC staging cleanup, sorted output correctness, and concurrent sorted writer behavior.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@chaitanya588 chaitanya588 marked this pull request as ready for review May 4, 2026 05:04
@chaitanya588 chaitanya588 requested a review from a team as a code owner May 4, 2026 05:04

github-actions Bot commented May 4, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit cfde86d.

Path: sandbox/libs/dataformat-native/rust/Cargo.toml
Line: 21
Severity: High
Description: New dependency added: arrow-ipc = "57.3.0". Per mandatory policy, all dependency additions must be flagged for maintainer verification regardless of apparent legitimacy. The version aligns with the existing arrow-* crates (57.3.0) and the crate is from the apache/arrow-rs ecosystem, but artifact authenticity cannot be verified without manual inspection of the resolved crate source.

The report above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 1 | Medium: 0 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@chaitanya588 chaitanya588 force-pushed the merge_parquet_op_final branch 2 times, most recently from 45fe916 to 34cff74 on May 4, 2026 06:08
Comment thread: sandbox/plugins/parquet-data-format/src/main/rust/src/merge/heap.rs (outdated)
Comment thread: sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs
Avoiding repeated encoding and compression operation for the sort column included writes

Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
@chaitanya588 chaitanya588 force-pushed the merge_parquet_op_final branch from b8d34fa to cfde86d on May 9, 2026 12:12
@mgodwan mgodwan added the skip-diff-analyzer label (Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis) on May 9, 2026

github-actions Bot commented May 9, 2026

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Data Loss

In sort_large_file, when no batches contain data (all batches have zero rows), the function returns Ok(0) without creating an output file. The caller expects a file at output_filename to exist after this function completes. If the final Parquet file is not created, subsequent code that attempts to open or read output_filename will fail with a file-not-found error. This occurs when the IPC staging file contains only empty batches.

if chunk_paths.is_empty() {
    log_debug!("No data to sort in file: {}", temp_filename);
    return Ok(0);
}
Resource Leak

In sort_large_file, if merge_sorted fails after chunk files have been written, the function returns early via ? without cleaning up the temporary chunk files in chunk_paths. These files remain on disk indefinitely. The cleanup loop at lines 425-427 is only reached if the merge succeeds.

let _merge_output = merge_sorted(
    &chunk_paths,
    output_filename,
    index_name,
    sort_columns,
    reverse_sorts,
    nulls_first,
)
.map_err(|e| -> Box<dyn std::error::Error> {
    format!("Streaming merge failed: {}", e).into()
})?;
Possible Issue

In sort_batch, sort_indices is declared as Vec<u32> and sorted in-place. If batch.num_rows() exceeds u32::MAX (approximately 4.3 billion rows), the cast (0..batch.num_rows() as u32) will silently wrap or truncate, producing incorrect indices. This leads to data corruption during the take operation. While unlikely in typical scenarios, large batches could trigger this if batch_size or concatenated batch sizes are not bounded.

let mut sort_indices: Vec<u32> = (0..batch.num_rows() as u32).collect();
sort_indices.sort_unstable_by(|&a, &b| rows.row(a as usize).cmp(&rows.row(b as usize)));


github-actions Bot commented May 9, 2026

PR Code Suggestions ✨

Explore these optional code suggestions:

Category: General
Prevent orphaned IPC staging files

If create_writer fails after creating the IPC staging file but before inserting into
WRITERS, the staging file will be orphaned. Consider wrapping the file creation in a
cleanup guard or removing the file on error paths to prevent disk space leaks.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [132-135]

 let ipc_path = format!("{}{}", temp_filename, IPC_STAGING_SUFFIX);
-let file = File::create(&ipc_path)?;
-let ipc_writer = IpcFileWriter::try_new(file, &schema)?;
+let file = File::create(&ipc_path).map_err(|e| {
+    let _ = std::fs::remove_file(&ipc_path);
+    e
+})?;
+let ipc_writer = IpcFileWriter::try_new(file, &schema).map_err(|e| {
+    let _ = std::fs::remove_file(&ipc_path);
+    e
+})?;
 (WriterVariant::Ipc(Arc::new(Mutex::new(ipc_writer))), None)
Suggestion importance[1-10]: 7


Why: Valid concern about orphaned files if create_writer fails after creating the IPC staging file. The suggestion to add cleanup on error paths would prevent disk space leaks. However, the suggested code has a flaw: it attempts to remove the file even if File::create itself failed, in which case there is nothing to remove.

Impact: Medium
Handle IPC staging file cleanup errors

The IPC staging file removal silently ignores errors. If the file cannot be deleted
(e.g., due to permissions or locks), this could lead to disk space leaks over time.
Log the error or propagate it to ensure cleanup failures are visible.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [216-217]

 let ipc_path = format!("{}{}", temp_filename, IPC_STAGING_SUFFIX);
 let crc32 = Self::sort_and_rewrite_parquet(&ipc_path, &filename, index_name, &settings.sort_columns, &settings.reverse_sorts, &settings.nulls_first, writer_generation)?;
-let _ = std::fs::remove_file(&ipc_path);
+if let Err(e) = std::fs::remove_file(&ipc_path) {
+    log_error!("Failed to remove IPC staging file {}: {}", ipc_path, e);
+}
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies that std::fs::remove_file errors are silently ignored with let _ =. Logging cleanup failures would improve observability and help detect disk space leaks. However, this is a minor improvement since the file is temporary and the system will eventually clean it up.

Impact: Low
Avoid potential overflow in sort indices

The batch.num_rows() as u32 cast silently truncates if the row count exceeds
u32::MAX, which would feed incorrect indices to the comparison closure. Although
unlikely in practice, verify that batch sizes are bounded or use usize indices
throughout to avoid potential overflow issues.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [471-473]

 let rows = converter.convert_columns(&sort_arrays)?;
-let mut sort_indices: Vec<u32> = (0..batch.num_rows() as u32).collect();
-sort_indices.sort_unstable_by(|&a, &b| rows.row(a as usize).cmp(&rows.row(b as usize)));
+let mut sort_indices: Vec<usize> = (0..batch.num_rows()).collect();
+sort_indices.sort_unstable_by(|&a, &b| rows.row(a).cmp(&rows.row(b)));
+let indices = arrow::array::UInt32Array::from(
+    sort_indices.into_iter().map(|i| i as u32).collect::<Vec<_>>()
+);
Suggestion importance[1-10]: 3


Why: The concern about u32 overflow is theoretical since Arrow batches are unlikely to exceed u32::MAX rows in practice. The suggested change adds unnecessary complexity by converting between usize and u32 twice. The current code is more efficient and the risk is negligible.

Impact: Low


github-actions Bot commented May 9, 2026

✅ Gradle check result for cfde86d: SUCCESS


codecov Bot commented May 9, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.42%. Comparing base (8f72a95) to head (cfde86d).

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21464      +/-   ##
============================================
- Coverage     73.48%   73.42%   -0.07%     
+ Complexity    74646    74552      -94     
============================================
  Files          5980     5980              
  Lines        338777   338777              
  Branches      48848    48848              
============================================
- Hits         248964   248746     -218     
- Misses        70026    70207     +181     
- Partials      19787    19824      +37     

☔ View full report in Codecov by Sentry.

@mgodwan mgodwan merged commit 4d40eb8 into opensearch-project:main May 9, 2026
22 of 24 checks passed

Labels

skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.
