
R6 csv empty sym #194 (Open)

singaraiona wants to merge 9 commits into master from r6-csv-empty-sym

Conversation

@singaraiona (Collaborator)

No description provided.

`(select {s: (sum a) from: t})` was returning N copies of the same
value instead of a single row.  The projection-only path lowered
aggregates as ordinary column expressions, so OP_SELECT saw a scalar
atom and broadcast it to the input row count (exec.c: vec->type<0 ->
broadcast_scalar).

Route the all-aggregate / no-by case through ray_group(n_keys=0),
which already has a 1-row scalar-aggregate fast path.  WHERE is
pre-executed (same pattern as the by-with-where fuse path) so the
lazy g->selection bitmap reaches the reduction.

The n_keys==0 parallel scalar path was effectively dead code before
this, and its FIRST/LAST merge silently relied on worker-id order
matching row-index order — broken under work-stealing dispatch.
Force serial execution when FIRST/LAST is in play; the DA path stays
parallel and tracks per-slot first_row/last_row already.

Two existing tests asserted the buggy broadcast row count
(groupby_aggregators.rfl:64, group_coverage.rfl:417); updated to the
correct 1-row expectation.
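
A minimal sketch of the gating this implies, with hypothetical names (the
real aggregate-op tags and dispatch live in the group/exec code): order-free
reductions may go through the worker pool, FIRST/LAST force the serial kernel.

```c
#include <stddef.h>

/* Hypothetical aggregate-op tags; only the FIRST/LAST distinction matters here. */
typedef enum { AGG_SUM, AGG_MIN, AGG_MAX, AGG_COUNT, AGG_FIRST, AGG_LAST } agg_op;

/* Returns 1 if the n_keys==0 scalar reduction may be dispatched across the
 * pool.  FIRST/LAST depend on row order, and a work-stealing pool gives no
 * guarantee that worker-id order matches row-index order, so they run serially. */
static int scalar_aggs_may_parallelise(const agg_op *ops, size_t n_aggs)
{
    for (size_t i = 0; i < n_aggs; i++)
        if (ops[i] == AGG_FIRST || ops[i] == AGG_LAST)
            return 0;
    return 1;
}
```
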
…t), LIKE on dict SYM

Lands the four findings + bonus from RAYFORCE_BOTTLENECKS.md, taking
ClickBench hot-run total from ~1.6 M ms to ~14 K ms across 40
measurable queries (≈99% reduction).

* Fused `select { … asc/desc: c take: K }` lowers to bounded-heap
  top-K when k << nrows and keys resolve to plain column refs.
  Single-key uses the radix-encoded fast path; multi-key falls back
  to the comparator-based heap.  Q26 SearchPhrase: 5 186 → 72 ms.

* Grouped `count(distinct)` is no longer routed through the per-group
  eval fallback — the fused OP_COUNT_DISTINCT runs per group-slice.
  Scaling moves from 94×/decade to ≈4.6×/decade between 100 K and
  1 M rows (essentially linear).

* LIKE on dict-encoded SYM scans the dictionary once and lifts the
  result through the codes vector instead of re-evaluating per row.
  Low-card SYM (54-unique BrowserCountry): 52 → 3.65 ms (14×).
  High-card SYM (1.73 M-unique URL): 498 → 220 ms (2.3×).

* Unifies the previously-divergent glob matchers (eval used `*?[abc]`,
  DAG used SQL `%_`; one variant blew up exponentially on
  `a*a*…a*b` against an a-only string) behind a single iterative
  two-pointer implementation in src/ops/glob.{c,h}; both call sites
  delegate to it (see the sketch after this list).

* Bonus: `(at table (iasc table.col))` no longer crashes on tables —
  re-indexes each column to return a TABLE.
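
The unified matcher referenced in the third bullet is the standard iterative
two-pointer wildcard walk.  A minimal sketch of that shape (function name
hypothetical; the `[abc]` classes and the SQL `%`/`_` forms that the real
glob.c must also cover are omitted here).  Backtracking only ever re-tries
the most recent `*`, so `a*a*…a*b` against an all-`a` string stays polynomial
instead of exponential:

```c
/* Iterative two-pointer glob match for '*' and '?' (sketch only). */
static int glob_match(const char *pat, const char *str)
{
    const char *s = str, *p = pat;
    const char *star_p = NULL;   /* pattern position just after the last '*' seen */
    const char *star_s = NULL;   /* string position that '*' currently covers up to */

    while (*s) {
        if (*p == '*') {                       /* remember the star, try the empty match first */
            star_p = ++p;
            star_s = s;
        } else if (*p == '?' || *p == *s) {    /* single-character match */
            p++; s++;
        } else if (star_p) {                   /* mismatch: let the last '*' absorb one more char */
            p = star_p;
            s = ++star_s;
        } else {
            return 0;
        }
    }
    while (*p == '*') p++;                     /* trailing stars match the empty suffix */
    return *p == '\0';
}
```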

Tests: query_coverage / read_csv / reserved_namespace updated for the
new dispatch paths; cross_type_workout / collection/at extended.

The CSV format conflates "field empty" and "field missing" — both look
like a zero-length cell.  The SYM materialisation path treated the
parse-time null bit as the q/k null sentinel ID 0, so `(!= col "")`
never excluded those rows: the value-vs-null comparison kernel
returns true for `0Ns != ""` (matching q semantics, not SQL's).

After this change the loader interns "" once per call and remaps
null-flagged SYM rows to that ID, clearing their null bit so the
compare kernel takes the both-non-null branch.  Net effect — empty
TSV/CSV cells round-trip through Rayforce as the empty SYM, matching
how DuckDB / Spark / polars handle the same input.
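
A minimal sketch of that remap, assuming a codes-plus-null-bitmap layout for
the SYM column; the names and the bitmap layout are illustrative, not the
loader's actual API:

```c
#include <stdint.h>
#include <stddef.h>

/* Remap parse-time nulls to the interned "" sym and clear their null bit
 * (names and bitmap layout hypothetical). */
static void remap_null_syms(int64_t *codes, uint8_t *null_bits, size_t nrows,
                            int64_t empty_sym_id /* intern("") once per call */)
{
    for (size_t r = 0; r < nrows; r++) {
        if (null_bits[r >> 3] & (1u << (r & 7))) {
            codes[r] = empty_sym_id;                         /* row now points at the interned "" */
            null_bits[r >> 3] &= (uint8_t)~(1u << (r & 7));  /* row is no longer null */
        }
    }
}
```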

Affects ten ClickBench queries that filter on `(!= col "")`:
Q11, Q22, Q23, Q25–Q27, Q31, Q32, Q37, Q38.  Selectivity ranges
from 1.0001× (URL is rarely empty) to 26× (MobilePhoneModel cuts
5M → 192K) — see ClickBench/rayforce/REMAINING_FIXES.md §R6 for
the per-query expected delta.

RAY_STR columns and non-string types preserve the null distinction
unchanged.  test/test_csv.c::null_sym + null_mixed_columns updated
for the new SYM behaviour; new R6 fixture in
test/rfl/system/read_csv.rfl.

… LIST

Q35 = `(select {one: 1, c: (count URL), from: hits, by: URL, desc: c, take: 10})`
ran in 130–159 ms vs Q34 (the same query without `one: 1`) at 21 ms —
a 6× regression from one literal column.  Two paths fed the cost:

* The non-agg scatter at line 4180+ allocated ~70 MB of bookkeeping
  (gk/row_gid/cnt/off/pos for n_groups + 2*n_groups hash slots) and
  walked all 5 M rows building row→gid even for expressions that
  don't reference any column.
* The per-cell broadcast loop then retained the literal n_groups
  times into a RAY_LIST, and the LIST column blocked apply_sort_take
  from picking the top-K fast path downstream.

Detect at the top of the n_nonaggs > 0 branch whether every non-agg
expression is a self-evaluating atom literal (atom-typed, no
RAY_ATTR_NAME → not a name reference), pre-allocate one typed
broadcast vec per literal via `atom_broadcast_vec`, and skip directly
to `nonagg_done` past the row→gid setup.  `can_atom_broadcast` gates
on supported atom types so we never half-apply and have to roll back.
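
A rough sketch of the gate, assuming the type<0-means-atom convention noted
earlier for exec.c; apart from `RAY_ATTR_NAME` and the function name, the
layout and bit values are illustrative:

```c
#include <stdint.h>

#define RAY_ATTR_NAME 0x01          /* bit value illustrative: "expression is a bare name" */

typedef struct {
    int8_t  type;                   /* < 0 means atom, per the type<0 convention noted earlier */
    uint8_t attrs;
    /* value payload elided */
} expr_node;

/* 1 = safe to pre-broadcast this non-agg expression as a literal column.
 * The real gate additionally whitelists the atom types the fill loop
 * supports, so the broadcast is never half-applied. */
static int can_atom_broadcast(const expr_node *e)
{
    if (e->type >= 0)               /* not a self-evaluating atom */
        return 0;
    if (e->attrs & RAY_ATTR_NAME)   /* bare name = column reference, e.g. `m2: m` */
        return 0;
    return 1;
}
```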

Q35 is now 22 ms — within the noise of Q34.  Two-literal variant
(`{one: 1, two: 2, …}`) lands at 23 ms, also broadcast.  Tests
(2072 / 2073, 1 skipped) all green; the previous regression in
`select_by_nonagg_list_col` / `select_by_nonagg_colref_vs_const` —
where `m2: m` was eagerly broadcast as if `m` were a literal — went
away after gating on `!(attrs & RAY_ATTR_NAME)` in the predicate.

Verifying probe: bench/bottleneck/R8_const_column.rfl.

…atom

The atomic_map_binary_op DAG path excludes SYM (IS_NUM_TYPE doesn't
list it), so equality between a SYM column and a SYM atom fanned out
to one ray_neq_fn call + one bool atom allocation per row.  At 5M
rows on the ClickBench hits.tsv:

  (!= URL nu)  standalone, 5M rows    113 ms  →  17 ms  (~7×)
  (== URL nu)  standalone, 5M rows    100 ms  →  14 ms

That's the per-row work dropping from ≈22 ns/row (call + alloc) to
≈3 ns/row (load + truncate + cmp + store), bottoming out on memory
bandwidth.  Further gains require parallelisation across cores.

Detect the SYM-vec ↔ SYM-atom shape early in atomic_map_binary_op,
read the atom's i64 sym ID once, then run a tight per-width loop
(W8/W16/W32/W64) writing bool output.  For the rare case of an
already-null vec or null atom, fall through to a per-row branch that
preserves the q/k atom-vs-atom rules from cmp.c (`null != x` is true,
`null == null` is true).
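
A sketch of one width of that loop (the W32 case; the function name and the
byte-per-row bool output are assumptions):

```c
#include <stdint.h>
#include <stddef.h>

/* W32 variant: codes are 32-bit sym ids, output is one bool byte per row.
 * The atom's sym id is read and truncated once; the loop is just
 * load + compare + store and runs at memory bandwidth. */
static void sym_vec_neq_atom_w32(const uint32_t *codes, size_t n,
                                 int64_t atom_sym_id, uint8_t *out)
{
    uint32_t needle = (uint32_t)atom_sym_id;
    for (size_t r = 0; r < n; r++)
        out[r] = (uint8_t)(codes[r] != needle);
}
```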

Effects on the ClickBench cluster:
* Filter clauses against `nu` in Q11/Q22/Q23/Q25/Q26/Q31/Q32/Q37/Q38
  were paying the slow per-element bool alloc.  The select planner
  pushes filters before grouping so the reduction is partial — Q31
  wins ≈3 ms, Q32 ≈3 ms.  Q22/Q23 are still LIKE-bound (R3).
* The literal-folded `(!= col "")` form already short-circuits and
  is unaffected.

Tests: 2072 / 2073 (1 skipped, 0 failed).  Verifying probe:
bench/bottleneck/R9_filter_chain.rfl.

The global-hash kernel (single hash keyed by `(group_id, value)`) wins
on high group cardinality where per-group setup overhead dominates,
but pays for a 256+ MB hash table on the low-cardinality side: Q9
(3 K groups × 5 M rows) was sizing 16 M slots = 256 MB, blowing the
L3 and cache-missing on every probe.

Pick the path based on `n_groups`:

  n_groups ≤ 50 000  →  per-group-slice (small hashes fit L1/L2)
  n_groups > 50 000  →  global hash (per-group setup dominates the alt)
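
The dispatch itself is a one-line predicate; a sketch with the threshold
named (constant and function names hypothetical):

```c
#include <stddef.h>

/* Empirically chosen cut-over between the two grouped count(distinct) kernels. */
#define GROUPED_GLOBAL_HASH_MIN_GROUPS 50000

static int use_global_hash_kernel(size_t n_groups)
{
    return n_groups > GROUPED_GLOBAL_HASH_MIN_GROUPS;
}
```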

Empirical numbers on 5 M-row hits.tsv after R6:

  Q9   137 ms  →   38 ms   (3 K groups, 5 M rows)
  Q10   84 ms  →   61 ms   (same shape + 3 more aggregates)
  Q11   53 ms  →   60 ms   (84 groups; flat — already fast)
  Q14  200 ms  →  217 ms   (611 K groups; still on global, untouched)
  Q15   24 ms  →   29 ms   (composite key, 100 K-ish groups)

Q14 stays slow because it sits on the global side of the threshold and
the global kernel still has the cache-miss-per-probe bottleneck.  A
parallel partitioned variant is the next step (R2 follow-up).

Also hoists per-type read dispatch out of the global hash inner loop
(no perf impact alone but simplifies the next change).  Tests:
2072 / 2073 (1 skipped, 0 failed).

The serial global-hash kernel allocates a 32–256 MB hash table for
high-cardinality grouped count(distinct), and every probe is a cold
cache line on the i7-14700 (20 MB L3).  Add a partitioned variant
mirroring the existing whole-table count_distinct shape (pass 3 is
sketched after the list):

  Pass 1 (cdpg_hist_fn)  — per-worker histogram of hash partitions.
  Pass 2 (cdpg_scat_fn)  — scatter (gid+1, val) pairs into a partitioned
                           buffer using per-(worker, partition) cursors.
  Pass 3 (cdpg_dedup_fn) — per-partition open-addressing dedup; atomic
                           fetch-add into odata[gid] for each new pair.
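
A sketch of the dedup pass (pass 3), assuming each partition's `(gid+1, val)`
pairs occupy a contiguous slice of the scatter buffer and the scratch hash is
sized larger than the slice; names are illustrative rather than the ones in
the source:

```c
#include <stdint.h>
#include <stddef.h>
#include <string.h>

/* gid is stored +1 so an all-zero slot means "empty". */
typedef struct { int64_t gid1; int64_t val; } cd_pair;

/* Dedup one partition's pairs; hash_cap must be a power of two and larger
 * than n_pairs so a free slot always exists. */
static void dedup_partition(const cd_pair *pairs, size_t n_pairs,
                            cd_pair *hash, size_t hash_cap,
                            int64_t *odata /* per-group distinct counts */)
{
    memset(hash, 0, hash_cap * sizeof *hash);
    for (size_t i = 0; i < n_pairs; i++) {
        uint64_t h = (uint64_t)pairs[i].gid1 * 0x9E3779B97F4A7C15ull
                   ^ (uint64_t)pairs[i].val;
        size_t slot = (size_t)h & (hash_cap - 1);
        for (;;) {                                   /* linear probing */
            if (hash[slot].gid1 == 0) {              /* new (gid, val) pair */
                hash[slot] = pairs[i];
                __atomic_fetch_add(&odata[pairs[i].gid1 - 1], 1, __ATOMIC_RELAXED);
                break;
            }
            if (hash[slot].gid1 == pairs[i].gid1 && hash[slot].val == pairs[i].val)
                break;                               /* already counted in this partition */
            slot = (slot + 1) & (hash_cap - 1);
        }
    }
}
```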

P=64 partitions on a 28-core box keeps each per-partition dedup hash
inside L2 (≤ ~32 K rows × 16 B per slot ≤ 1 MB) and atomic_fetch_add
spreads writes across the n_groups output array — no measurable
contention at 600 K+ groups.

Gated on n_rows ≥ 200 000 (smaller inputs don't pay the dispatch
overhead).  Falls through to the serial kernel on no-pool / OOM, so
behaviour is preserved on platforms without a worker pool.

Empirically on the ClickBench 5 M-row hits.tsv:

  Standalone kernel cost (Q14 internals)  ~140 ms → ~3.5 ms
                                          (hist 0.2 / scat 0.5 / dedup 2.8)

Q14 query-level total stays at ~200 ms because the bottleneck has
shifted: the count-distinct kernel is no longer dominant; the
row→group_id rebuild in query.c::ray_select_fn (allocating an
n_groups-sized hash and probing every row) now eats the ~190 ms.
That's a separate fix — R2-followup territory — but the kernel work
here is correct and unblocks any caller for whom row_gid is cheap
(future plan: thread row_gid through OP_GROUP rather than recomputing
post-DAG).

Tests: 2072 / 2073 (1 skipped, 0 failed).

The grouped count(distinct) path post-DAG-group rebuilds a row→gid
mapping by hashing each input row's group key against the gk[gi]→gi
hash.  At Q14 scale (937 K filtered rows × 611 K groups → 18 MB hash)
the serial probe loop spent most of its time waiting on cache misses.

The hash is read-only by the time the probe runs (the insert phase
that built it is single-threaded and complete), so each worker can
independently process its row range with no synchronisation.  Add a
file-scope key reader (`key_read_i64`) and probe worker
(`rgid_probe_fn`), dispatch via the existing `ray_pool_get` worker
pool when nrows ≥ 200 K and the pool has ≥ 2 workers.
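
A sketch of the probe worker under those assumptions (only `key_read_i64`
and `rgid_probe_fn` are named in the commit; the task layout and hash
function are illustrative, and the hash must of course match whatever the
serial insert phase used):

```c
#include <stdint.h>
#include <stddef.h>

typedef struct {
    const int64_t *keys;        /* group-key column (W64 case for the sketch) */
    const int64_t *hash_keys;   /* open-addressing key table, built serially */
    const int32_t *hash_gids;   /* gid stored alongside each key */
    size_t         hash_cap;    /* power of two */
    int32_t       *row_gid;     /* output: group id per row */
    size_t         row_lo, row_hi;
} rgid_probe_task;

/* Per-type key reader; the real code dispatches on width, this is the i64 case. */
static int64_t key_read_i64(const int64_t *keys, size_t r) { return keys[r]; }

/* One worker's share of the probe.  The table is read-only here and every
 * row's key is guaranteed present (it was inserted during the serial build),
 * so the probe needs no locking and always terminates. */
static void rgid_probe_fn(rgid_probe_task *t)
{
    for (size_t r = t->row_lo; r < t->row_hi; r++) {
        int64_t k = key_read_i64(t->keys, r);
        size_t slot = (size_t)((uint64_t)k * 0x9E3779B97F4A7C15ull) & (t->hash_cap - 1);
        while (t->hash_keys[slot] != k)
            slot = (slot + 1) & (t->hash_cap - 1);
        t->row_gid[r] = t->hash_gids[slot];
    }
}
```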

Effect on Q14:  200 ms → 189 ms.  The remainder of Q14's gap is in
deeper machinery (filter eval interaction with the DAG group +
allocations for idx_buf / grp_cnt / row_gid).  Profiling each phase
of the rebuild confirmed:

  gk_copy        0.4 ms
  hash_insert    5.0 ms   (n_groups inserts into the 18 MB key→gid hash)
  probe          0.7 ms   (was ~10 ms before parallelisation)
  cnt accum      2.8 ms

So the probe was ~10 ms of the 190 ms Q14 budget; this commit removes
it.  The serial fallback remains for nrows < 200 K and pools with
< 2 workers.

Tests: 2072 / 2073 (1 skipped, 0 failed).

… let count(distinct col) ride path A

Two changes that together drop Q14 from 169 ms → 69 ms (5.10× → 1.74×
vs DuckDB on the 5 M-row hits.tsv with 611 K SearchPhrase groups).

1. Parallel-kernel correctness fix.

The previous count_distinct_per_group_parallel laid hist / cursor as
nw × P arrays indexed by `(worker_id, partition)`.  ray_pool_dispatch
uses dynamic work stealing, so the same morsel can be processed by
worker_id A in the histogram pass and worker_id B in the scatter
pass.  The (worker, partition) cursors were misaligned: hist counted
X rows for `(W=A, p)` but scatter advanced `cursor[W=B, p]`, leaving
uninitialised slots in out_buf that the dedup pass then atomically
counted into odata — Q14 returned counts up to ~10^15 instead of the
correct values.

Switch to per-partition atomic counters: a single `hist[P]` and a
single `cursor[P]`, both updated via __atomic_fetch_add.  Each worker
walks its assigned row range, builds local per-partition counts (no
contention), then pushes the deltas to shared hist with one atomic
per non-zero partition.  Scatter atomically advances cursor[partition]
to claim a write position.  P=64, ~14 K rows/partition, contention
is negligible.
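
A sketch of the histogram phase under that scheme (partition count, pair
layout, and names are illustrative): per-row work goes to a worker-local
array, and the shared `hist[P]` takes at most one atomic add per non-empty
partition per worker, so it no longer matters which worker stole a morsel.

```c
#include <stdint.h>
#include <stddef.h>
#include <string.h>

#define P 64                        /* hash partitions */

static inline unsigned partition_of(int64_t gid, int64_t val)
{
    uint64_t h = (uint64_t)gid * 0x9E3779B97F4A7C15ull ^ (uint64_t)val;
    return (unsigned)(h & (P - 1));
}

/* Histogram phase for one worker's row range. */
static void cd_hist_worker(const int64_t *gids, const int64_t *vals,
                           size_t row_lo, size_t row_hi,
                           uint64_t *shared_hist /* [P], shared by all workers */)
{
    uint64_t local[P];
    memset(local, 0, sizeof local);
    for (size_t r = row_lo; r < row_hi; r++)
        local[partition_of(gids[r], vals[r])]++;     /* contention-free */
    for (unsigned p = 0; p < P; p++)
        if (local[p])
            __atomic_fetch_add(&shared_hist[p], local[p], __ATOMIC_RELAXED);
}
```

The scatter pass claims write positions the same way: an
`__atomic_fetch_add(&cursor[p], 1, __ATOMIC_RELAXED)` hands each pair a
unique slot inside partition p's slice, regardless of which worker processes
the morsel.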

After the fix: Q14 returns the expected top counts (2118 / 1588 /
1382 / …) matching DuckDB exactly.

2. Path-A enablement for count(distinct col_ref).

Refine the WHERE-handling pre-scan so that `(count (distinct col))`
non-aggs no longer trip the path-B materialisation.  The scatter for
that shape doesn't need a flat post-filter table — it reads the
column directly via ray_count_distinct_per_group and skips rows where
row_gid[r] < 0.

When path A is taken, retain g->selection across the eventual
graph_free, then in the n_nonaggs scatter walk the morsel-segmented
rowsel and mask non-selected rows to row_gid = -1.  This walks the
RAY_SEL_NONE / ALL / MIX flags directly without building a flat
bitmap, ~5 ms for 5 M rows.
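
A sketch of that masking walk, assuming a morsel-segmented selection with a
per-morsel NONE/ALL/MIX flag plus a per-row bitmap for MIX morsels; apart
from the -1 sentinel and the flag idea, the layout is illustrative:

```c
#include <stdint.h>
#include <stddef.h>

enum { SEL_NONE, SEL_ALL, SEL_MIX };    /* per-morsel selection state */

typedef struct {
    uint8_t        flag;                /* SEL_NONE / SEL_ALL / SEL_MIX */
    const uint8_t *bits;                /* per-row bitmap, used only for SEL_MIX */
    size_t         row_lo, row_hi;      /* global row range of this morsel */
} sel_morsel;

/* Mask rows the WHERE clause dropped so the count-distinct scatter skips
 * them (row_gid[r] < 0), without materialising a flat bitmap. */
static void mask_unselected_rows(const sel_morsel *morsels, size_t n_morsels,
                                 int32_t *row_gid)
{
    for (size_t m = 0; m < n_morsels; m++) {
        const sel_morsel *s = &morsels[m];
        if (s->flag == SEL_ALL)                       /* every row kept: nothing to do */
            continue;
        for (size_t r = s->row_lo; r < s->row_hi; r++) {
            if (s->flag == SEL_NONE) {
                row_gid[r] = -1;                      /* whole morsel filtered out */
            } else {                                  /* SEL_MIX: consult the bitmap */
                size_t i = r - s->row_lo;
                if (!(s->bits[i >> 3] & (1u << (i & 7))))
                    row_gid[r] = -1;
            }
        }
    }
}
```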

Net effect: Q14 skips materialising filtered_tbl (937 K rows × 105
columns ≈ 750 MB copy that was eating most of the query budget) and
the count_distinct kernel now correctly produces the same answer the
materialised path used to produce.

Tests: 2072 / 2073 (1 skipped, 0 failed).  Verifying probes:
* /tmp/cdpg_220.rfl — 220K-row distinct test → max(u)=1 (correct)
* Q14 vs duckdb-fetch — top counts match exactly