
Improve merge performance #4

Open
Bukhtawar wants to merge 2 commits into Shailesh-Kumar-Singh:merge_parquet from Bukhtawar:merge-parquet-optimised

Conversation

@Bukhtawar commented Apr 19, 2026

Description

Optimizations Applied

  1. Zero-allocation row comparison via arrow::row::RowConverter (row_compare.rs, cursor_optimized.rs)

The original merge extracted sort keys into a freshly allocated Vec on every row comparison. Flamegraph profiling showed 50% of sort key extraction time was spent in malloc. The fix: RowConverter pre-computes a comparable byte representation per batch at load time. Row comparison becomes a memcmp — no allocation, no type dispatch, no dynamic downcasting.

  2. Precomputed column index mapping (schema_optimized.rs)

The original pad_batch_to_schema did schema.index_of(field.name()) (linear name scan) for every field on every batch. ColumnMapping precomputes the source→target index mapping once per cursor and reuses it via indexed lookup.

  3. Tokio-based prefetch pool separation (cursor_optimized.rs, io_task.rs)

The original cursor prefetched next batches on the same 4-thread Rayon pool used for parallel column encoding. Under load, prefetch IO and CPU encoding competed for threads. The fix: prefetch uses tokio::spawn_blocking on the existing Tokio runtime (auto-scaling blocking pool), keeping the Rayon pool 100% available for CPU-bound encoding.

  4. Single-chunk concat_batches skip (context.rs)

When only one batch is buffered before flush, the original still called concat_batches (full memcpy). Now it pops the single batch directly.

Benchmark Results
| Config | Baseline | Optimized | Improvement |
| --- | --- | --- | --- |
| 3 files × 50k (150k rows) | 295 ms | 236 ms | 19.9% faster |
| 5 files × 100k (500k rows) | 869 ms | 676 ms | 22.2% faster |
| 10 files × 100k (1M rows) | 1586 ms | 1368 ms | 13.7% faster |

Related Issues

Resolves opensearch-project#21079

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Shailesh-Kumar-Singh and others added 2 commits April 19, 2026 03:28
Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>

/// Convert sort columns of a batch into comparable `Rows`.
/// Each `Row` in the result can be compared with `Ord` (zero-alloc memcmp).
pub fn convert_batch_rows(
Owner

Given that for nearly-sorted Parquet files we'd mostly hit Tier 2 (emit whole batch, 1 comparison) or Tier 3 (binary search for boundary to emit a prefix, ~17 comparisons for 100K batch), we're converting the sort columns for 100K rows but only comparing a handful.
Would this still be more efficient than on-demand extraction for just the rows we need?

self.prefetch_pending = true;
let reader = Arc::clone(&self.reader);
let tx = self.prefetch_tx.clone();
get_io_runtime().spawn_blocking(move || {
Owner

The initial plan we discussed was to parallelize column decoding on Rayon (similar to how we parallelize column encoding via into_par_iter during flush), but the current Parquet reader API doesn't expose per-column readers; reader.next() decodes all columns sequentially in a single call.

Agreed on moving to Tokio and spawning a thread here so that prefetch IO and CPU encoding do not compete.
Now that the Tokio pool is shared between disk writes and prefetch, let's keep a cap on the blocking thread count via .max_blocking_threads(). Open to inputs on the max cap we should keep here.
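A runtime-configuration sketch of the suggested cap, assuming a dedicated IO runtime is built somewhere like get_io_runtime; the cap of 8 and the thread name are placeholders, not recommendations:

```rust
use tokio::runtime::Builder;

/// Configuration sketch only: bound the shared blocking pool so prefetch
/// and disk writes cannot spawn unbounded threads. Values are illustrative.
fn build_io_runtime() -> tokio::runtime::Runtime {
    Builder::new_multi_thread()
        .worker_threads(2)
        .max_blocking_threads(8) // placeholder cap, tune per workload
        .thread_name("merge-io")
        .enable_all()
        .build()
        .expect("failed to build IO runtime")
}
```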

Shailesh-Kumar-Singh pushed a commit that referenced this pull request May 9, 2026
…earch-project#21513)

* [Analytics Engine] Port json_array_length to DataFusion backend

First PPL json_* function wired through PPL → Calcite → Substrait →
DataFusion. Scaffolds the pattern every follow-up UDF reuses: Rust kernel
+ YAML signature + ScalarFunction enum entry + JsonFunctionAdapters
rename + FunctionMappings.s(...) binding + STANDARD_PROJECT_OPS entry.

Rust UDF (rust/src/udf/json_array_length.rs) coerces the input to Utf8,
parses with serde_json, and returns Int32 to match PPL's
INTEGER_FORCE_NULLABLE declaration — returning Int64 would leak through
column-valued calls even though literal args const-fold via a narrowing
CAST. Malformed / non-array / NULL input → NULL, matching legacy
JsonArrayLengthFunctionImpl's NullPolicy.ANY + Gson parity.

ScalarFunction.CAST added to STANDARD_PROJECT_OPS so PPL's implicit CAST
around a UDF call (inserted when the UDF's declared return type differs
from the eval column's inferred type) doesn't fail OpenSearchProjectRule
with "No backend supports scalar function [CAST]". DataFusion handles
CAST natively — no UDF needed.

STANDARD_PROJECT_OPS and scalarFunctionAdapters reshaped to one-entry-
per-line (Map.ofEntries / Set.of) so parallel json_* PRs append without
touching neighbour lines.

Tests:
  * 10 Rust unit tests (flat/nested arrays, non-array, malformed, NULL,
    coerce_types accept/reject, arity guard, scalar-input fast path).
  * JsonFunctionAdaptersTests guards adapter shape + return-type
    preservation (BIGINT vs LOCAL_OP's INTEGER_NULLABLE).
  * ScalarJsonFunctionIT covers happy path, empty array, non-array
    object → NULL, malformed → NULL via /_analytics/ppl.

Parity-checked against legacy SQL plugin
CalcitePPLJsonBuiltinFunctionIT.testJsonArrayLength.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] JSON: introduce jsonpath-rust parser + shared helpers

Lands the parser crate + a small shared helpers module ahead of the per-
function json_* UDFs. Keeping this on its own commit lets reviewers sign
off on the crate choice (jsonpath-rust 0.7) and path-conversion behaviour
before 8 UDF bodies land on top.

  * rust/Cargo.toml: add jsonpath-rust = "0.7".
  * rust/src/udf/json_common.rs:
      - convert_ppl_path: PPL path syntax (`a{i}.b{}`) -> JSONPath (`$.a[i].b[*]`).
        Mirrors JsonUtils.convertToJsonPath in sql/core. Empty string maps
        to "$" to match legacy root semantics.
      - parse: serde_json wrapper returning None on malformed input, the
        contract every json_* UDF will share.
      - check_arity / check_arity_range: plan_err! wrappers for the
        top-of-invoke guards.
  * rust/src/udf/mod.rs: register the module (helpers are crate-private).

Consumers land in follow-up commits on the same PR (opensearch-project#21513); a module-
level #![allow(dead_code)] keeps this commit's cargo check clean.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_keys to DataFusion backend

Adds the second PPL json_* UDF on top of opensearch-project#21476 (json_array_length).
Matches the legacy SQL-plugin contract: object → JSON-array-encoded keys
in insertion order; non-object / malformed / scalar → SQL NULL.

- Rust UDF at rust/src/udf/json_keys.rs with scalar + columnar paths
- Shared rust/src/udf/json_common.rs helpers (parse, arity, Utf8 downcast,
  PPL-path → JSONPath) seeded for later json_* UDFs
- serde_json preserve_order feature to preserve legacy LinkedHashMap ordering
- Java wiring: ScalarFunction.JSON_KEYS, JsonKeysAdapter, Substrait sig,
  YAML signature, plugin project-op + adapter registration
- ScalarJsonFunctionIT parity test for the four legacy fixtures

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_extract to DataFusion backend

Rust UDF at rust/src/udf/json_extract.rs wraps jsonpath-rust: single path →
unquoted scalar or JSON-serialized container; multi-path → JSON array with
literal null slots for misses. < 2 args, malformed doc, malformed path, and
explicit-null matches all collapse to SQL NULL, matching legacy
JsonExtractFunctionImpl's calcite jsonQuery/jsonValue pair.

JsonExtractAdapter renames the PPL call to the Rust UDF name via the variadic
path; routing lives in FunctionMappings.s(...) in DataFusionFragmentConvertor
and the STANDARD_PROJECT_OPS allow-list.

Also fixes a pre-existing transport bug in DatafusionResultStream.getFieldValue:
VarCharVector.getObject returns Arrow Text, which StreamOutput.writeGenericValue
cannot serialize, so string-valued UDF results (json_keys, json_extract) were
dropped when shard results traveled back to the coordinator. Converting
VarCharVector cells to String at the source mirrors ArrowValues.toJavaValue
and unblocks every string-returning UDF.

Parity IT (ScalarJsonFunctionIT) replays four verbatim legacy cases covering
single-path scalar/container match, wildcard multi-match, multi-path with
missing path, and explicit-null resolution.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_delete to DataFusion backend

Mutation UDF #1. Introduces the shared mutation walker that json_set,
json_append, and json_extend will reuse on the same PR.

Rust side (rust/src/udf/json_delete.rs + json_common.rs):
  * `parse_ppl_segments` tokenises PPL paths (a.b{0}.c{}) into Field /
    Index / Wildcard segments without allocating field names.
  * `walk_mut` drives a mutation closure against every terminal match in
    a serde_json::Value; missing intermediate keys and out-of-range
    indices are silent no-ops, matching Jayway's SUPPRESS_EXCEPTIONS
    behaviour that legacy `JsonDeleteFunctionImpl` (→ Calcite
    `JsonFunctions.jsonRemove`) relies on.
  * `json_delete` terminal closure: `shift_remove` on Object (preserves
    insertion order via serde_json's `preserve_order` feature),
    `Vec::remove` on Array-with-Index, `Vec::clear` on Array-with-Wildcard.
    Any-NULL-arg / malformed doc / malformed path → NULL.

The walker is generic enough that json_set / json_append / json_extend
are now pure terminal-closure swaps (set value, push value, extend
array) — no further traversal plumbing needed.

Java side:
  * JSON_DELETE added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and
    `scalarFunctionAdapters`.
  * `JsonDeleteAdapter` is a plain `AbstractNameMappingAdapter` rename
    (matches the other json_* adapters).
  * Substrait YAML signature uses `variadic: {min: 1}` — same shape as
    json_extract.

Tests:
  * 10 Rust unit tests for json_delete (4 legacy IT fixtures replayed:
    flat-key, nested, missing-path-unchanged, wildcard-array; plus
    any-NULL / malformed / coerce_types / return_type).
  * 4 new walker tests in json_common (tokeniser, flat-delete,
    missing-noop, wildcard-fan-out, index-out-of-range-noop).
  * ScalarJsonFunctionIT gains `testJsonDeleteParityWithLegacy`
    replaying all 4 legacy assertions.

Parity-checked against legacy SQL plugin
`CalcitePPLJsonBuiltinFunctionIT.testJsonDelete*`.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_set to DataFusion backend

Mutation UDF #2. Reuses the walker introduced by json_delete; this
commit is a pure terminal-closure swap on the Rust side (replace, not
remove) plus the usual 7-file Java/YAML wiring.

Rust side (rust/src/udf/json_set.rs):
  * Terminal closure overwrites only existing keys on Object
    (`map.contains_key` guard), in-range slots on Array-with-Index, and
    every element on Array-with-Wildcard. This is the replace-only
    semantics from legacy `JsonSetFunctionImpl` (→ Calcite
    `JsonFunctions.jsonSet`, which guards `ctx.set` with
    `ctx.read(k) != null`).
  * Variadic arity: (doc, path1, val1, [path2, val2, ...]). Fewer than
    3 args or an even total (unpaired trailing path) short-circuits to
    NULL, mirroring the "malformed input → NULL" convention the other
    json_* UDFs follow.
  * Values are always stored as `Value::String` because every arg is
    coerced to Utf8 by `coerce_types` — matches the legacy fixture's
    `"b":"3"` (stringified, not numeric).
  * Root-path (`parse_ppl_segments` returns empty) is a no-op to match
    Jayway's behaviour: `ctx.set("$", v)` silently fails because the
    root is indelible and unreplaceable.

Java side:
  * JSON_SET added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and
    `scalarFunctionAdapters`.
  * `JsonSetAdapter` is a plain `AbstractNameMappingAdapter` rename.
  * Substrait YAML signature uses `variadic: {min: 1}` — same shape as
    json_extract / json_delete.

Tests:
  * 9 Rust unit tests for json_set (3 legacy IT fixtures replayed:
    wildcard-replace, wrong-path-unchanged, partial-wildcard-set; plus
    multi-pair / any-NULL / malformed-doc / malformed-path /
    coerce_types / return_type).
  * ScalarJsonFunctionIT gains `testJsonSetParityWithLegacy` replaying
    all 3 legacy assertions.

Parity-checked against legacy SQL plugin
`CalcitePPLJsonBuiltinFunctionIT.testJsonSet*`.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_append to DataFusion backend

Mutation UDF #3. Another walker reuse: terminal closure pushes the
paired value onto array-valued targets (non-array / missing targets
are silent no-ops).

Rust side (rust/src/udf/json_append.rs):
  * Terminal closure branches: Object+Field → look up field, if it's an
    Array push the stringified value; Array+Index → if the indexed slot
    is an Array, push; Array+Wildcard → push onto every array-valued
    child. Non-array matches are skipped, matching legacy
    `JsonFunctions.jsonInsert` via Jayway's Collection-parent branch
    (`Collection.add`) which is how `JsonAppendFunctionImpl`'s
    `.meaningless_key` suffix trick ultimately expands.
  * Variadic arity (doc, path1, val1, [path2, val2, ...]). Fewer than 3
    args or an even total (unpaired trailing path) → NULL — the
    malformed-input-to-NULL convention all other json_* UDFs share.
    Matches legacy's `RuntimeException("needs corresponding path and
    values")` observably-as-error via NULL surface.
  * Pre-stringified values: all args are Utf8-coerced at `coerce_types`
    entry, so nested `json_object(...)` / `json_array(...)` arrive here
    already stringified. They are pushed as `Value::String`, which
    reproduces the legacy IT's quoted-JSON-as-element rows without the
    new engine having to implement `json_object`/`json_array` yet
    (they ship in a follow-up PR).

Java side:
  * JSON_APPEND added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and
    `scalarFunctionAdapters`.
  * `JsonAppendAdapter` is a plain `AbstractNameMappingAdapter` rename.
  * Substrait YAML signature uses `variadic: {min: 1}` — same shape as
    json_extract / json_delete / json_set.

Tests:
  * 12 Rust unit tests for json_append (3 legacy IT fixtures replayed
    with pre-stringified nested JSON: named-array push, nested-path
    push, stringified-object push; plus multi-pair / wildcard-fan-out /
    non-array-noop / missing-path-noop / any-NULL / malformed-doc /
    malformed-path / coerce_types / return_type).
  * ScalarJsonFunctionIT gains `testJsonAppendParityWithLegacy`
    replaying all 3 legacy assertions with literal stringified JSON in
    place of the nested constructor calls the legacy test uses.

Parity-checked against legacy SQL plugin
`CalcitePPLJsonBuiltinFunctionIT.testJsonAppend`.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_extend to DataFusion backend

Mutation UDF #4 — last walker reuse. Same push shape as json_append,
but each paired value is first tried as a JSON-array parse: success →
spread the elements; failure → push the whole string as one element
(parity with legacy `JsonExtendFunctionImpl`'s `gson.fromJson(v,
List.class)` try/fall-back).

Rust side (rust/src/udf/json_extend.rs):
  * Helper `spread(raw) -> Vec<Value>`: returns the parsed items when
    `raw` is a JSON array, else `[Value::String(raw)]`. Scalars,
    objects, and malformed JSON all go through the single-push branch.
  * Terminal closure reuses json_append's array-target guards (Object
    field → Array, Array+Index → inner Array, Array+Wildcard → every
    array child). `Vec::extend(items.iter().cloned())` handles the
    spread and the single-push case uniformly.
  * Variadic arity matches every other mutation UDF. Invalid arity /
    any-NULL / malformed-doc / malformed-path → NULL.

Deliberate divergence from legacy: integer-typed spread elements stay
integers (serde_json preserves source type) rather than being widened
to Double as Gson does. Documented in `json.md:555` but not covered by
any legacy IT; we preserve the more useful default and will file a
tracking issue for the wider Gson-compat decision.

Java side:
  * JSON_EXTEND added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and
    `scalarFunctionAdapters`.
  * `JsonExtendAdapter` is a plain `AbstractNameMappingAdapter` rename.
  * Substrait YAML signature uses `variadic: {min: 1}` — same shape as
    the other variadic json_* UDFs.

Tests:
  * 13 Rust unit tests for json_extend (3 legacy IT fixtures replayed:
    single-push on non-array value, plain-string push, JSON-array
    spread; plus empty-array-value / mixed-type-spread / wildcard-fan
    / non-array-noop / missing-path-noop / any-NULL / malformed-doc /
    malformed-path / coerce_types / return_type).
  * ScalarJsonFunctionIT gains `testJsonExtendParityWithLegacy`
    replaying all 3 legacy assertions with literal stringified JSON
    standing in for the nested constructor calls the legacy test uses.

Parity-checked against legacy SQL plugin
`CalcitePPLJsonBuiltinFunctionIT.testJsonExtend`.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

---------

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>
@Shailesh-Kumar-Singh force-pushed the merge_parquet branch 4 times, most recently from 58a9ebd to 30d7a3b on May 11, 2026 08:42