Improve merge performance#4
Improve merge performance#4Bukhtawar wants to merge 2 commits intoShailesh-Kumar-Singh:merge_parquetfrom
Conversation
Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
|
|
||
| /// Convert sort columns of a batch into comparable `Rows`. | ||
| /// Each `Row` in the result can be compared with `Ord` (zero-alloc memcmp). | ||
| pub fn convert_batch_rows( |
There was a problem hiding this comment.
Given that for nearly-sorted Parquet files we'd mostly hit Tier 2 (emit whole batch, 1 comparison) or Tier 3 (binary search for boundary to emit a prefix, ~17 comparisons for 100K batch), we're converting the sort columns for 100K rows but only comparing a handful.
Would this still be more efficient than on-demand extraction for just the rows we need?
| self.prefetch_pending = true; | ||
| let reader = Arc::clone(&self.reader); | ||
| let tx = self.prefetch_tx.clone(); | ||
| get_io_runtime().spawn_blocking(move || { |
There was a problem hiding this comment.
The initial plan we discussed was to parallelize column decoding on Rayon (similar to how we parallelize column encoding via into_par_iter during flush), but the current Parquet reader API doesn't expose per-column readers, reader.next() decodes all columns sequentially in a single call.
Agreed on moving to tokio and spawning a thread here so that prefetch IO and CPU encoding do not compete.
Now that the tokio pool is shared between disk writes and prefetch, lets keep a cap on the blocking thread count via .max_blocking_threads(), Open to inputs on the max cap we should keep here.
40d152e to
d97d3ce
Compare
322aded to
d88b19d
Compare
…earch-project#21513) * [Analytics Engine] Port json_array_length to DataFusion backend First PPL json_* function wired through PPL → Calcite → Substrait → DataFusion. Scaffolds the pattern every follow-up UDF reuses: Rust kernel + YAML signature + ScalarFunction enum entry + JsonFunctionAdapters rename + FunctionMappings.s(...) binding + STANDARD_PROJECT_OPS entry. Rust UDF (rust/src/udf/json_array_length.rs) coerces the input to Utf8, parses with serde_json, and returns Int32 to match PPL's INTEGER_FORCE_NULLABLE declaration — returning Int64 would leak through column-valued calls even though literal args const-fold via a narrowing CAST. Malformed / non-array / NULL input → NULL, matching legacy JsonArrayLengthFunctionImpl's NullPolicy.ANY + Gson parity. ScalarFunction.CAST added to STANDARD_PROJECT_OPS so PPL's implicit CAST around a UDF call (inserted when the UDF's declared return type differs from the eval column's inferred type) doesn't fail OpenSearchProjectRule with "No backend supports scalar function [CAST]". DataFusion handles CAST natively — no UDF needed. STANDARD_PROJECT_OPS and scalarFunctionAdapters reshaped to one-entry- per-line (Map.ofEntries / Set.of) so parallel json_* PRs append without touching neighbour lines. Tests: * 10 Rust unit tests (flat/nested arrays, non-array, malformed, NULL, coerce_types accept/reject, arity guard, scalar-input fast path). * JsonFunctionAdaptersTests guards adapter shape + return-type preservation (BIGINT vs LOCAL_OP's INTEGER_NULLABLE). * ScalarJsonFunctionIT covers happy path, empty array, non-array object → NULL, malformed → NULL via /_analytics/ppl. Parity-checked against legacy SQL plugin CalcitePPLJsonBuiltinFunctionIT.testJsonArrayLength. Signed-off-by: Eric Wei <mengwei.eric@gmail.com> * [Analytics Engine] JSON: introduce jsonpath-rust parser + shared helpers Lands the parser crate + a small shared helpers module ahead of the per- function json_* UDFs. Keeping this on its own commit lets reviewers sign off on the crate choice (jsonpath-rust 0.7) and path-conversion behaviour before 8 UDF bodies land on top. * rust/Cargo.toml: add jsonpath-rust = "0.7". * rust/src/udf/json_common.rs: - convert_ppl_path: PPL path syntax (`a{i}.b{}`) -> JSONPath (`$.a[i].b[*]`). Mirrors JsonUtils.convertToJsonPath in sql/core. Empty string maps to "$" to match legacy root semantics. - parse: serde_json wrapper returning None on malformed input, the contract every json_* UDF will share. - check_arity / check_arity_range: plan_err! wrappers for the top-of-invoke guards. * rust/src/udf/mod.rs: register the module (helpers are crate-private). Consumers land in follow-up commits on the same PR (opensearch-project#21513); a module- level #![allow(dead_code)] keeps this commit's cargo check clean. Signed-off-by: Eric Wei <mengwei.eric@gmail.com> * [Analytics Engine] Port json_keys to DataFusion backend Adds the second PPL json_* UDF on top of opensearch-project#21476 (json_array_length). Matches the legacy SQL-plugin contract: object → JSON-array-encoded keys in insertion order; non-object / malformed / scalar → SQL NULL. - Rust UDF at rust/src/udf/json_keys.rs with scalar + columnar paths - Shared rust/src/udf/json_common.rs helpers (parse, arity, Utf8 downcast, PPL-path → JSONPath) seeded for later json_* UDFs - serde_json preserve_order feature to preserve legacy LinkedHashMap ordering - Java wiring: ScalarFunction.JSON_KEYS, JsonKeysAdapter, Substrait sig, YAML signature, plugin project-op + adapter registration - ScalarJsonFunctionIT parity test for the four legacy fixtures Signed-off-by: Eric Wei <mengwei.eric@gmail.com> * [Analytics Engine] Port json_extract to DataFusion backend Rust UDF at rust/src/udf/json_extract.rs wraps jsonpath-rust: single path → unquoted scalar or JSON-serialized container; multi-path → JSON array with literal null slots for misses. < 2 args, malformed doc, malformed path, and explicit-null matches all collapse to SQL NULL, matching legacy JsonExtractFunctionImpl's calcite jsonQuery/jsonValue pair. JsonExtractAdapter renames the PPL call to the Rust UDF name via the variadic path; routing lives in FunctionMappings.s(...) in DataFusionFragmentConvertor and the STANDARD_PROJECT_OPS allow-list. Also fixes a pre-existing transport bug in DatafusionResultStream.getFieldValue: VarCharVector.getObject returns Arrow Text, which StreamOutput.writeGenericValue cannot serialize, so string-valued UDF results (json_keys, json_extract) were dropped when shard results traveled back to the coordinator. Converting VarCharVector cells to String at the source mirrors ArrowValues.toJavaValue and unblocks every string-returning UDF. Parity IT (ScalarJsonFunctionIT) replays four verbatim legacy cases covering single-path scalar/container match, wildcard multi-match, multi-path with missing path, and explicit-null resolution. Signed-off-by: Eric Wei <mengwei.eric@gmail.com> * [Analytics Engine] Port json_delete to DataFusion backend Mutation UDF #1. Introduces the shared mutation walker that json_set, json_append, and json_extend will reuse on the same PR. Rust side (rust/src/udf/json_delete.rs + json_common.rs): * `parse_ppl_segments` tokenises PPL paths (a.b{0}.c{}) into Field / Index / Wildcard segments without allocating field names. * `walk_mut` drives a mutation closure against every terminal match in a serde_json::Value; missing intermediate keys and out-of-range indices are silent no-ops, matching Jayway's SUPPRESS_EXCEPTIONS behaviour that legacy `JsonDeleteFunctionImpl` (→ Calcite `JsonFunctions.jsonRemove`) relies on. * `json_delete` terminal closure: `shift_remove` on Object (preserves insertion order via serde_json's `preserve_order` feature), `Vec::remove` on Array-with-Index, `Vec::clear` on Array-with-Wildcard. Any-NULL-arg / malformed doc / malformed path → NULL. The walker is generic enough that json_set / json_append / json_extend are now pure terminal-closure swaps (set value, push value, extend array) — no further traversal plumbing needed. Java side: * JSON_DELETE added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and `scalarFunctionAdapters`. * `JsonDeleteAdapter` is a plain `AbstractNameMappingAdapter` rename (matches the other json_* adapters). * Substrait YAML signature uses `variadic: {min: 1}` — same shape as json_extract. Tests: * 10 Rust unit tests for json_delete (4 legacy IT fixtures replayed: flat-key, nested, missing-path-unchanged, wildcard-array; plus any-NULL / malformed / coerce_types / return_type). * 4 new walker tests in json_common (tokeniser, flat-delete, missing-noop, wildcard-fan-out, index-out-of-range-noop). * ScalarJsonFunctionIT gains `testJsonDeleteParityWithLegacy` replaying all 4 legacy assertions. Parity-checked against legacy SQL plugin `CalcitePPLJsonBuiltinFunctionIT.testJsonDelete*`. Signed-off-by: Eric Wei <mengwei.eric@gmail.com> * [Analytics Engine] Port json_set to DataFusion backend Mutation UDF #2. Reuses the walker introduced by #json_delete; this commit is a pure terminal-closure swap on the Rust side (replace, not remove) plus the usual 7-file Java/YAML wiring. Rust side (rust/src/udf/json_set.rs): * Terminal closure overwrites only existing keys on Object (`map.contains_key` guard), in-range slots on Array-with-Index, and every element on Array-with-Wildcard. This is the replace-only semantics from legacy `JsonSetFunctionImpl` (→ Calcite `JsonFunctions.jsonSet`, which guards `ctx.set` with `ctx.read(k) != null`). * Variadic arity: (doc, path1, val1, [path2, val2, ...]). Fewer than 3 args or an odd total (unpaired trailing path) short-circuits to NULL, mirroring the "malformed input → NULL" convention the other json_* UDFs follow. * Values are always stored as `Value::String` because every arg is coerced to Utf8 by `coerce_types` — matches the legacy fixture's `"b":"3"` (stringified, not numeric). * Root-path (`parse_ppl_segments` returns empty) is a no-op to match Jayway's behaviour: `ctx.set("$", v)` silently fails because the root is indelible and unreplaceable. Java side: * JSON_SET added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and `scalarFunctionAdapters`. * `JsonSetAdapter` is a plain `AbstractNameMappingAdapter` rename. * Substrait YAML signature uses `variadic: {min: 1}` — same shape as json_extract / json_delete. Tests: * 9 Rust unit tests for json_set (3 legacy IT fixtures replayed: wildcard-replace, wrong-path-unchanged, partial-wildcard-set; plus multi-pair / any-NULL / malformed-doc / malformed-path / coerce_types / return_type). * ScalarJsonFunctionIT gains `testJsonSetParityWithLegacy` replaying all 3 legacy assertions. Parity-checked against legacy SQL plugin `CalcitePPLJsonBuiltinFunctionIT.testJsonSet*`. Signed-off-by: Eric Wei <mengwei.eric@gmail.com> * [Analytics Engine] Port json_append to DataFusion backend Mutation UDF #3. Another walker reuse: terminal closure pushes the paired value onto array-valued targets (non-array / missing targets are silent no-ops). Rust side (rust/src/udf/json_append.rs): * Terminal closure branches: Object+Field → look up field, if it's an Array push the stringified value; Array+Index → if the indexed slot is an Array, push; Array+Wildcard → push onto every array-valued child. Non-array matches are skipped, matching legacy `JsonFunctions.jsonInsert` via Jayway's Collection-parent branch (`Collection.add`) which is how `JsonAppendFunctionImpl`'s `.meaningless_key` suffix trick ultimately expands. * Variadic arity (doc, path1, val1, [path2, val2, ...]). Fewer than 3 args or an odd total (unpaired trailing path) → NULL — the malformed-input-to-NULL convention all other json_* UDFs share. Matches legacy's `RuntimeException("needs corresponding path and values")` observably-as-error via NULL surface. * Pre-stringified values: all args are Utf8-coerced at `coerce_types` entry, so nested `json_object(...)` / `json_array(...)` arrive here already stringified. They are pushed as `Value::String`, which reproduces the legacy IT's quoted-JSON-as-element rows without the new engine having to implement `json_object`/`json_array` yet (they ship in a follow-up PR). Java side: * JSON_APPEND added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and `scalarFunctionAdapters`. * `JsonAppendAdapter` is a plain `AbstractNameMappingAdapter` rename. * Substrait YAML signature uses `variadic: {min: 1}` — same shape as json_extract / json_delete / json_set. Tests: * 12 Rust unit tests for json_append (3 legacy IT fixtures replayed with pre-stringified nested JSON: named-array push, nested-path push, stringified-object push; plus multi-pair / wildcard-fan-out / non-array-noop / missing-path-noop / any-NULL / malformed-doc / malformed-path / coerce_types / return_type). * ScalarJsonFunctionIT gains `testJsonAppendParityWithLegacy` replaying all 3 legacy assertions with literal stringified JSON in place of the nested constructor calls the legacy test uses. Parity-checked against legacy SQL plugin `CalcitePPLJsonBuiltinFunctionIT.testJsonAppend`. Signed-off-by: Eric Wei <mengwei.eric@gmail.com> * [Analytics Engine] Port json_extend to DataFusion backend Mutation UDF #4 — last walker reuse. Same push shape as json_append, but each paired value is first tried as a JSON-array parse: success → spread the elements; failure → push the whole string as one element (parity with legacy `JsonExtendFunctionImpl`'s `gson.fromJson(v, List.class)` try/fall-back). Rust side (rust/src/udf/json_extend.rs): * Helper `spread(raw) -> Vec<Value>`: returns the parsed items when `raw` is a JSON array, else `[Value::String(raw)]`. Scalars, objects, and malformed JSON all go through the single-push branch. * Terminal closure reuses json_append's array-target guards (Object field → Array, Array+Index → inner Array, Array+Wildcard → every array child). `Vec::extend(items.iter().cloned())` handles the spread and the single-push case uniformly. * Variadic arity matches every other mutation UDF. Invalid arity / any-NULL / malformed-doc / malformed-path → NULL. Deliberate divergence from legacy: integer-typed spread elements stay integers (serde_json preserves source type) rather than being widened to Double as Gson does. Documented in `json.md:555` but not covered by any legacy IT; we preserve the more useful default and will file a tracking issue for the wider Gson-compat decision. Java side: * JSON_EXTEND added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and `scalarFunctionAdapters`. * `JsonExtendAdapter` is a plain `AbstractNameMappingAdapter` rename. * Substrait YAML signature uses `variadic: {min: 1}` — same shape as the other variadic json_* UDFs. Tests: * 13 Rust unit tests for json_extend (3 legacy IT fixtures replayed: single-push on non-array value, plain-string push, JSON-array spread; plus empty-array-value / mixed-type-spread / wildcard-fan / non-array-noop / missing-path-noop / any-NULL / malformed-doc / malformed-path / coerce_types / return_type). * ScalarJsonFunctionIT gains `testJsonExtendParityWithLegacy` replaying all 3 legacy assertions with literal stringified JSON standing in for the nested constructor calls the legacy test uses. Parity-checked against legacy SQL plugin `CalcitePPLJsonBuiltinFunctionIT.testJsonExtend`. Signed-off-by: Eric Wei <mengwei.eric@gmail.com> --------- Signed-off-by: Eric Wei <mengwei.eric@gmail.com>
58a9ebd to
30d7a3b
Compare
Description
Optimizations Applied
The original merge extracted sort keys into a freshly allocated Vec on every row comparison. Flamegraph profiling showed 50% of sort key extraction time was spent in malloc. The fix: RowConverter pre-computes a comparable byte representation per batch at load time. Row comparison becomes a memcmp — no allocation, no type dispatch, no dynamic downcasting.
The original pad_batch_to_schema did schema.index_of(field.name()) (linear name scan) for every field on every batch. ColumnMapping precomputes the source→target index mapping once per cursor and reuses it via indexed lookup.
The original cursor prefetched next batches on the same 4-thread Rayon pool used for parallel column encoding. Under load, prefetch IO and CPU encoding competed for threads. The fix: prefetch uses tokio::spawn_blocking on the existing Tokio runtime (auto-scaling blocking pool), keeping the Rayon pool 100% available for CPU-bound encoding.
When only one batch is buffered before flush, the original still called concat_batches (full memcpy). Now it pops the single batch directly.
Related Issues
Resolves # opensearch-project#21079
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.