diff --git a/include/rayforce.h b/include/rayforce.h index 3152dbe1..5ee643e3 100644 --- a/include/rayforce.h +++ b/include/rayforce.h @@ -359,6 +359,14 @@ int64_t ray_sym_intern(const char* str, size_t len); int64_t ray_sym_find(const char* str, size_t len); ray_t* ray_sym_str(int64_t id); uint32_t ray_sym_count(void); + +/* Borrow a snapshot of the sym → string array. Returns a pointer to + * the underlying ray_t** strings table along with its length; valid + * only while no concurrent ray_sym_intern occurs (i.e. read-only + * execution phases). Lock is taken once for the snapshot and dropped + * before return — caller may iterate freely. Both *out_strings and + * *out_count must be non-NULL. */ +void ray_sym_strings_borrow(ray_t*** out_strings, uint32_t* out_count); bool ray_sym_ensure_cap(uint32_t needed); ray_err_t ray_sym_save(const char* path); ray_err_t ray_sym_load(const char* path); diff --git a/src/lang/eval.c b/src/lang/eval.c index 9046dd66..f5221a62 100644 --- a/src/lang/eval.c +++ b/src/lang/eval.c @@ -875,10 +875,6 @@ ray_t* gather_by_idx(ray_t* vec, int64_t* idx, int64_t n) { case 1: for (int64_t i = 0; i < n; i++) dst[i] = src[idx[i]]; break; default: for (int64_t i = 0; i < n; i++) memcpy(dst + i*esz, src + idx[i]*esz, esz); break; } - if (vec->sym_dict) { - ray_retain(vec->sym_dict); - result->sym_dict = vec->sym_dict; - } if (has_nulls) { for (int64_t i = 0; i < n; i++) if (ray_vec_is_null(vec, idx[i])) @@ -2280,7 +2276,12 @@ static void ray_register_builtins(void) { register_vary("update", RAY_FN_SPECIAL_FORM | RAY_FN_RESTRICTED, ray_update_fn); register_vary("insert", RAY_FN_SPECIAL_FORM | RAY_FN_RESTRICTED, ray_insert_fn); register_vary("upsert", RAY_FN_SPECIAL_FORM | RAY_FN_RESTRICTED, ray_upsert_fn); - register_binary("xbar", RAY_FN_ATOMIC, ray_xbar_fn); + /* xbar is registered NON-atomic so the call path lands in + * ray_xbar_fn(VEC, scalar) directly. 
ray_xbar_fn handles the + * vector fast path itself (tight per-element loop, no per-atom + * allocation) and recurses through atomic_map_binary for the rare + * (collection, collection) zip case. */ + register_binary("xbar", RAY_FN_NONE, ray_xbar_fn); /* Join operations */ register_vary("left-join", RAY_FN_NONE, ray_left_join_fn); @@ -2294,6 +2295,8 @@ static void ray_register_builtins(void) { register_vary("println", RAY_FN_NONE, ray_println_fn); register_vary("show", RAY_FN_NONE, ray_show_fn); register_vary("format", RAY_FN_NONE, ray_format_fn); + register_vary("read-csv", RAY_FN_RESTRICTED, ray_read_csv_fn); + register_vary("write-csv", RAY_FN_RESTRICTED, ray_write_csv_fn); register_vary(".csv.read", RAY_FN_RESTRICTED, ray_read_csv_fn); register_vary(".csv.write", RAY_FN_RESTRICTED, ray_write_csv_fn); register_binary("as", RAY_FN_NONE, ray_cast_fn); diff --git a/src/ops/collection.c b/src/ops/collection.c index 64ce4632..1a5079ad 100644 --- a/src/ops/collection.c +++ b/src/ops/collection.c @@ -1554,6 +1554,39 @@ ray_t* ray_at_fn(ray_t* vec, ray_t* idx) { return ray_dict_new(keys, vals); } + /* Table row selection by index vector: apply the row ids to each + * column and return a table. Keep this before the generic collection + * fallback; otherwise a table indexed by millions of row ids becomes + * a LIST of row dictionaries. */ + if (vec->type == RAY_TABLE && idx->type == RAY_I64) { + int64_t nrows = ray_table_nrows(vec); + int64_t nidx = ray_len(idx); + int64_t* ids = (int64_t*)ray_data(idx); + for (int64_t i = 0; i < nidx; i++) { + if (ids[i] < 0 || ids[i] >= nrows) + return ray_error("domain", NULL); + } + + int64_t ncols = ray_table_ncols(vec); + ray_t* result = ray_table_new(ncols); + if (!result || RAY_IS_ERR(result)) return result ? 
result : ray_error("oom", NULL); + for (int64_t c = 0; c < ncols; c++) { + ray_t* col = ray_table_get_col_idx(vec, c); + int64_t name = ray_table_col_name(vec, c); + if (!col) continue; + ray_t* gathered = gather_by_idx(col, ids, nidx); + if (!gathered || RAY_IS_ERR(gathered)) { + ray_release(result); + return gathered ? gathered : ray_error("oom", NULL); + } + result = ray_table_add_col(result, name, gathered); + ray_release(gathered); + if (!result || RAY_IS_ERR(result)) + return result ? result : ray_error("oom", NULL); + } + return result; + } + /* Dict key access: (at dict key) → value or 0Nl if missing */ if (vec->type == RAY_DICT) { ray_t* v = ray_dict_get(vec, idx); diff --git a/src/ops/glob.c b/src/ops/glob.c index dea37d1e..bef85daf 100644 --- a/src/ops/glob.c +++ b/src/ops/glob.c @@ -13,6 +13,9 @@ #include "ops/glob.h" +#define _GNU_SOURCE +#include + /* Lowercase an ASCII byte; non-ASCII passes through unchanged. */ static inline char to_lower(char c) { return (c >= 'A' && c <= 'Z') ? (char)(c + 32) : c; @@ -100,3 +103,96 @@ bool ray_glob_match(const char* s, size_t sn, const char* p, size_t pn) { bool ray_glob_match_ci(const char* s, size_t sn, const char* p, size_t pn) { return glob_impl(s, sn, p, pn, true); } + +ray_glob_compiled_t ray_glob_compile(const char* p, size_t pn) { + ray_glob_compiled_t c = { RAY_GLOB_SHAPE_NONE, NULL, 0 }; + + if (pn == 0) { + c.shape = RAY_GLOB_SHAPE_EXACT; + c.lit = p; c.lit_len = 0; + return c; + } + + /* Strip a single leading and trailing '*'; classify by the residual + * pattern. Any other glob metachar (`?`, `[`, or interior `*`) + * forces the general matcher. */ + size_t lo = 0, hi = pn; + bool leading_star = (p[0] == '*'); + bool trailing_star = (pn > 0 && p[pn - 1] == '*' && + /* don't double-count single '*' as both */ + (pn > 1 || !leading_star)); + if (leading_star) lo = 1; + if (trailing_star) hi = pn - 1; + + /* Ensure the residual has no glob metacharacters. 
*/ + for (size_t i = lo; i < hi; i++) { + char ch = p[i]; + if (ch == '*' || ch == '?' || ch == '[') { + c.shape = RAY_GLOB_SHAPE_NONE; + return c; + } + } + + c.lit = p + lo; + c.lit_len = hi - lo; + + if (leading_star && trailing_star) { + c.shape = (c.lit_len == 0) ? RAY_GLOB_SHAPE_ANY + : RAY_GLOB_SHAPE_CONTAINS; + } else if (leading_star) { + c.shape = RAY_GLOB_SHAPE_SUFFIX; + } else if (trailing_star) { + c.shape = RAY_GLOB_SHAPE_PREFIX; + } else { + c.shape = RAY_GLOB_SHAPE_EXACT; + } + return c; +} + +bool ray_glob_match_compiled(const ray_glob_compiled_t* c, + const char* s, size_t sn) { + switch (c->shape) { + case RAY_GLOB_SHAPE_ANY: + return true; + case RAY_GLOB_SHAPE_EXACT: + return sn == c->lit_len && + (c->lit_len == 0 || memcmp(s, c->lit, c->lit_len) == 0); + case RAY_GLOB_SHAPE_PREFIX: + return sn >= c->lit_len && + (c->lit_len == 0 || memcmp(s, c->lit, c->lit_len) == 0); + case RAY_GLOB_SHAPE_SUFFIX: + return sn >= c->lit_len && + (c->lit_len == 0 || + memcmp(s + sn - c->lit_len, c->lit, c->lit_len) == 0); + case RAY_GLOB_SHAPE_CONTAINS: + if (c->lit_len == 0) return true; + if (sn < c->lit_len) return false; + /* glibc's memmem is SIMD-accelerated; use it where available. + * Falls back to a portable Boyer-Moore-Horspool when not. */ +#if defined(__GLIBC__) || defined(__APPLE__) || defined(__FreeBSD__) + return memmem(s, sn, c->lit, c->lit_len) != NULL; +#else + { + /* Portable fallback: short-needle byte scan with memchr. 
*/ + const char first = c->lit[0]; + const char* haystack = s; + size_t remaining = sn; + while (remaining >= c->lit_len) { + const char* hit = (const char*)memchr(haystack, first, + remaining - c->lit_len + 1); + if (!hit) return false; + if (memcmp(hit, c->lit, c->lit_len) == 0) return true; + size_t adv = (size_t)(hit - haystack) + 1; + haystack = hit + 1; + remaining -= adv; + } + return false; + } +#endif + case RAY_GLOB_SHAPE_NONE: + default: + /* Caller contract violation — fall through to false rather than + * silently matching everything. */ + return false; + } +} diff --git a/src/ops/glob.h b/src/ops/glob.h index 71bc3a22..8b8552eb 100644 --- a/src/ops/glob.h +++ b/src/ops/glob.h @@ -40,4 +40,47 @@ bool ray_glob_match(const char* s, size_t sn, const char* p, size_t pn); bool ray_glob_match_ci(const char* s, size_t sn, const char* p, size_t pn); +/* ---- Pre-compiled pattern fast path ------------------------------------- + * Many LIKE workloads have very simple patterns (e.g. `*google*`). When + * the pattern has no metacharacters except (optionally) a leading `*` + * and/or a trailing `*`, the match collapses to a literal substring / + * prefix / suffix / equality test that we can drive with memcmp / + * memmem — both libc-vectorised on modern glibc. Detect the shape once + * up front, then run the entire dictionary (or row vector) through a + * single tight loop. + * + * Shapes: + * RAY_GLOB_SHAPE_NONE — pattern needs the full glob matcher + * RAY_GLOB_SHAPE_EXACT — no `*`/`?`/`[` — literal equality + * RAY_GLOB_SHAPE_PREFIX — `*` — strncmp prefix + * RAY_GLOB_SHAPE_SUFFIX — `*` — tail equality + * RAY_GLOB_SHAPE_CONTAINS — `**` — memmem + * RAY_GLOB_SHAPE_ANY — pattern is "*" — always true + * The compiled struct caches a pointer/length into the original + * pattern buffer, so the caller must keep the pattern alive while the + * compiled view is in use. 
*/ +typedef enum { + RAY_GLOB_SHAPE_NONE = 0, + RAY_GLOB_SHAPE_EXACT, + RAY_GLOB_SHAPE_PREFIX, + RAY_GLOB_SHAPE_SUFFIX, + RAY_GLOB_SHAPE_CONTAINS, + RAY_GLOB_SHAPE_ANY, +} ray_glob_shape_t; + +typedef struct { + ray_glob_shape_t shape; + const char* lit; /* literal substring inside the pattern */ + size_t lit_len; +} ray_glob_compiled_t; + +/* Classify a pattern. Returns the simplest matching shape; falls back + * to RAY_GLOB_SHAPE_NONE when the pattern needs the general matcher. */ +ray_glob_compiled_t ray_glob_compile(const char* p, size_t pn); + +/* Match a single string against a compiled simple-shape pattern. + * Caller must guarantee shape != RAY_GLOB_SHAPE_NONE. */ +bool ray_glob_match_compiled(const ray_glob_compiled_t* c, + const char* s, size_t sn); + #endif /* RAY_OPS_GLOB_H */ diff --git a/src/ops/group.c b/src/ops/group.c index 4665155f..705ed991 100644 --- a/src/ops/group.c +++ b/src/ops/group.c @@ -218,7 +218,275 @@ static void reduce_merge(reduce_acc_t* dst, const reduce_acc_t* src, int8_t in_t * and the last worker's last is the global last. */ } -/* Hash-based count distinct for integer/float columns */ +/* Hash mixing constants used by the count-distinct kernel and helpers. */ +#define CD_HASH_K1 0x9E3779B97F4A7C15ULL +#define CD_HASH_K2 0xBF58476D1CE4E5B9ULL + +/* Per-partition hash-distinct. Each worker is given a contiguous slice + * of partition payloads (already grouped by hash high bits) and counts + * distinct values within. Since distinct values are guaranteed to fall + * into the same partition, the global distinct count is the sum of + * per-partition counts. 
*/ +typedef struct { + int64_t* values; /* concatenated partition payloads */ + int64_t* part_off; /* P+1 prefix sums, partition boundaries */ + int64_t* part_count; /* OUT: per-partition distinct count */ +} cd_part_ctx_t; + +static void cd_part_dedup_fn(void* ctx, uint32_t worker_id, + int64_t start, int64_t end) { + (void)worker_id; + cd_part_ctx_t* x = (cd_part_ctx_t*)ctx; + for (int64_t p = start; p < end; p++) { + int64_t off = x->part_off[p]; + int64_t cnt = x->part_off[p + 1] - off; + if (cnt == 0) { x->part_count[p] = 0; continue; } + + uint64_t cap = (uint64_t)cnt * 2; + if (cap < 32) cap = 32; + uint64_t c = 1; + while (c && c < cap) c <<= 1; + if (!c) { x->part_count[p] = -1; continue; } + cap = c; + uint64_t mask = cap - 1; + + ray_t* set_hdr = NULL; + ray_t* used_hdr = NULL; + int64_t* set = (int64_t*)scratch_alloc (&set_hdr, + (size_t)cap * sizeof(int64_t)); + uint8_t* used = (uint8_t*)scratch_calloc(&used_hdr, + (size_t)cap * sizeof(uint8_t)); + if (!set || !used) { + if (set_hdr) scratch_free(set_hdr); + if (used_hdr) scratch_free(used_hdr); + x->part_count[p] = -1; + continue; + } + + int64_t* base = x->values + off; + int64_t distinct = 0; + for (int64_t i = 0; i < cnt; i++) { + int64_t v = base[i]; + uint64_t h = (uint64_t)v * CD_HASH_K1; + h ^= h >> 33; + uint64_t slot = h & mask; + while (used[slot]) { + if (set[slot] == v) goto cd_next; + slot = (slot + 1) & mask; + } + set[slot] = v; + used[slot] = 1; + distinct++; + cd_next:; + } + scratch_free(set_hdr); + scratch_free(used_hdr); + x->part_count[p] = distinct; + } +} + +/* Width-specialised value extraction for the partition pass. Reading + * row-by-row through read_col_i64 was the dispatch overhead in the + * sequential path; specialising on the column width lets the autovec + * pass tighten the loop. 
*/ +typedef struct { + const void* base; + int64_t* counts; /* P per-partition row counts (per worker) */ + uint32_t p_bits; + uint64_t p_mask; + uint8_t stride_log2; /* log2(elem size) for plain int paths */ + uint8_t is_f64; + int8_t type; + uint8_t attrs; +} cd_count_ctx_t; + +/* Count rows per partition (per worker, into worker-local slot). Two + * passes: this one fills the histograms; the next does the scatter. */ +static void cd_hist_fn(void* ctx, uint32_t worker_id, + int64_t start, int64_t end) { + cd_count_ctx_t* x = (cd_count_ctx_t*)ctx; + int64_t* hist = x->counts + (size_t)worker_id * (x->p_mask + 1); + const void* base = x->base; + int8_t in_type = x->type; + uint8_t in_attrs = x->attrs; + uint64_t p_mask = x->p_mask; + if (x->is_f64) { + const double* d = (const double*)base; + for (int64_t i = start; i < end; i++) { + double fv = d[i]; + if (fv != fv) fv = (double)NAN; + else if (fv == 0.0) fv = 0.0; + int64_t val; + memcpy(&val, &fv, sizeof(int64_t)); + uint64_t h = (uint64_t)val * CD_HASH_K1; + h ^= h >> 33; + uint64_t p = (h ^ (h >> 33)) & p_mask; + hist[p]++; + } + } else if (in_type == RAY_I64 || in_type == RAY_TIMESTAMP) { + const int64_t* d = (const int64_t*)base; + for (int64_t i = start; i < end; i++) { + int64_t val = d[i]; + uint64_t h = (uint64_t)val * CD_HASH_K1; + h ^= h >> 33; + uint64_t p = (h ^ (h >> 33)) & p_mask; + hist[p]++; + } + } else if (in_type == RAY_I32 || in_type == RAY_DATE || in_type == RAY_TIME) { + const int32_t* d = (const int32_t*)base; + for (int64_t i = start; i < end; i++) { + int64_t val = d[i]; + uint64_t h = (uint64_t)val * CD_HASH_K1; + h ^= h >> 33; + uint64_t p = (h ^ (h >> 33)) & p_mask; + hist[p]++; + } + } else if (in_type == RAY_I16) { + const int16_t* d = (const int16_t*)base; + for (int64_t i = start; i < end; i++) { + int64_t val = d[i]; + uint64_t h = (uint64_t)val * CD_HASH_K1; + h ^= h >> 33; + uint64_t p = (h ^ (h >> 33)) & p_mask; + hist[p]++; + } + } else if (in_type == RAY_BOOL || in_type == 
RAY_U8) { + const uint8_t* d = (const uint8_t*)base; + for (int64_t i = start; i < end; i++) { + int64_t val = d[i]; + uint64_t h = (uint64_t)val * CD_HASH_K1; + h ^= h >> 33; + uint64_t p = (h ^ (h >> 33)) & p_mask; + hist[p]++; + } + } else if (in_type == RAY_SYM) { + for (int64_t i = start; i < end; i++) { + int64_t val = read_col_i64(base, i, in_type, in_attrs); + uint64_t h = (uint64_t)val * CD_HASH_K1; + h ^= h >> 33; + uint64_t p = (h ^ (h >> 33)) & p_mask; + hist[p]++; + } + } +} + +typedef struct { + const void* base; + int64_t* out_buf; /* concatenated payloads (output) */ + int64_t* cursor; /* per-worker × P; advances per scatter */ + uint32_t p_bits; + uint64_t p_mask; + uint8_t is_f64; + int8_t type; + uint8_t attrs; +} cd_scatter_ctx_t; + +static void cd_scatter_fn(void* ctx, uint32_t worker_id, + int64_t start, int64_t end) { + cd_scatter_ctx_t* x = (cd_scatter_ctx_t*)ctx; + int64_t* cur = x->cursor + (size_t)worker_id * (x->p_mask + 1); + int64_t* out = x->out_buf; + const void* base = x->base; + int8_t in_type = x->type; + uint8_t in_attrs = x->attrs; + uint64_t p_mask = x->p_mask; + #define SCATTER_BODY(LOAD) \ + for (int64_t i = start; i < end; i++) { \ + int64_t val = (LOAD); \ + uint64_t h = (uint64_t)val * CD_HASH_K1; \ + h ^= h >> 33; \ + uint64_t p = (h ^ (h >> 33)) & p_mask; \ + out[cur[p]++] = val; \ + } + if (x->is_f64) { + const double* d = (const double*)base; + for (int64_t i = start; i < end; i++) { + double fv = d[i]; + if (fv != fv) fv = (double)NAN; + else if (fv == 0.0) fv = 0.0; + int64_t val; + memcpy(&val, &fv, sizeof(int64_t)); + uint64_t h = (uint64_t)val * CD_HASH_K1; + h ^= h >> 33; + uint64_t p = (h ^ (h >> 33)) & p_mask; + out[cur[p]++] = val; + } + } else if (in_type == RAY_I64 || in_type == RAY_TIMESTAMP) { + const int64_t* d = (const int64_t*)base; + SCATTER_BODY(d[i]) + } else if (in_type == RAY_I32 || in_type == RAY_DATE || in_type == RAY_TIME) { + const int32_t* d = (const int32_t*)base; + SCATTER_BODY(d[i]) + } 
else if (in_type == RAY_I16) { + const int16_t* d = (const int16_t*)base; + SCATTER_BODY(d[i]) + } else if (in_type == RAY_BOOL || in_type == RAY_U8) { + const uint8_t* d = (const uint8_t*)base; + SCATTER_BODY(d[i]) + } else { /* RAY_SYM */ + SCATTER_BODY(read_col_i64(base, i, in_type, in_attrs)) + } + #undef SCATTER_BODY +} + +/* Sequential fallback for small inputs / when the pool isn't available. + * Same algorithm as the original: open-addressing hash set, single pass. */ +static int64_t cd_seq_count(int8_t in_type, uint8_t in_attrs, + const void* base, int64_t len) { + uint64_t cap = (uint64_t)(len < 16 ? 32 : len) * 2; + uint64_t c = 1; + while (c && c < cap) c <<= 1; + if (!c) return -1; + cap = c; + uint64_t mask = cap - 1; + + ray_t* set_hdr = NULL; + ray_t* used_hdr = NULL; + int64_t* set = (int64_t*)scratch_alloc (&set_hdr, (size_t)cap * sizeof(int64_t)); + uint8_t* used = (uint8_t*)scratch_calloc(&used_hdr, (size_t)cap * sizeof(uint8_t)); + if (!set || !used) { + if (set_hdr) scratch_free(set_hdr); + if (used_hdr) scratch_free(used_hdr); + return -1; + } + int64_t count = 0; + for (int64_t i = 0; i < len; i++) { + int64_t val; + if (in_type == RAY_F64) { + double fv = ((const double*)base)[i]; + if (fv != fv) fv = (double)NAN; + else if (fv == 0.0) fv = 0.0; + memcpy(&val, &fv, sizeof(int64_t)); + } else { + val = read_col_i64(base, i, in_type, in_attrs); + } + uint64_t h = (uint64_t)val * CD_HASH_K1; + uint64_t slot = h & mask; + while (used[slot]) { + if (set[slot] == val) goto cd_seq_next; + slot = (slot + 1) & mask; + } + set[slot] = val; + used[slot] = 1; + count++; + cd_seq_next:; + } + scratch_free(set_hdr); + scratch_free(used_hdr); + return count; +} + +/* Hash-based count distinct for integer/float columns. + * + * Strategy: + * - small inputs → sequential single-pass hash set (low overhead). + * - large inputs → radix-partition by hash high bits across the + * worker pool, then dedup each partition in + * parallel. 
Each partition fits L2, eliminating + * the cache-miss-per-probe pattern of one giant + * global set. Distinct values land in the same + * partition, so the global count is the sum of + * per-partition counts. */ ray_t* exec_count_distinct(ray_graph_t* g, ray_op_t* op, ray_t* input) { (void)g; (void)op; if (!input || RAY_IS_ERR(input)) return input; @@ -228,70 +496,250 @@ ray_t* exec_count_distinct(ray_graph_t* g, ray_op_t* op, ray_t* input) { if (len == 0) return ray_i64(0); - /* Only numeric/ordinal/sym column types are supported */ switch (in_type) { case RAY_BOOL: case RAY_U8: case RAY_I16: case RAY_I32: case RAY_I64: case RAY_F64: case RAY_DATE: case RAY_TIME: case RAY_TIMESTAMP: case RAY_SYM: break; + case RAY_STR: + case RAY_GUID: + case RAY_LIST: { + /* The hash kernel only handles fixed-width scalar types. For + * STR / GUID / LIST the rewrite-aware path is to delegate to + * distinct_vec_eager (which uses the row-aware hashset_t) and + * count its result. Slower than the radix kernel but correct. */ + ray_t* dist = distinct_vec_eager(input); + if (!dist || RAY_IS_ERR(dist)) return dist ? dist : ray_error("oom", NULL); + int64_t cnt = ray_len(dist); + ray_release(dist); + return ray_i64(cnt); + } default: return ray_error("type", NULL); } - /* Use a simple open-addressing hash set for int64 values */ - uint64_t cap = (uint64_t)(len < 16 ? 32 : len) * 2; - /* Round up to power of 2 */ + void* base = ray_data(input); + ray_pool_t* pool = ray_pool_get(); + + /* Small-input fast path: per-row dispatch overhead would dwarf the + * actual work. */ + if (!pool || len < (1 << 16)) { + int64_t cnt = cd_seq_count(in_type, input->attrs, base, len); + if (cnt < 0) return ray_error("oom", NULL); + return ray_i64(cnt); + } + + uint32_t nw = ray_pool_total_workers(pool); + + /* Partition count: a small power of two ≥ nw, capped so per-partition + * sets stay in L2. 16 works well for nw=28; 32 for >32 workers. 
*/ + uint32_t p_bits; + if (nw <= 8) p_bits = 4; /* 16 partitions */ + else if (nw <= 32) p_bits = 5; /* 32 partitions */ + else p_bits = 6; /* 64 partitions */ + uint64_t P = (uint64_t)1 << p_bits; + uint64_t p_mask = P - 1; + + /* Pass 1: per-worker histogram (P × nw int64 cells). */ + ray_t* hist_hdr = NULL; + int64_t* hist = (int64_t*)scratch_calloc(&hist_hdr, + (size_t)P * nw * sizeof(int64_t)); + if (!hist) { + return ray_error("oom", NULL); + } + cd_count_ctx_t hctx = { + .base = base, .counts = hist, + .p_bits = p_bits, .p_mask = p_mask, + .stride_log2 = 0, .is_f64 = (in_type == RAY_F64), + .type = in_type, .attrs = input->attrs, + }; + ray_pool_dispatch(pool, cd_hist_fn, &hctx, len); + + /* Convert per-worker histograms into a global prefix sum. Order: + * partition_0_worker_0, partition_0_worker_1, …, partition_1_worker_0, … + * so each (worker, partition) range is a contiguous slice of out_buf. */ + ray_t* off_hdr = NULL; + int64_t* part_off = (int64_t*)scratch_alloc(&off_hdr, + (size_t)(P + 1) * sizeof(int64_t)); + if (!part_off) { scratch_free(hist_hdr); return ray_error("oom", NULL); } + ray_t* cur_hdr = NULL; + int64_t* cursor = (int64_t*)scratch_alloc(&cur_hdr, + (size_t)P * nw * sizeof(int64_t)); + if (!cursor) { + scratch_free(off_hdr); scratch_free(hist_hdr); + return ray_error("oom", NULL); + } + + int64_t total = 0; + for (uint64_t p = 0; p < P; p++) { + part_off[p] = total; + for (uint32_t w = 0; w < nw; w++) { + cursor[(size_t)w * P + p] = total; + total += hist[(size_t)w * P + p]; + } + } + part_off[P] = total; + + /* Sanity: total must equal len. */ + if (total != len) { + scratch_free(cur_hdr); scratch_free(off_hdr); scratch_free(hist_hdr); + return ray_error("nyi", "count_distinct: histogram mismatch"); + } + + /* Pass 2: scatter values into out_buf. 
*/ + ray_t* buf_hdr = NULL; + int64_t* out_buf = (int64_t*)scratch_alloc(&buf_hdr, + (size_t)len * sizeof(int64_t)); + if (!out_buf) { + scratch_free(cur_hdr); scratch_free(off_hdr); scratch_free(hist_hdr); + return ray_error("oom", NULL); + } + cd_scatter_ctx_t sctx = { + .base = base, .out_buf = out_buf, .cursor = cursor, + .p_bits = p_bits, .p_mask = p_mask, + .is_f64 = (in_type == RAY_F64), + .type = in_type, .attrs = input->attrs, + }; + ray_pool_dispatch(pool, cd_scatter_fn, &sctx, len); + + /* Pass 3: dedup each partition in parallel. Each partition gets one + * task — distinct values land in the same partition, so per-partition + * sums give the global distinct count. */ + ray_t* pcnt_hdr = NULL; + int64_t* part_count = (int64_t*)scratch_alloc(&pcnt_hdr, + (size_t)P * sizeof(int64_t)); + if (!part_count) { + scratch_free(buf_hdr); scratch_free(cur_hdr); + scratch_free(off_hdr); scratch_free(hist_hdr); + return ray_error("oom", NULL); + } + cd_part_ctx_t dctx = { + .values = out_buf, .part_off = part_off, .part_count = part_count, + }; + ray_pool_dispatch_n(pool, cd_part_dedup_fn, &dctx, (uint32_t)P); + + int64_t total_distinct = 0; + for (uint64_t p = 0; p < P; p++) { + if (part_count[p] < 0) { + scratch_free(pcnt_hdr); scratch_free(buf_hdr); scratch_free(cur_hdr); + scratch_free(off_hdr); scratch_free(hist_hdr); + return ray_error("oom", NULL); + } + total_distinct += part_count[p]; + } + + scratch_free(pcnt_hdr); scratch_free(buf_hdr); scratch_free(cur_hdr); + scratch_free(off_hdr); scratch_free(hist_hdr); + return ray_i64(total_distinct); +} + +/* Grouped count(distinct): single global hash keyed by (group_id, value). + * One linear pass over all rows, O(n) total instead of O(per-group setup * + * n_groups). Returns an I64 vector of length n_groups with the per-group + * distinct count. Rows whose row_gid[r] < 0 are skipped. + * + * Supported value types: integers / SYM / TIMESTAMP / DATE / TIME / F64. 
+ * Caller is responsible for verifying the type up-front (it should match + * exec_count_distinct's whitelist) and returning NULL on miss so the + * legacy per-group fallback handles unsupported configs. + * + * Cap selection: 2 * n_rows rounded to power of 2. Worst case all rows + * are distinct pairs → load factor 0.5, no rehash needed. Slot stores + * gid+1 (so 0 means empty) and the int64-encoded value. 64-bit composite + * hash mixes both halves so rare-gid collisions don't cluster. */ +ray_t* ray_count_distinct_per_group(ray_t* src, const int64_t* row_gid, + int64_t n_rows, int64_t n_groups) { + if (!src || RAY_IS_ERR(src) || n_groups < 0) return ray_error("domain", NULL); + int8_t in_type = src->type; + switch (in_type) { + case RAY_BOOL: case RAY_U8: + case RAY_I16: case RAY_I32: case RAY_I64: + case RAY_F64: case RAY_DATE: case RAY_TIME: case RAY_TIMESTAMP: + case RAY_SYM: + break; + default: + return NULL; /* unsupported — caller falls back. */ + } + if (src->len < n_rows) return ray_error("domain", NULL); + + ray_t* out = ray_vec_new(RAY_I64, n_groups); + if (!out || RAY_IS_ERR(out)) return out ? out : ray_error("oom", NULL); + out->len = n_groups; + int64_t* odata = (int64_t*)ray_data(out); + memset(odata, 0, (size_t)n_groups * sizeof(int64_t)); + if (n_rows == 0 || n_groups == 0) return out; + + /* Pick capacity ≥ 2 * n_rows rounded up to power of two. This bounds + * load factor at 0.5 even when every (gid,val) pair is distinct. 
*/ + uint64_t cap = (uint64_t)n_rows * 2; + if (cap < 32) cap = 32; uint64_t c = 1; while (c && c < cap) c <<= 1; - if (!c) return ray_error("oom", NULL); /* overflow: cap too large */ + if (!c) { ray_release(out); return ray_error("oom", NULL); } cap = c; + uint64_t mask = cap - 1; - ray_t* set_hdr; - int64_t* set = (int64_t*)scratch_calloc(&set_hdr, - (size_t)cap * sizeof(int64_t)); - ray_t* used_hdr; - uint8_t* used = (uint8_t*)scratch_calloc(&used_hdr, - (size_t)cap * sizeof(uint8_t)); - if (!set || !used) { - if (set_hdr) scratch_free(set_hdr); - if (used_hdr) scratch_free(used_hdr); + /* Slot layout: parallel arrays of (gid_plus_one, value). gid_plus_one + * == 0 means slot is empty; storing gid+1 lets us skip a separate + * `used` bitmap. Both arrays are scratch_alloc so they go through + * the slab/heap fast path. */ + ray_t* k_hdr = NULL; + ray_t* v_hdr = NULL; + int64_t* slot_gid = (int64_t*)scratch_calloc(&k_hdr, + (size_t)cap * sizeof(int64_t)); + int64_t* slot_val = (int64_t*)scratch_alloc(&v_hdr, + (size_t)cap * sizeof(int64_t)); + if (!slot_gid || !slot_val) { + if (k_hdr) scratch_free(k_hdr); + if (v_hdr) scratch_free(v_hdr); + ray_release(out); return ray_error("oom", NULL); } - int64_t count = 0; - uint64_t mask = cap - 1; - void* base = ray_data(input); + void* base = ray_data(src); + bool has_nulls = (src->attrs & RAY_ATTR_HAS_NULLS) != 0; + const uint8_t* null_bm = has_nulls ? 
ray_vec_nullmap_bytes(src, NULL, NULL) + : NULL; + + for (int64_t r = 0; r < n_rows; r++) { + int64_t gid = row_gid[r]; + if (gid < 0 || gid >= n_groups) continue; + if (has_nulls && null_bm && ((null_bm[r/8] >> (r%8)) & 1)) continue; - for (int64_t i = 0; i < len; i++) { int64_t val; if (in_type == RAY_F64) { - double fv = ((double*)base)[i]; - /* Normalize: NaN → canonical NaN, -0.0 → +0.0 */ - if (fv != fv) fv = (double)NAN; /* canonical NaN */ - else if (fv == 0.0) fv = 0.0; /* +0.0 */ + double fv = ((double*)base)[r]; + if (fv != fv) fv = (double)NAN; + else if (fv == 0.0) fv = 0.0; memcpy(&val, &fv, sizeof(int64_t)); } else { - val = read_col_i64(base, i, in_type, input->attrs); + val = read_col_i64(base, r, in_type, src->attrs); } - /* Open-addressing linear probe */ + int64_t gid_p1 = gid + 1; + /* Mix gid and val so groups don't form long runs of collisions. */ uint64_t h = (uint64_t)val * 0x9E3779B97F4A7C15ULL; + h ^= (uint64_t)gid_p1 * 0xBF58476D1CE4E5B9ULL; + h ^= h >> 33; + h *= 0xC4CEB9FE1A85EC53ULL; uint64_t slot = h & mask; - while (used[slot]) { - if (set[slot] == val) goto next_val; + for (;;) { + int64_t cur = slot_gid[slot]; + if (cur == 0) { + slot_gid[slot] = gid_p1; + slot_val[slot] = val; + odata[gid]++; + break; + } + if (cur == gid_p1 && slot_val[slot] == val) break; slot = (slot + 1) & mask; } - /* New distinct value */ - set[slot] = val; - used[slot] = 1; - count++; - next_val:; } - scratch_free(set_hdr); - scratch_free(used_hdr); - return ray_i64(count); + scratch_free(k_hdr); + scratch_free(v_hdr); + return out; } ray_t* exec_reduction(ray_graph_t* g, ray_op_t* op, ray_t* input) { @@ -2647,6 +3095,7 @@ ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, /* ---- Scalar aggregate fast path (n_keys == 0): flat vector scan ---- */ if (n_keys == 0 && nrows > 0) { uint8_t need_flags = DA_NEED_COUNT; + bool has_first_last = false; for (uint8_t a = 0; a < n_aggs; a++) { uint16_t aop = ext->agg_ops[a]; if (aop == OP_SUM || aop == 
OP_PROD || aop == OP_AVG || aop == OP_FIRST || aop == OP_LAST) @@ -2655,6 +3104,7 @@ ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, { need_flags |= DA_NEED_SUM; need_flags |= DA_NEED_SUMSQ; } else if (aop == OP_MIN) need_flags |= DA_NEED_MIN; else if (aop == OP_MAX) need_flags |= DA_NEED_MAX; + if (aop == OP_FIRST || aop == OP_LAST) has_first_last = true; } void* agg_ptrs[vla_aggs]; @@ -2670,7 +3120,15 @@ ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, } ray_pool_t* sc_pool = ray_pool_get(); - uint32_t sc_n = (sc_pool && nrows >= RAY_PARALLEL_THRESHOLD) + /* Pool dispatch is work-stealing: chunks may be processed out of + * row-index order across workers, so the "count[0]==1" sentinel + * scalar_accum_row uses for FIRST (and the always-overwrite for + * LAST) only yields the per-worker first/last, not the global + * one. The merge step then picks worker[0]'s FIRST regardless + * of which range it actually covered. Force serial execution + * when FIRST/LAST is in play; the DA path (which does track + * per-slot row bounds) is still preferred when we have keys. */ + uint32_t sc_n = (sc_pool && nrows >= RAY_PARALLEL_THRESHOLD && !has_first_last) ? ray_pool_total_workers(sc_pool) : 1; ray_t* sc_hdr; diff --git a/src/ops/idiom.c b/src/ops/idiom.c index fc5092d7..c6ca086d 100644 --- a/src/ops/idiom.c +++ b/src/ops/idiom.c @@ -178,10 +178,15 @@ static bool is_ext_root(uint16_t opcode) { opcode == OP_WINDOW || opcode == OP_WINDOW_JOIN || opcode == OP_SELECT; } -static void try_rewrite(ray_graph_t* g, ray_op_t* node) { - if (!node || (node->flags & OP_FLAG_DEAD)) return; - if (is_ext_root(node->opcode)) return; - if (node->opcode >= RAY_IDIOM_OPCODE_CAP) return; +/* Try one rewrite at `node`. Returns the replacement when the rewrite + * fires, else NULL. 
Caller redirects consumers and marks the old node + * dead — having the helper return the replacement also lets the pass + * track when the *root* was rewritten so the caller's root pointer can + * be bumped to the replacement. */ +static ray_op_t* try_rewrite(ray_graph_t* g, ray_op_t* node) { + if (!node || (node->flags & OP_FLAG_DEAD)) return NULL; + if (is_ext_root(node->opcode)) return NULL; + if (node->opcode >= RAY_IDIOM_OPCODE_CAP) return NULL; int idx = first_idiom[node->opcode]; while (idx >= 0) { @@ -193,16 +198,17 @@ static void try_rewrite(ray_graph_t* g, ray_op_t* node) { /* UINT32_MAX sentinels: no nodes to skip during redirect */ redirect_consumers(g, node->id, repl, UINT32_MAX, UINT32_MAX); node->flags |= OP_FLAG_DEAD; - return; /* first-match-wins */ + return repl; /* first-match-wins */ } } } idx = next_idiom[idx]; } + return NULL; } -void ray_idiom_pass(ray_graph_t* g, ray_op_t* root) { - if (!g || !root || g->node_count == 0) return; +ray_op_t* ray_idiom_pass(ray_graph_t* g, ray_op_t* root) { + if (!g || !root || g->node_count == 0) return root; build_index(); /* Iterative post-order walk: children rewritten before parents so @@ -210,7 +216,7 @@ void ray_idiom_pass(ray_graph_t* g, ray_op_t* root) { pattern — push roots onto stack1, drain into stack2 (reverse), pop stack2 to get post-order. */ uint32_t nc = g->node_count; - if (nc > UINT32_MAX / 4) return; /* overflow guard, mirrors fuse.c */ + if (nc > UINT32_MAX / 4) return root; /* overflow guard, mirrors fuse.c */ uint32_t cap = nc * 2; uint32_t stk1_local[256], stk2_local[256]; @@ -219,7 +225,7 @@ void ray_idiom_pass(ray_graph_t* g, ray_op_t* root) { if (!stk1 || !stk2) { if (stk1 && stk1 != stk1_local) ray_sys_free(stk1); if (stk2 && stk2 != stk2_local) ray_sys_free(stk2); - return; + return root; } /* Visited-bit guard against re-entry on shared subgraphs. 
*/ @@ -228,7 +234,7 @@ void ray_idiom_pass(ray_graph_t* g, ray_op_t* root) { if (!visited) { if (stk1 != stk1_local) ray_sys_free(stk1); if (stk2 != stk2_local) ray_sys_free(stk2); - return; + return root; } memset(visited, 0, nc); @@ -248,13 +254,21 @@ void ray_idiom_pass(ray_graph_t* g, ray_op_t* root) { } } - /* Post-order: pop stk2 from top, call try_rewrite. */ + /* Post-order: pop stk2 from top, call try_rewrite. Track whether + * the root itself was rewritten — caller needs the new pointer to + * avoid executing the dead node. */ + uint32_t root_id = root->id; while (sp2 > 0) { uint32_t nid = stk2[--sp2]; - try_rewrite(g, &g->nodes[nid]); + ray_op_t* repl = try_rewrite(g, &g->nodes[nid]); + if (repl && nid == root_id) { + root = repl; + root_id = repl->id; + } } if (visited != visited_local) ray_sys_free(visited); if (stk1 != stk1_local) ray_sys_free(stk1); if (stk2 != stk2_local) ray_sys_free(stk2); + return root; } diff --git a/src/ops/idiom.h b/src/ops/idiom.h index ba29a9d4..7826b16c 100644 --- a/src/ops/idiom.h +++ b/src/ops/idiom.h @@ -40,6 +40,11 @@ typedef struct { extern const ray_idiom_t ray_idioms[]; extern const int ray_idioms_count; -void ray_idiom_pass(ray_graph_t* g, ray_op_t* root); +/* Returns the (possibly updated) root. When the rewrite replaces the + * root node itself (e.g. count(distinct) → count_distinct on a single- + * statement chain), the caller would otherwise hold a pointer to the + * dead OLD node. Always assign the return value back to the caller's + * root pointer. 
*/ +ray_op_t* ray_idiom_pass(ray_graph_t* g, ray_op_t* root); #endif /* RAY_IDIOM_H */ diff --git a/src/ops/internal.h b/src/ops/internal.h index 328d9be6..7270638e 100644 --- a/src/ops/internal.h +++ b/src/ops/internal.h @@ -758,6 +758,14 @@ ray_t* exec_window_join(ray_graph_t* g, ray_op_t* op, /* ── group.c ── */ ray_t* exec_reduction(ray_graph_t* g, ray_op_t* op, ray_t* input); ray_t* exec_count_distinct(ray_graph_t* g, ray_op_t* op, ray_t* input); + +/* Single-pass per-group count(distinct). Returns I64 vec of length + * n_groups, or NULL if `src->type` isn't a supported scalar/SYM type + * (caller falls back to per-group exec_count_distinct). Errors are + * returned as RAY_IS_ERR ray_t*. */ +ray_t* ray_count_distinct_per_group(ray_t* src, const int64_t* row_gid, + int64_t n_rows, int64_t n_groups); + ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, int64_t group_limit); /* ── collection.c ── */ diff --git a/src/ops/ops.h b/src/ops/ops.h index 90c019b7..82da76ff 100644 --- a/src/ops/ops.h +++ b/src/ops/ops.h @@ -679,6 +679,20 @@ void ray_graph_dump(ray_graph_t* g, ray_op_t* root, void* out); ray_t* ray_sort_indices(ray_t** cols, uint8_t* descs, uint8_t* nulls_first, uint8_t n_cols, int64_t nrows); +/* Top-K bounded-heap path: returns a new K-row table of `tbl` ordered by + * `col` in the requested direction. Returns NULL when the input doesn't + * fit the single-key fast path (unsupported type, K ≥ nrows, etc.) so + * the caller can fall back to a full sort. Skips the full O(n log n) + * sort entirely — selection runs in O(n log K + K log K). */ +ray_t* ray_topk_table(ray_t* tbl, ray_t* col, uint8_t desc, uint8_t nf, + int64_t k); + +/* Multi-key variant of ray_topk_table: bounded-heap selection on n_keys + * sort columns with per-key direction / nulls-first. Same fallback + * contract — returns NULL when the inputs don't fit the fast path. 
*/ +ray_t* ray_topk_table_multi(ray_t* tbl, ray_t** key_cols, uint8_t* descs, + uint8_t* nfs, uint8_t n_keys, int64_t k); + /* ===== Executor API ===== */ ray_t* ray_execute(ray_graph_t* g, ray_op_t* root); diff --git a/src/ops/opt.c b/src/ops/opt.c index c41b967e..61601542 100644 --- a/src/ops/opt.c +++ b/src/ops/opt.c @@ -2024,9 +2024,10 @@ ray_op_t* ray_optimize(ray_graph_t* g, ray_op_t* root) { pass_constant_fold(g, root); ray_profile_tick("constant fold"); - /* Pass 3: Idiom rewrite */ + /* Pass 3: Idiom rewrite (may replace the root, e.g. count(distinct) + * → count_distinct on a single-statement chain). */ ray_profile_span_start("idiom"); - ray_idiom_pass(g, root); + root = ray_idiom_pass(g, root); ray_profile_span_end("idiom"); ray_profile_tick("idiom rewrite"); diff --git a/src/ops/query.c b/src/ops/query.c index 34d01bf4..95d0e414 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -242,7 +242,12 @@ static uint16_t resolve_agg_opcode(int64_t sym_id) { /* Apply sort (asc/desc) and take clauses to a materialized result table. * Used by eval-level paths that bypass the DAG (e.g., LIST/STR group keys). * Builds a temporary DAG for sorting (supports per-column direction flags) - * and applies take via ray_head/ray_tail or ray_take_fn. */ + * and applies take via ray_head/ray_tail or ray_take_fn. + * + * Top-K fast path: when there is exactly one sort key (a single column + * name), an atom take with K << nrows, and the result is a flat table + * with no LIST columns, dispatch to ray_topk_table — bounded-heap + * selection in O(n log K) instead of full sort + gather. 
*/ static ray_t* apply_sort_take(ray_t* result, ray_t** dict_elems, int64_t dict_n, int64_t asc_id, int64_t desc_id, int64_t take_id) { if (!result || RAY_IS_ERR(result)) return result; @@ -257,6 +262,108 @@ static ray_t* apply_sort_take(ray_t* result, ray_t** dict_elems, int64_t dict_n, } if (!has_sort && !take_val_expr) return result; + /* ---- Top-K fast path detection ---- + * Conditions: + * - Exactly ONE asc:/desc: clause naming a SINGLE scalar column. + * - take is an atom in [1, K_MAX], where K_MAX is well under nrows. + * - result has no LIST columns (the topk gather handles LIST too, + * but skip to keep the surface area small until we have LIST + * test fixtures). Most benchmark workloads are LIST-free. + * + * Anything else falls through to the full-sort DAG path below. */ + if (has_sort && take_val_expr && result->type == RAY_TABLE) { + /* Collect ALL sort keys (across asc:/desc: clauses) into a flat + * (sym, dir) list. Single-key takes the radix-encoded fast + * path; multi-key takes the comparator-based bounded heap. 
*/ + enum { TOPK_MAX_KEYS = 16 }; + int64_t key_syms[TOPK_MAX_KEYS]; + uint8_t key_descs[TOPK_MAX_KEYS]; + uint8_t n_keys = 0; + int bad_clause = 0; + for (int64_t i = 0; i + 1 < dict_n; i += 2) { + int64_t kid = dict_elems[i]->i64; + uint8_t is_desc = 0; + if (kid == asc_id) is_desc = 0; + else if (kid == desc_id) is_desc = 1; + else continue; + ray_t* val = dict_elems[i + 1]; + if (!val) { bad_clause = 1; break; } + if (val->type == -RAY_SYM) { + if (n_keys >= TOPK_MAX_KEYS) { bad_clause = 1; break; } + key_syms[n_keys] = val->i64; + key_descs[n_keys] = is_desc; + n_keys++; + } else if (ray_is_vec(val) && val->type == RAY_SYM) { + for (int64_t c = 0; c < val->len; c++) { + if (n_keys >= TOPK_MAX_KEYS) { bad_clause = 1; break; } + key_syms[n_keys] = ray_read_sym(ray_data(val), c, + val->type, val->attrs); + key_descs[n_keys] = is_desc; + n_keys++; + } + if (bad_clause) break; + } else { + /* Computed sort key (expression) — full DAG path handles it. */ + bad_clause = 1; + break; + } + } + if (!bad_clause && n_keys > 0) { + /* Probe the take expression — only atom-K with K > 0 qualifies. */ + ray_t* tv = ray_eval(take_val_expr); + if (tv && !RAY_IS_ERR(tv) && ray_is_atom(tv) && + (tv->type == -RAY_I64 || tv->type == -RAY_I32)) { + int64_t k = (tv->type == -RAY_I64) ? tv->i64 : tv->i32; + ray_release(tv); + int64_t nrows = ray_table_nrows(result); + /* Bound K and the over-cardinality ratio: only useful + * when K is well under nrows. Leave the take=full / + * negative-take cases to the existing path. */ + if (k > 0 && k < nrows && k <= 8192) { + /* Reject LIST columns — full path handles those. 
*/ + int has_list = 0; + int64_t ncols = ray_table_ncols(result); + for (int64_t c = 0; c < ncols; c++) { + ray_t* col = ray_table_get_col_idx(result, c); + if (col && col->type == RAY_LIST) { has_list = 1; break; } + } + if (!has_list) { + ray_t* topk = NULL; + if (n_keys == 1) { + ray_t* sort_col = ray_table_get_col(result, key_syms[0]); + if (sort_col) { + topk = ray_topk_table(result, sort_col, + key_descs[0], key_descs[0] + /*nf=desc by default*/, k); + } + } else { + ray_t* key_cols[TOPK_MAX_KEYS]; + uint8_t nfs[TOPK_MAX_KEYS]; + int ok = 1; + for (uint8_t i = 0; i < n_keys; i++) { + key_cols[i] = ray_table_get_col(result, key_syms[i]); + nfs[i] = key_descs[i]; + if (!key_cols[i]) { ok = 0; break; } + } + if (ok) { + topk = ray_topk_table_multi(result, key_cols, + key_descs, nfs, n_keys, k); + } + } + if (topk && !RAY_IS_ERR(topk)) { + ray_release(result); + return topk; + } + if (topk && RAY_IS_ERR(topk)) ray_release(topk); + /* topk == NULL: unsupported config, fall through. */ + } + } + } else if (tv) { + ray_release(tv); + } + } + } + /* Build temporary DAG on the materialized result */ ray_graph_t* g = ray_graph_new(result); if (!g) return result; @@ -1016,6 +1123,35 @@ static int is_agg_expr(ray_t* expr) { return resolve_agg_opcode(elems[0]->i64) != 0; } +static int expr_contains_call_named(ray_t* expr, const char* name, size_t name_len) { + if (!expr) return 0; + if (expr->type != RAY_LIST) return 0; + ray_t** elems = (ray_t**)ray_data(expr); + int64_t n = ray_len(expr); + if (n <= 0) return 0; + ray_t* head = elems[0]; + if (head && head->type == -RAY_SYM) { + ray_t* s = ray_sym_str(head->i64); + if (s && ray_str_len(s) == name_len && + memcmp(ray_str_ptr(s), name, name_len) == 0) + return 1; + } + for (int64_t i = 0; i < n; i++) + if (expr_contains_call_named(elems[i], name, name_len)) + return 1; + return 0; +} + +/* True when a grouped aggregate expression can be lowered to OP_GROUP. 
+ * `(count (distinct col))` is semantically an aggregate, but `distinct` + * is not a row-aligned DAG input inside GROUP. Route it through the + * per-group eval fallback so `distinct` sees each group's slice. */ +static int is_group_dag_agg_expr(ray_t* expr) { + if (!is_agg_expr(expr)) return 0; + ray_t** elems = (ray_t**)ray_data(expr); + return !expr_contains_call_named(elems[1], "distinct", 8); +} + /* True for `(fn arg ...)` where fn resolves to a RAY_UNARY marked * RAY_FN_AGGR — i.e. a builtin aggregator (sum/avg/min/max/count and * the non-whitelisted med/dev/var/stddev/etc). Used to route these @@ -1034,6 +1170,41 @@ static int is_aggr_unary_call(ray_t* expr) { return (fn_obj->attrs & RAY_FN_AGGR) != 0; } +static int is_streaming_aggr_unary_call(ray_t* expr) { + if (!is_aggr_unary_call(expr)) return 0; + ray_t** elems = (ray_t**)ray_data(expr); + return !expr_contains_call_named(elems[1], "distinct", 8); +} + +/* Detect `(count (distinct ))` exactly — the only shape that + * routes through the OP_COUNT_DISTINCT fast path per group. Returns + * the inner expression on success, NULL otherwise. More complex + * forms like `(count (distinct (+ col 1)))` are accepted; the inner + * expr is full-table-evaluable. Anything where the outer call is + * not a plain `(count …)` or the inner is not a plain `(distinct …)` + * is rejected so the eval fallback handles it. 
*/ +static ray_t* match_count_distinct(ray_t* expr) { + if (!expr || expr->type != RAY_LIST) return NULL; + int64_t n = ray_len(expr); + if (n != 2) return NULL; + ray_t** elems = (ray_t**)ray_data(expr); + if (!elems[0] || elems[0]->type != -RAY_SYM) return NULL; + ray_t* nm = ray_sym_str(elems[0]->i64); + if (!nm || ray_str_len(nm) != 5 || + memcmp(ray_str_ptr(nm), "count", 5) != 0) return NULL; + + ray_t* inner = elems[1]; + if (!inner || inner->type != RAY_LIST) return NULL; + int64_t in_n = ray_len(inner); + if (in_n != 2) return NULL; + ray_t** in_elems = (ray_t**)ray_data(inner); + if (!in_elems[0] || in_elems[0]->type != -RAY_SYM) return NULL; + ray_t* dnm = ray_sym_str(in_elems[0]->i64); + if (!dnm || ray_str_len(dnm) != 8 || + memcmp(ray_str_ptr(dnm), "distinct", 8) != 0) return NULL; + return in_elems[1]; +} + /* Walk expr once, gather unique column-ref symbol ids that resolve to * columns of `tbl`. Dotted refs (`Timestamp.ss`) record the head * segment. Caps at `max_out` entries (16 is plenty for s: clauses); @@ -1358,6 +1529,154 @@ static ray_t* aggr_unary_per_group_buf(ray_t* expr, ray_t* tbl, return agg_vec; } +/* Per-group count(distinct) using the existing OP_COUNT_DISTINCT kernel. + * Mirrors aggr_unary_per_group_buf but slices the source column once per + * group and calls exec_count_distinct directly — bypasses the full + * ray_eval per-group path that re-walks the (count (distinct …)) AST + * for each slice. + * + * `inner_expr` is the operand to `distinct` extracted via + * match_count_distinct (typically a column ref, possibly a dotted-name + * or computed sub-expression). Returns an I64 vector of length + * n_groups with the per-group distinct count. 
*/ +static ray_t* count_distinct_per_group_buf(ray_t* inner_expr, ray_t* tbl, + const int64_t* idx_buf, + const int64_t* offsets, + const int64_t* grp_cnt, + int64_t n_groups) { + /* Resolve the source vector — either a direct column ref (zero copy) + * or a full-table eval of the inner sub-expression. */ + ray_t* src = NULL; + if (inner_expr && inner_expr->type == -RAY_SYM && + (inner_expr->attrs & RAY_ATTR_NAME)) { + src = ray_table_get_col(tbl, inner_expr->i64); + if (src) ray_retain(src); + } + if (!src) { + if (ray_env_push_scope() != RAY_OK) return ray_error("oom", NULL); + expr_bind_table_names(inner_expr, tbl); + src = ray_eval(inner_expr); + ray_env_pop_scope(); + if (!src || RAY_IS_ERR(src)) return src ? src : ray_error("domain", NULL); + } + + ray_t* out = ray_vec_new(RAY_I64, n_groups); + if (!out || RAY_IS_ERR(out)) { + ray_release(src); + return out ? out : ray_error("oom", NULL); + } + out->len = n_groups; + int64_t* odata = (int64_t*)ray_data(out); + + for (int64_t gi = 0; gi < n_groups; gi++) { + int64_t cnt = grp_cnt[gi]; + if (cnt == 0) { odata[gi] = 0; continue; } + /* gather_by_idx preserves the source's typed layout (I64 stays + * I64, SYM stays SYM with adaptive width, etc.) — exactly what + * exec_count_distinct expects. ray_at_fn would coerce numeric + * vec + numeric idx vec into a RAY_LIST of atoms, breaking the + * type-dispatch in exec_count_distinct. */ + ray_t* subset = gather_by_idx(src, + (int64_t*)&idx_buf[offsets[gi]], cnt); + if (!subset || RAY_IS_ERR(subset)) { + ray_t* err = subset ? subset : ray_error("oom", NULL); + ray_release(src); ray_release(out); + return err; + } + ray_t* cv = exec_count_distinct(NULL, NULL, subset); + ray_release(subset); + if (!cv || RAY_IS_ERR(cv)) { + ray_t* err = cv ? cv : ray_error("oom", NULL); + ray_release(src); ray_release(out); + return err; + } + /* exec_count_distinct returns an i64 atom. */ + odata[gi] = (cv->type == -RAY_I64) ? cv->i64 + : (cv->type == -RAY_I32) ? 
(int64_t)cv->i32 : 0; + ray_release(cv); + } + + ray_release(src); + return out; +} + +/* Variant for the LIST-`groups` layout used by the eval-fallback + * (ray_group_fn output is a 2-list of {key, idx_list} pairs). Slices + * via ray_at_fn the same way and dispatches to exec_count_distinct. */ +static ray_t* count_distinct_per_group_groups(ray_t* inner_expr, ray_t* tbl, + ray_t* groups, int64_t n_groups) { + ray_t* src = NULL; + if (inner_expr && inner_expr->type == -RAY_SYM && + (inner_expr->attrs & RAY_ATTR_NAME)) { + src = ray_table_get_col(tbl, inner_expr->i64); + if (src) ray_retain(src); + } + if (!src) { + if (ray_env_push_scope() != RAY_OK) return ray_error("oom", NULL); + expr_bind_table_names(inner_expr, tbl); + src = ray_eval(inner_expr); + ray_env_pop_scope(); + if (!src || RAY_IS_ERR(src)) return src ? src : ray_error("domain", NULL); + } + + ray_t* out = ray_vec_new(RAY_I64, n_groups); + if (!out || RAY_IS_ERR(out)) { ray_release(src); return out ? out : ray_error("oom", NULL); } + out->len = n_groups; + int64_t* odata = (int64_t*)ray_data(out); + + ray_t** items = (ray_t**)ray_data(groups); + for (int64_t gi = 0; gi < n_groups; gi++) { + ray_t* idx_list = items[gi * 2 + 1]; + if (!idx_list) { odata[gi] = 0; continue; } + int64_t cnt = ray_len(idx_list); + if (cnt == 0) { odata[gi] = 0; continue; } + + /* idx_list from ray_group_fn is an I64 vector — gather_by_idx + * needs a raw int64_t* + count, so resolve the pointer either + * directly (typed I64 vec) or by walking the LIST cells. */ + ray_t* subset = NULL; + ray_t* tmp_hdr = NULL; + if (idx_list->type == RAY_I64) { + subset = gather_by_idx(src, (int64_t*)ray_data(idx_list), cnt); + } else { + /* Fallback: copy indices into a scratch buffer. Rare path — + * shouldn't trigger for well-formed ray_group_fn output. 
*/ + int64_t* tmp = (int64_t*)scratch_alloc(&tmp_hdr, + (size_t)cnt * sizeof(int64_t)); + if (!tmp) { + ray_release(src); ray_release(out); + return ray_error("oom", NULL); + } + for (int64_t k = 0; k < cnt; k++) { + int alloc = 0; + ray_t* e = collection_elem(idx_list, k, &alloc); + tmp[k] = e ? as_i64(e) : 0; + if (alloc && e) ray_release(e); + } + subset = gather_by_idx(src, tmp, cnt); + scratch_free(tmp_hdr); + } + if (!subset || RAY_IS_ERR(subset)) { + ray_t* err = subset ? subset : ray_error("oom", NULL); + ray_release(src); ray_release(out); + return err; + } + ray_t* cv = exec_count_distinct(NULL, NULL, subset); + ray_release(subset); + if (!cv || RAY_IS_ERR(cv)) { + ray_t* err = cv ? cv : ray_error("oom", NULL); + ray_release(src); ray_release(out); + return err; + } + odata[gi] = (cv->type == -RAY_I64) ? cv->i64 + : (cv->type == -RAY_I32) ? (int64_t)cv->i32 : 0; + ray_release(cv); + } + + ray_release(src); + return out; +} + /* Forward declarations for eval-level groupby fallback */ /* (select {from: t [where: pred] [by: key] [col: expr ...]}) @@ -1854,7 +2173,7 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { int64_t kid = dict_elems[i]->i64; if (kid == from_id || kid == where_id || kid == by_id || kid == take_id || kid == asc_id || kid == desc_id) continue; - if (!is_agg_expr(dict_elems[i + 1])) { any_nonagg = 1; break; } + if (!is_group_dag_agg_expr(dict_elems[i + 1])) { any_nonagg = 1; break; } } } @@ -1885,7 +2204,21 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { * DAG path (exec_group handles wide keys correctly * and stays parallel / segment-streamed on parted * tables). 
*/ + use_eval_group = 1; + } + } + if (!use_eval_group && by_expr->type == RAY_SYM && ray_len(by_expr) > 1) { + int64_t nk = ray_len(by_expr); + int64_t* sym_ids = (int64_t*)ray_data(by_expr); + for (int64_t k = 0; k < nk; k++) { + ray_t* key_col = ray_table_get_col(tbl, sym_ids[k]); + if (!key_col) continue; + int8_t kct = key_col->type; + if (RAY_IS_PARTED(kct)) kct = (int8_t)RAY_PARTED_BASETYPE(kct); + if (kct == RAY_LIST || kct == RAY_STR) { use_eval_group = 1; + break; + } } } /* Non-aggregation expressions (arithmetic, lambda, etc.) are @@ -1924,13 +2257,240 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { } else { ray_graph_free(g); g = NULL; } - /* eval_group path supports only simple scalar / [col] by-forms; - * multi-key and computed keys shouldn't land here. */ - if (by_key_sym < 0) { + if (by_key_sym < 0 && by_expr->type == RAY_SYM && ray_len(by_expr) > 1) { + int64_t nk = ray_len(by_expr); + int64_t* key_syms = (int64_t*)ray_data(by_expr); + int64_t nrows = ray_table_nrows(eval_tbl); + ray_t* key_cols[16]; + if (nk <= 0 || nk > 16) { + if (eval_tbl != tbl) ray_release(eval_tbl); + ray_release(tbl); + return ray_error("domain", "eval-level multi-key groupby requires 1..16 keys"); + } + for (int64_t k = 0; k < nk; k++) { + key_cols[k] = ray_table_get_col(eval_tbl, key_syms[k]); + if (!key_cols[k]) { + if (eval_tbl != tbl) ray_release(eval_tbl); + ray_release(tbl); + return ray_error("domain", "group key column not found"); + } + } + + ray_t* composite_keys = ray_list_new(nrows); + if (!composite_keys || RAY_IS_ERR(composite_keys)) { + if (eval_tbl != tbl) ray_release(eval_tbl); + ray_release(tbl); + return composite_keys ? composite_keys : ray_error("oom", NULL); + } + for (int64_t r = 0; r < nrows; r++) { + ray_t* row_key = ray_list_new(nk); + if (!row_key || RAY_IS_ERR(row_key)) { + ray_release(composite_keys); + if (eval_tbl != tbl) ray_release(eval_tbl); + ray_release(tbl); + return row_key ? 
row_key : ray_error("oom", NULL); + } + for (int64_t k = 0; k < nk; k++) { + int alloc = 0; + ray_t* cell = collection_elem(key_cols[k], r, &alloc); + if (!cell || RAY_IS_ERR(cell)) { + ray_release(row_key); + ray_release(composite_keys); + if (eval_tbl != tbl) ray_release(eval_tbl); + ray_release(tbl); + return cell ? cell : ray_error("domain", NULL); + } + row_key = ray_list_append(row_key, cell); + if (alloc) ray_release(cell); + if (!row_key || RAY_IS_ERR(row_key)) { + ray_release(composite_keys); + if (eval_tbl != tbl) ray_release(eval_tbl); + ray_release(tbl); + return row_key ? row_key : ray_error("oom", NULL); + } + } + composite_keys = ray_list_append(composite_keys, row_key); + ray_release(row_key); + if (!composite_keys || RAY_IS_ERR(composite_keys)) { + if (eval_tbl != tbl) ray_release(eval_tbl); + ray_release(tbl); + return composite_keys ? composite_keys : ray_error("oom", NULL); + } + } + + ray_t* groups_dict = ray_group_fn(composite_keys); + ray_release(composite_keys); + if (!groups_dict || RAY_IS_ERR(groups_dict)) { + if (eval_tbl != tbl) ray_release(eval_tbl); + ray_release(tbl); + return groups_dict ? groups_dict : ray_error("domain", NULL); + } + ray_t* groups = groups_to_pair_list(groups_dict); + ray_release(groups_dict); + if (!groups || RAY_IS_ERR(groups)) { + if (eval_tbl != tbl) ray_release(eval_tbl); + ray_release(tbl); + return groups ? groups : ray_error("domain", NULL); + } + int64_t n_groups = ray_len(groups) / 2; + + int n_agg_out = 0; + int64_t agg_names[16]; + ray_t* agg_results[16] = {0}; + for (int64_t i = 0; i + 1 < dict_n && n_agg_out < 16; i += 2) { + int64_t kid = dict_elems[i]->i64; + if (kid == from_id || kid == where_id || kid == by_id || + kid == take_id || kid == asc_id || kid == desc_id) continue; + ray_t* val_expr_item = dict_elems[i + 1]; + + /* Per-group count(distinct) — bypass full ray_eval per + * group and dispatch directly to exec_count_distinct on + * each group's slice. 
Same kernel the standalone + * `(count (distinct col))` fast path uses. */ + ray_t* cd_inner = match_count_distinct(val_expr_item); + if (cd_inner) { + ray_t* per_group = count_distinct_per_group_groups( + cd_inner, eval_tbl, groups, n_groups); + if (!per_group || RAY_IS_ERR(per_group)) { + for (int ai = 0; ai < n_agg_out; ai++) if (agg_results[ai]) ray_release(agg_results[ai]); + ray_release(groups); if (eval_tbl != tbl) ray_release(eval_tbl); ray_release(tbl); + return per_group ? per_group : ray_error("domain", NULL); + } + agg_names[n_agg_out] = kid; + agg_results[n_agg_out] = per_group; + n_agg_out++; + continue; + } + + if (is_streaming_aggr_unary_call(val_expr_item)) { + ray_t** agg_elems = (ray_t**)ray_data(val_expr_item); + ray_t* agg_fn_name = agg_elems[0]; + ray_t* agg_col_expr = agg_elems[1]; + ray_t* src_col_val = NULL; + if (agg_col_expr->type == -RAY_SYM && (agg_col_expr->attrs & RAY_ATTR_NAME)) { + src_col_val = ray_table_get_col(eval_tbl, agg_col_expr->i64); + if (src_col_val) ray_retain(src_col_val); + } + if (!src_col_val) { + src_col_val = ray_eval(agg_col_expr); + if (!src_col_val || RAY_IS_ERR(src_col_val)) { + for (int ai = 0; ai < n_agg_out; ai++) if (agg_results[ai]) ray_release(agg_results[ai]); + ray_release(groups); if (eval_tbl != tbl) ray_release(eval_tbl); ray_release(tbl); + return src_col_val ? 
src_col_val : ray_error("domain", NULL); + } + } + + ray_t* agg_vec = NULL; + ray_t** grp_items = (ray_t**)ray_data(groups); + for (int64_t gi = 0; gi < n_groups; gi++) { + ray_t* idx_list = grp_items[gi * 2 + 1]; + ray_t* subset = ray_at_fn(src_col_val, idx_list); + if (!subset || RAY_IS_ERR(subset)) continue; + ray_t* agg_val = NULL; + ray_t* fn_obj = ray_env_get(agg_fn_name->i64); + if (fn_obj && fn_obj->type == RAY_UNARY) { + ray_unary_fn uf = (ray_unary_fn)(uintptr_t)fn_obj->i64; + agg_val = uf(subset); + } + ray_release(subset); + if (!agg_val || RAY_IS_ERR(agg_val)) continue; + if (!agg_vec) { + int8_t vt = -(agg_val->type); + agg_vec = ray_vec_new(vt, n_groups); + if (!agg_vec || RAY_IS_ERR(agg_vec)) { ray_release(agg_val); break; } + agg_vec->len = n_groups; + } + store_typed_elem(agg_vec, gi, agg_val); + ray_release(agg_val); + } + ray_release(src_col_val); + agg_names[n_agg_out] = kid; + agg_results[n_agg_out] = agg_vec; + n_agg_out++; + } else { + ray_t* per_group = nonagg_eval_per_group(val_expr_item, eval_tbl, groups, n_groups); + if (!per_group || RAY_IS_ERR(per_group)) { + for (int ai = 0; ai < n_agg_out; ai++) if (agg_results[ai]) ray_release(agg_results[ai]); + ray_release(groups); if (eval_tbl != tbl) ray_release(eval_tbl); ray_release(tbl); + return per_group ? per_group : ray_error("domain", NULL); + } + agg_names[n_agg_out] = kid; + agg_results[n_agg_out] = per_group; + n_agg_out++; + } + } + + ray_t* result = ray_table_new(nk + n_agg_out); + if (!result || RAY_IS_ERR(result)) { + for (int ai = 0; ai < n_agg_out; ai++) if (agg_results[ai]) ray_release(agg_results[ai]); + ray_release(groups); if (eval_tbl != tbl) ray_release(eval_tbl); ray_release(tbl); + return result ? 
result : ray_error("oom", NULL); + } + ray_t** grp_items = (ray_t**)ray_data(groups); + for (int64_t k = 0; k < nk; k++) { + ray_t* src = key_cols[k]; + int8_t kt = src->type; + if (RAY_IS_PARTED(kt)) kt = (int8_t)RAY_PARTED_BASETYPE(kt); + ray_t* key_vec = NULL; + if (kt == RAY_STR) { + key_vec = ray_vec_new(RAY_STR, n_groups); + for (int64_t gi = 0; gi < n_groups && key_vec && !RAY_IS_ERR(key_vec); gi++) { + ray_t* row_key = grp_items[gi * 2]; + ray_t* cell = (row_key && row_key->type == RAY_LIST && k < row_key->len) + ? ((ray_t**)ray_data(row_key))[k] : NULL; + const char* sp = cell ? ray_str_ptr(cell) : ""; + size_t slen = cell ? ray_str_len(cell) : 0; + key_vec = ray_str_vec_append(key_vec, sp ? sp : "", sp ? slen : 0); + } + } else { + key_vec = (kt == RAY_SYM) + ? ray_sym_vec_new(src->attrs & RAY_SYM_W_MASK, n_groups) + : ray_vec_new(kt, n_groups); + if (key_vec && !RAY_IS_ERR(key_vec)) { + key_vec->len = n_groups; + memset(ray_data(key_vec), 0, (size_t)n_groups * ray_sym_elem_size(kt, key_vec->attrs)); + for (int64_t gi = 0; gi < n_groups; gi++) { + ray_t* row_key = grp_items[gi * 2]; + ray_t* cell = (row_key && row_key->type == RAY_LIST && k < row_key->len) + ? ((ray_t**)ray_data(row_key))[k] : NULL; + if (cell) store_typed_elem(key_vec, gi, cell); + } + } + } + if (!key_vec || RAY_IS_ERR(key_vec)) { + for (int ai = 0; ai < n_agg_out; ai++) if (agg_results[ai]) ray_release(agg_results[ai]); + ray_release(result); ray_release(groups); if (eval_tbl != tbl) ray_release(eval_tbl); ray_release(tbl); + return key_vec ? 
key_vec : ray_error("oom", NULL); + } + result = ray_table_add_col(result, key_syms[k], key_vec); + ray_release(key_vec); + if (RAY_IS_ERR(result)) { + for (int ai = 0; ai < n_agg_out; ai++) if (agg_results[ai]) ray_release(agg_results[ai]); + ray_release(groups); if (eval_tbl != tbl) ray_release(eval_tbl); ray_release(tbl); + return result; + } + } + for (int ai = 0; ai < n_agg_out; ai++) { + if (agg_results[ai]) { + result = ray_table_add_col(result, agg_names[ai], agg_results[ai]); + ray_release(agg_results[ai]); + if (RAY_IS_ERR(result)) { ray_release(groups); if (eval_tbl != tbl) ray_release(eval_tbl); ray_release(tbl); return result; } + } + } + + ray_release(groups); if (eval_tbl != tbl) ray_release(eval_tbl); ray_release(tbl); - return ray_error("nyi", "eval-level groupby requires scalar key"); + return apply_sort_take(result, dict_elems, dict_n, asc_id, desc_id, take_id); } + + /* eval_group path supports only simple scalar / [col] by-forms; + * computed keys shouldn't land here. */ + if (by_key_sym < 0) { + if (eval_tbl != tbl) ray_release(eval_tbl); + ray_release(tbl); + return ray_error("nyi", "eval-level groupby requires scalar key"); + } ray_t* key_col = ray_table_get_col(eval_tbl, by_key_sym); /* Fast path: (select {from: t by: k}) with no aggs and @@ -2160,7 +2720,26 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { if (kid == from_id || kid == where_id || kid == by_id || kid == take_id || kid == asc_id || kid == desc_id) continue; ray_t* val_expr_item = dict_elems[i + 1]; - if (is_aggr_unary_call(val_expr_item)) { + /* Per-group count(distinct) — bypass full ray_eval per + * group and dispatch directly to exec_count_distinct. 
*/ + { + ray_t* cd_inner = match_count_distinct(val_expr_item); + if (cd_inner) { + ray_t* per_group = count_distinct_per_group_groups( + cd_inner, eval_tbl, groups, n_groups); + if (!per_group || RAY_IS_ERR(per_group)) { + for (int ai = 0; ai < n_agg_out; ai++) { if (agg_results[ai]) ray_release(agg_results[ai]); } + ray_release(groups); if (eval_tbl != tbl) ray_release(eval_tbl); ray_release(tbl); + return per_group ? per_group : ray_error("domain", NULL); + } + agg_names[n_agg_out] = kid; + agg_results[n_agg_out] = per_group; + n_agg_out++; + continue; + } + } + + if (is_streaming_aggr_unary_call(val_expr_item)) { /* Streaming-style per-group AGG branch. Accepts both * the resolve_agg_opcode whitelist (sum/avg/min/max/...) * and the broader RAY_FN_AGGR + RAY_UNARY set @@ -2215,6 +2794,20 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { agg_results[n_agg_out] = agg_vec; n_agg_out++; } else { + if (is_agg_expr(val_expr_item)) { + ray_t* per_group = nonagg_eval_per_group( + val_expr_item, eval_tbl, groups, n_groups); + if (RAY_IS_ERR(per_group)) { + for (int ai = 0; ai < n_agg_out; ai++) { if (agg_results[ai]) ray_release(agg_results[ai]); } + ray_release(groups); if (eval_tbl != tbl) ray_release(eval_tbl); ray_release(tbl); + return per_group; + } + agg_names[n_agg_out] = kid; + agg_results[n_agg_out] = per_group; + n_agg_out++; + continue; + } + /* Non-aggregation expression: evaluate on full table, * then gather per-group subsets into a LIST column * (non-agg produces list-of-vectors). 
*/ @@ -2450,7 +3043,7 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { int64_t kid = dict_elems[i]->i64; if (kid == from_id || kid == where_id || kid == by_id || kid == take_id || kid == asc_id || kid == desc_id) continue; - if (!is_agg_expr(dict_elems[i + 1])) { has_nonagg = 1; break; } + if (!is_group_dag_agg_expr(dict_elems[i + 1])) { has_nonagg = 1; break; } } /* The post-DAG scatter needs a flat single-segment table: it @@ -2565,14 +3158,14 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { if (kid == from_id || kid == where_id || kid == by_id || kid == take_id || kid == asc_id || kid == desc_id) continue; ray_t* val_expr = dict_elems[i + 1]; - if (is_agg_expr(val_expr) && n_aggs < 16) { + if (is_group_dag_agg_expr(val_expr) && n_aggs < 16) { ray_t** agg_elems = (ray_t**)ray_data(val_expr); agg_ops[n_aggs] = resolve_agg_opcode(agg_elems[0]->i64); /* Compile the aggregation input (the column reference) */ agg_ins[n_aggs] = compile_expr_dag(g, agg_elems[1]); if (!agg_ins[n_aggs]) { ray_graph_free(g); ray_release(tbl); return ray_error("domain", NULL); } n_aggs++; - } else if (!is_agg_expr(val_expr) && n_nonaggs < 16) { + } else if (!is_group_dag_agg_expr(val_expr) && n_nonaggs < 16) { nonagg_names[n_nonaggs] = kid; nonagg_exprs[n_nonaggs] = val_expr; n_nonaggs++; @@ -3170,27 +3763,97 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { return apply_sort_take(result, dict_elems, dict_n, asc_id, desc_id, take_id); } } else if (n_out > 0) { - /* Projection only (no group by) — select specific columns */ - ray_op_t* col_ops[16]; - uint8_t nc = 0; + /* No `by:` but explicit output expressions. + * + * Two sub-cases: + * (a) All outputs are aggregates → scalar reduction. Route + * through ray_group(n_keys=0) so the result is ONE row, + * not the input row count broadcast. 
The naive ray_select + * path lowers `(sum c)` to OP_SUM as a column expression; + * OP_SELECT then broadcasts the scalar atom to nrows + * (exec.c: vec->type < 0 → broadcast_scalar), producing + * N copies of the same value. + * (b) At least one non-agg output → keep the existing + * projection (broadcast-as-column), matching q's + * per-row evaluation semantics. + * + * Mixed agg+non-agg without `by:` continues to flow through (b); + * q's semantics there imply LIST/scalar mixing that is out of + * scope for this fix. */ + int has_agg = 0; + int has_nonagg_out = 0; for (int64_t i = 0; i + 1 < dict_n; i += 2) { int64_t kid = dict_elems[i]->i64; - if (kid == from_id || kid == where_id || kid == by_id || kid == take_id || kid == asc_id || kid == desc_id || kid == nearest_id) continue; - if (nc < 16) { - col_ops[nc] = compile_expr_dag(g, dict_elems[i + 1]); - if (!col_ops[nc]) { - /* Nearest-path resources must be freed here too — the - * rerank handle/query buffers are held across the whole - * ray_select_fn body, not just inside the nearest block. */ - if (nearest_handle_owned) ray_release(nearest_handle_owned); - if (nearest_query_owned) ray_sys_free(nearest_query_owned); + if (kid == from_id || kid == where_id || kid == by_id || + kid == take_id || kid == asc_id || kid == desc_id || kid == nearest_id) continue; + if (is_agg_expr(dict_elems[i + 1])) has_agg = 1; + else has_nonagg_out = 1; + } + + if (has_agg && !has_nonagg_out && !nearest_expr) { + /* Scalar reduction. Pre-execute the WHERE filter (already + * wired as ray_filter at the top) so OP_FILTER on the table + * input populates g->selection, which exec_group then + * honours in its n_keys==0 fast path. */ + if (where_expr) { + root = ray_optimize(g, root); + ray_t* fres = exec_node(g, root); + if (!fres || RAY_IS_ERR(fres)) { + if (g->selection) { + ray_release(g->selection); + g->selection = NULL; + } + ray_graph_free(g); ray_release(tbl); + return fres ? 
fres : ray_error("domain", NULL); + } + ray_release(fres); + } + + uint16_t s_agg_ops[16]; + ray_op_t* s_agg_ins[16]; + uint8_t s_n_aggs = 0; + for (int64_t i = 0; i + 1 < dict_n && s_n_aggs < 16; i += 2) { + int64_t kid = dict_elems[i]->i64; + if (kid == from_id || kid == where_id || kid == by_id || + kid == take_id || kid == asc_id || kid == desc_id || kid == nearest_id) continue; + ray_t* val_expr = dict_elems[i + 1]; + ray_t** agg_elems = (ray_t**)ray_data(val_expr); + s_agg_ops[s_n_aggs] = resolve_agg_opcode(agg_elems[0]->i64); + s_agg_ins[s_n_aggs] = compile_expr_dag(g, agg_elems[1]); + if (!s_agg_ins[s_n_aggs]) { + if (g->selection) { + ray_release(g->selection); + g->selection = NULL; + } ray_graph_free(g); ray_release(tbl); return ray_error("domain", NULL); } - nc++; + s_n_aggs++; } + root = ray_group(g, NULL, 0, s_agg_ops, s_agg_ins, s_n_aggs); + } else { + /* Projection only (no group by) — select specific columns */ + ray_op_t* col_ops[16]; + uint8_t nc = 0; + for (int64_t i = 0; i + 1 < dict_n; i += 2) { + int64_t kid = dict_elems[i]->i64; + if (kid == from_id || kid == where_id || kid == by_id || kid == take_id || kid == asc_id || kid == desc_id || kid == nearest_id) continue; + if (nc < 16) { + col_ops[nc] = compile_expr_dag(g, dict_elems[i + 1]); + if (!col_ops[nc]) { + /* Nearest-path resources must be freed here too — the + * rerank handle/query buffers are held across the whole + * ray_select_fn body, not just inside the nearest block. */ + if (nearest_handle_owned) ray_release(nearest_handle_owned); + if (nearest_query_owned) ray_sys_free(nearest_query_owned); + ray_graph_free(g); ray_release(tbl); + return ray_error("domain", NULL); + } + nc++; + } + } + root = ray_select(g, root, col_ops, nc); } - root = ray_select(g, root, col_ops, nc); } /* Sort: collect asc/desc columns in dict iteration order. 
@@ -3397,7 +4060,7 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { if (kid == from_id || kid == where_id || kid == by_id || kid == take_id || kid == asc_id || kid == desc_id) continue; if (n_all_user < 16) all_user_names[n_all_user++] = kid; - if (by_expr && !is_agg_expr(dict_elems[i + 1])) continue; + if (by_expr && !is_group_dag_agg_expr(dict_elems[i + 1])) continue; if (n_agg_user < 16) agg_user_names[n_agg_user++] = kid; } if (by_expr) { @@ -3542,14 +4205,101 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { KEY_READ(gk[gi], grp_key, gkt, gi); /* Build row→group_id map. Rows whose key isn't in the - * surviving group set get row_gid = -1 and are skipped. */ - for (int64_t r = 0; r < nrows; r++) { - int64_t rv; - KEY_READ(rv, orig_key, okt, r); - row_gid[r] = -1; + * surviving group set get row_gid = -1 and are skipped. + * + * For high group cardinality (n_groups large), the naive + * O(nrows * n_groups) double loop dominated runtime — + * 5M * 730K ≈ 4T comparisons. Build a value→gid hash + * instead so each row is one O(1) probe. */ + { + /* Capacity: 2 * n_groups rounded up to power of 2. + * Slot stores gid+1 (0 = empty) and the int64 key. 
*/ + uint64_t cap = (uint64_t)n_groups * 2; + if (cap < 32) cap = 32; + uint64_t c = 1; + while (c && c < cap) c <<= 1; + if (!c) { + ray_free(gk_hdr); ray_free(rg_hdr); ray_free(cnt_hdr); + ray_free(off_hdr); ray_free(pos_hdr); + ray_release(result); ray_release(tbl); + return ray_error("oom", NULL); + } + cap = c; + uint64_t mask = cap - 1; + ray_t* gk_keys_hdr = NULL; + ray_t* gk_idx_hdr = NULL; + int64_t* hk_keys = (int64_t*)scratch_alloc(&gk_keys_hdr, + (size_t)cap * sizeof(int64_t)); + int32_t* hk_gid_p1 = (int32_t*)scratch_calloc(&gk_idx_hdr, + (size_t)cap * sizeof(int32_t)); + if (!hk_keys || !hk_gid_p1) { + if (gk_keys_hdr) scratch_free(gk_keys_hdr); + if (gk_idx_hdr) scratch_free(gk_idx_hdr); + ray_free(gk_hdr); ray_free(rg_hdr); ray_free(cnt_hdr); + ray_free(off_hdr); ray_free(pos_hdr); + ray_release(result); ray_release(tbl); + return ray_error("oom", NULL); + } + + /* If n_groups exceeds the int32 sentinel range we'd + * lose distinct gids — fall back to the int64 store + * (rare: n_groups > ~2.1 B). Otherwise i32+1 fits. */ + int use_i64_gid = (n_groups >= ((int64_t)1 << 31) - 1); + ray_t* gk64_hdr = NULL; + int64_t* hk_gid64 = NULL; + if (use_i64_gid) { + hk_gid64 = (int64_t*)scratch_calloc(&gk64_hdr, + (size_t)cap * sizeof(int64_t)); + if (!hk_gid64) { + scratch_free(gk_keys_hdr); scratch_free(gk_idx_hdr); + ray_free(gk_hdr); ray_free(rg_hdr); ray_free(cnt_hdr); + ray_free(off_hdr); ray_free(pos_hdr); + ray_release(result); ray_release(tbl); + return ray_error("oom", NULL); + } + } + + /* Insert (gk[gi] -> gi) into the hash. */ for (int64_t gi = 0; gi < n_groups; gi++) { - if (rv == gk[gi]) { row_gid[r] = gi; break; } + int64_t k = gk[gi]; + uint64_t h = (uint64_t)k * 0x9E3779B97F4A7C15ULL; + h ^= h >> 33; + uint64_t s = h & mask; + for (;;) { + int64_t cur_p1 = use_i64_gid ? 
hk_gid64[s] + : (int64_t)hk_gid_p1[s]; + if (cur_p1 == 0) { + if (use_i64_gid) hk_gid64[s] = gi + 1; + else hk_gid_p1[s] = (int32_t)(gi + 1); + hk_keys[s] = k; + break; + } + if (hk_keys[s] == k) break; /* dup gk — keep first */ + s = (s + 1) & mask; + } + } + + /* Probe each row to assign its gid. */ + for (int64_t r = 0; r < nrows; r++) { + int64_t rv; + KEY_READ(rv, orig_key, okt, r); + uint64_t h = (uint64_t)rv * 0x9E3779B97F4A7C15ULL; + h ^= h >> 33; + uint64_t s = h & mask; + int64_t found = -1; + for (;;) { + int64_t cur_p1 = use_i64_gid ? hk_gid64[s] + : (int64_t)hk_gid_p1[s]; + if (cur_p1 == 0) break; + if (hk_keys[s] == rv) { found = cur_p1 - 1; break; } + s = (s + 1) & mask; + } + row_gid[r] = found; } + + scratch_free(gk_keys_hdr); + scratch_free(gk_idx_hdr); + if (gk64_hdr) scratch_free(gk64_hdr); } #undef KEY_READ @@ -3580,6 +4330,45 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { ray_t* scatter_err = NULL; for (uint8_t ni = 0; ni < n_nonaggs && !scatter_err; ni++) { + /* Per-group count(distinct) — dispatch directly to + * exec_count_distinct on each group's slice using + * the same idx_buf+offsets+grp_cnt layout the + * streaming-AGG branch uses. + * + * High-cardinality grouping: try the single-pass + * global-hash kernel first. Falls back to the + * per-group slice path on type miss / error. */ + ray_t* cd_inner = match_count_distinct(nonagg_exprs[ni]); + if (cd_inner) { + ray_t* col = NULL; + /* Resolve the inner column for the global-hash + * fast path. Direct column refs hit the path; + * computed expressions use the per-group fallback. */ + ray_t* src_for_global = NULL; + int src_owned = 0; + if (cd_inner->type == -RAY_SYM && + (cd_inner->attrs & RAY_ATTR_NAME)) { + src_for_global = ray_table_get_col(tbl, cd_inner->i64); + } + if (src_for_global) { + col = ray_count_distinct_per_group( + src_for_global, row_gid, nrows, n_groups); + /* col == NULL → unsupported type, fall through. 
*/ + } + if (src_owned && src_for_global) ray_release(src_for_global); + if (!col) { + col = count_distinct_per_group_buf( + cd_inner, tbl, idx_buf, offsets, grp_cnt, n_groups); + } + if (RAY_IS_ERR(col)) { scatter_err = col; break; } + result = ray_table_add_col(result, nonagg_names[ni], col); + ray_release(col); + if (RAY_IS_ERR(result)) { + scatter_err = result; result = NULL; break; + } + continue; + } + /* Streaming-style fast path for `(aggr_fn col_or_expr)` * where aggr_fn is RAY_FN_AGGR + RAY_UNARY (sum/avg/..., * med/dev/var/stddev/...). Bypasses the full-table eval @@ -3587,7 +4376,7 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { * group and calling the unary fn directly into a typed * vec. Equivalent perf-class to the streaming AGG path * the eval-fallback uses for the same shapes. */ - if (is_aggr_unary_call(nonagg_exprs[ni])) { + if (is_streaming_aggr_unary_call(nonagg_exprs[ni])) { ray_t* col = aggr_unary_per_group_buf( nonagg_exprs[ni], tbl, idx_buf, offsets, grp_cnt, n_groups); @@ -3600,6 +4389,20 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { continue; } + if (is_agg_expr(nonagg_exprs[ni])) { + ray_t* per_group = nonagg_eval_per_group_buf( + nonagg_exprs[ni], tbl, idx_buf, offsets, grp_cnt, n_groups); + if (RAY_IS_ERR(per_group)) { + scatter_err = per_group; break; + } + result = ray_table_add_col(result, nonagg_names[ni], per_group); + ray_release(per_group); + if (RAY_IS_ERR(result)) { + scatter_err = result; result = NULL; break; + } + continue; + } + if (ray_env_push_scope() != RAY_OK) { scatter_err = ray_error("oom", NULL); break; } @@ -3733,6 +4536,93 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { /* (xbar col bucket) — time/value bucketing: floor(col/bucket)*bucket */ ray_t* ray_xbar_fn(ray_t* col, ray_t* bucket) { + /* Vectorised fast path for `(xbar VEC scalar_int)` on integer or + * temporal columns. 
The generic atomic_map_binary path was
+ allocating one ray_t* atom per row and calling ray_xbar_fn
+ recursively — at 5M rows this dominates (≥100 ms). A direct
+ tight loop computes floor-div + multiply per element with no
+ allocations. When the bucket is a power of two we lower the
+ divide further to mask + arithmetic.
+
+ * Short-circuited only when both bucket and col are well-typed;
+ * everything else falls through to the recursive
+ * atomic_map_binary path. */
+ if (col && ray_is_vec(col) && bucket && ray_is_atom(bucket) &&
+ (bucket->type == -RAY_I64 || bucket->type == -RAY_I32 ||
+ bucket->type == -RAY_I16) &&
+ (col->type == RAY_I64 || col->type == RAY_I32 ||
+ col->type == RAY_I16 || col->type == RAY_TIMESTAMP ||
+ col->type == RAY_DATE || col->type == RAY_TIME) &&
+ !RAY_ATOM_IS_NULL(bucket)) {
+ int64_t b = bucket->i64;
+ if (b == 0) return ray_error("domain", NULL);
+ int64_t n = col->len;
+ ray_t* out = ray_vec_new(col->type, n);
+ if (!out || RAY_IS_ERR(out)) return out ? out : ray_error("oom", NULL);
+ out->len = n;
+
+ /* Compute (q*b) where q = floor(a/b). C division truncates
+ * toward zero; for negative dividend we adjust. */
+ int8_t out_type = col->type;
+ if (out_type == RAY_I64 || out_type == RAY_TIMESTAMP) {
+ const int64_t* in = (const int64_t*)ray_data(col);
+ int64_t* o = (int64_t*)ray_data(out);
+ if (b > 0 && (b & (b - 1)) == 0) {
+ /* Bucket is a power of two: q*b collapses to a & ~(b-1).
+ * In two's complement this floors toward -inf for negative
+ * `a` as well, so the same masked loop handles mixed-sign
+ * data — no fallback to the general divide path is needed. */
+ int64_t mask = ~(b - 1);
+ for (int64_t i = 0; i < n; i++) {
+ int64_t a = in[i];
+ /* Floor toward -inf for negative a too: a & mask. 
*/ + o[i] = a & mask; + } + } else { + for (int64_t i = 0; i < n; i++) { + int64_t a = in[i]; + int64_t q = a / b; + if ((a ^ b) < 0 && q * b != a) q--; + o[i] = q * b; + } + } + } else if (out_type == RAY_I32 || out_type == RAY_DATE || out_type == RAY_TIME) { + const int32_t* in = (const int32_t*)ray_data(col); + int32_t* o = (int32_t*)ray_data(out); + int32_t b32 = (int32_t)b; + if (b32 > 0 && ((uint32_t)b32 & ((uint32_t)b32 - 1)) == 0) { + int32_t mask = (int32_t)~((uint32_t)b32 - 1); + for (int64_t i = 0; i < n; i++) o[i] = in[i] & mask; + } else { + for (int64_t i = 0; i < n; i++) { + int32_t a = in[i]; + int32_t q = a / b32; + if ((a ^ b32) < 0 && q * b32 != a) q--; + o[i] = q * b32; + } + } + } else { /* RAY_I16 */ + const int16_t* in = (const int16_t*)ray_data(col); + int16_t* o = (int16_t*)ray_data(out); + int16_t b16 = (int16_t)b; + for (int64_t i = 0; i < n; i++) { + int16_t a = in[i]; + int16_t q = a / b16; + if ((a ^ b16) < 0 && q * b16 != a) q--; + o[i] = q * b16; + } + } + + /* Propagate null bitmap if present. 
*/ + if (col->attrs & RAY_ATTR_HAS_NULLS) { + for (int64_t i = 0; i < n; i++) + if (ray_vec_is_null(col, i)) + ray_vec_set_null(out, i, true); + } + return out; + } + /* Recursive unwrap for nested collections (list of vectors) */ if (is_collection(col) || is_collection(bucket)) return atomic_map_binary(ray_xbar_fn, col, bucket); diff --git a/src/ops/sort.c b/src/ops/sort.c index f9a701a1..a5875e27 100644 --- a/src/ops/sort.c +++ b/src/ops/sort.c @@ -3062,6 +3062,295 @@ str_msd_done:; return result; } +static void topk_cmp_sift_down(const sort_cmp_ctx_t* ctx, int64_t* heap, + int64_t n, int64_t root) { + for (;;) { + int64_t worst = root; + int64_t l = 2 * root + 1; + int64_t r = 2 * root + 2; + if (l < n && sort_cmp(ctx, heap[l], heap[worst]) > 0) worst = l; + if (r < n && sort_cmp(ctx, heap[r], heap[worst]) > 0) worst = r; + if (worst == root) break; + int64_t tmp = heap[root]; + heap[root] = heap[worst]; + heap[worst] = tmp; + root = worst; + } +} + +/* Comparator-based top-K: works for any sort key types and any number of + * keys (1..n). Used as the fallback when radix-encoded fast-path is not + * applicable (e.g. SYM, STR, multi-key). O(n log K + K log K). */ +static ray_t* topk_indices_cmp(ray_t** cols, uint8_t* descs, uint8_t* nfs, + uint8_t n_cols, int64_t nrows, int64_t k) { + if (!cols || n_cols == 0 || k <= 0 || nrows <= 0 || k >= nrows) return NULL; + for (uint8_t c = 0; c < n_cols; c++) if (!cols[c]) return NULL; + + ray_t* idx = ray_vec_new(RAY_I64, k); + if (!idx || RAY_IS_ERR(idx)) return idx ? 
idx : ray_error("oom", NULL); + idx->len = k; + int64_t* heap = (int64_t*)ray_data(idx); + for (int64_t i = 0; i < k; i++) heap[i] = i; + + sort_cmp_ctx_t ctx = { + .vecs = cols, + .desc = descs, + .nulls_first = nfs, + .n_sort = n_cols, + }; + + for (int64_t i = k / 2 - 1; i >= 0; i--) + topk_cmp_sift_down(&ctx, heap, k, i); + + for (int64_t i = k; i < nrows; i++) { + if (sort_cmp(&ctx, i, heap[0]) >= 0) continue; + heap[0] = i; + topk_cmp_sift_down(&ctx, heap, k, 0); + } + + for (int64_t i = 1; i < k; i++) { + int64_t v = heap[i]; + int64_t j = i - 1; + while (j >= 0 && sort_cmp(&ctx, v, heap[j]) < 0) { + heap[j + 1] = heap[j]; + j--; + } + heap[j + 1] = v; + } + + return idx; +} + +static ray_t* topk_indices_cmp_single(ray_t* col, uint8_t desc, uint8_t nf, + int64_t nrows, int64_t k) { + ray_t* cols[1] = { col }; + uint8_t descs[1] = { desc }; + uint8_t nfs[1] = { nf }; + return topk_indices_cmp(cols, descs, nfs, 1, nrows, k); +} + +/* -------------------------------------------------------------------------- + * Top-K bounded-heap selection on a single sort key. + * + * Replaces a full O(n log n) sort + take-K with O(n log K + K log K) when + * K << n. At plan time, the apply_sort_take / projection paths detect + * "single sort key + small atom take" and call this in lieu of OP_SORT + + * OP_HEAD. Multi-key, take-range, or take-K-near-n cases keep the + * existing fused sort+limit path (which is already O(n log n) bounded + * with K-row gather). + * + * Implementation: encode each row's key to a uint64 (same encoding + * radix_encode_fn uses, so smaller key = earlier in ASC order, with DESC + * already pre-flipped). Maintain a max-heap of K (key, original_idx) + * pairs; for each row r > K, if r's encoded key is smaller than the + * heap-top key, replace the top and sift down. After the scan, sort + * the K (key, idx) pairs by key ascending — the result is the top-K + * indices in the user's requested order. 
+ *
+ * Supported types: I64, I32, I16, U8, BOOL, F64, DATE, TIME,
+ * TIMESTAMP, plus SYM via a comparator heap. STR/GUID fall through
+ * to the caller (return NULL → caller uses full sort). Returns NULL
+ * on any unsupported configuration so the caller's fallback path
+ * handles it.
+ * -------------------------------------------------------------------------- */
+static ray_t* topk_indices_single(ray_t* col, uint8_t desc, uint8_t nf,
+ int64_t nrows, int64_t k) {
+ if (!col || k <= 0 || nrows <= 0) return NULL;
+ if (k >= nrows) return NULL; /* full sort is at least as good */
+
+ int8_t type = col->type;
+ /* Accepted key types. All of these except SYM radix-encode to an
+ * order-preserving uint64 (encoded heap path below); SYM is routed
+ * to the comparator heap instead. */
+ bool ok = (type == RAY_I64 || type == RAY_TIMESTAMP || type == RAY_F64 ||
+ type == RAY_I32 || type == RAY_DATE || type == RAY_TIME ||
+ type == RAY_SYM || type == RAY_I16 ||
+ type == RAY_BOOL || type == RAY_U8);
+ if (!ok) return NULL;
+
+ if (type == RAY_SYM)
+ return topk_indices_cmp_single(col, desc, nf, nrows, k);
+
+ /* Encode all rows to a single uint64 key array. */
+ ray_t* keys_hdr = NULL;
+ uint64_t* keys = (uint64_t*)scratch_alloc(&keys_hdr,
+ (size_t)nrows * sizeof(uint64_t));
+ if (!keys) return NULL;
+
+ radix_encode_ctx_t enc = {
+ .keys = keys,
+ .indices = NULL,
+ .data = ray_data(col),
+ .col = col,
+ .type = type,
+ .col_attrs = col->attrs,
+ .desc = desc != 0,
+ .nulls_first = nf != 0,
+ .enum_rank = NULL,
+ .n_keys = 1,
+ };
+ /* Single-threaded encode is plenty for the heap pass that follows;
+ * radix_encode_fn handles the type/desc/nulls dispatch correctly. */
+ radix_encode_fn(&enc, 0, 0, nrows);
+
+ /* Max-heap of K (key, idx) pairs. Stored in two parallel arrays
+ * for cache locality on the comparison path. 
*/ + ray_t* hk_hdr = NULL; + ray_t* hi_hdr = NULL; + uint64_t* hk = (uint64_t*)scratch_alloc(&hk_hdr, (size_t)k * sizeof(uint64_t)); + int64_t* hi = (int64_t*)scratch_alloc(&hi_hdr, (size_t)k * sizeof(int64_t)); + if (!hk || !hi) { + if (hk_hdr) scratch_free(hk_hdr); + if (hi_hdr) scratch_free(hi_hdr); + scratch_free(keys_hdr); + return NULL; + } + + /* Seed with the first K rows. */ + for (int64_t i = 0; i < k; i++) { hk[i] = keys[i]; hi[i] = i; } + + /* Heapify (build max-heap on hk[]). */ + for (int64_t i = k / 2 - 1; i >= 0; i--) { + int64_t idx = i; + for (;;) { + int64_t largest = idx; + int64_t l = 2 * idx + 1, r = 2 * idx + 2; + if (l < k && hk[l] > hk[largest]) largest = l; + if (r < k && hk[r] > hk[largest]) largest = r; + if (largest == idx) break; + uint64_t tk = hk[idx]; hk[idx] = hk[largest]; hk[largest] = tk; + int64_t ti = hi[idx]; hi[idx] = hi[largest]; hi[largest] = ti; + idx = largest; + } + } + + /* Scan remaining rows, push when the new key is strictly smaller + * than heap-top. Sift the new root down to restore the max-heap. */ + for (int64_t i = k; i < nrows; i++) { + if (keys[i] >= hk[0]) continue; + hk[0] = keys[i]; + hi[0] = i; + int64_t idx = 0; + for (;;) { + int64_t largest = idx; + int64_t l = 2 * idx + 1, r = 2 * idx + 2; + if (l < k && hk[l] > hk[largest]) largest = l; + if (r < k && hk[r] > hk[largest]) largest = r; + if (largest == idx) break; + uint64_t tk = hk[idx]; hk[idx] = hk[largest]; hk[largest] = tk; + int64_t ti = hi[idx]; hi[idx] = hi[largest]; hi[largest] = ti; + idx = largest; + } + } + + /* The heap contains the K best (smallest key) rows but unsorted. + * Sort by key ascending so the gather order matches a full sort. */ + key_heapsort(hk, hi, k); + + /* Build the result I64 vec of indices. */ + ray_t* result = ray_vec_new(RAY_I64, k); + if (!result || RAY_IS_ERR(result)) { + scratch_free(hk_hdr); scratch_free(hi_hdr); + scratch_free(keys_hdr); + return result ? 
result : ray_error("oom", NULL); + } + result->len = k; + memcpy(ray_data(result), hi, (size_t)k * sizeof(int64_t)); + + scratch_free(hk_hdr); scratch_free(hi_hdr); + scratch_free(keys_hdr); + return result; +} + +/* Gather K rows of `tbl` at the given indices and return a new table. + * Used by both single-key and multi-key top-K paths. Releases `idx`. */ +static ray_t* topk_gather_rows(ray_t* tbl, ray_t* idx, int64_t k) { + int64_t* idx_data = (int64_t*)ray_data(idx); + int64_t ncols = ray_table_ncols(tbl); + + ray_t* result = ray_table_new(ncols); + if (!result || RAY_IS_ERR(result)) { ray_release(idx); return result; } + for (int64_t c = 0; c < ncols; c++) { + ray_t* src = ray_table_get_col_idx(tbl, c); + int64_t name = ray_table_col_name(tbl, c); + if (!src) continue; + ray_t* dst; + if (src->type == RAY_LIST) { + dst = ray_list_new(k); + if (!dst || RAY_IS_ERR(dst)) { + ray_release(idx); ray_release(result); + return dst ? dst : ray_error("oom", NULL); + } + ray_t** sp = (ray_t**)ray_data(src); + ray_t** dp = (ray_t**)ray_data(dst); + for (int64_t i = 0; i < k; i++) { + dp[i] = sp[idx_data[i]]; + if (dp[i]) ray_retain(dp[i]); + } + dst->len = k; + } else { + dst = gather_by_idx(src, idx_data, k); + if (!dst || RAY_IS_ERR(dst)) { + ray_release(idx); ray_release(result); + return dst ? dst : ray_error("oom", NULL); + } + } + result = ray_table_add_col(result, name, dst); + ray_release(dst); + if (RAY_IS_ERR(result)) { ray_release(idx); return result; } + } + ray_release(idx); + return result; +} + +/* Public top-K gather: returns a new table of `k` rows of `tbl`, sorted by + * `col` in the requested direction. When the inputs don't match the + * single-key fast-path (multi-key, unsupported type, etc.), returns NULL + * so the caller can fall back to the full-sort path. 
*/ +ray_t* ray_topk_table(ray_t* tbl, ray_t* col, uint8_t desc, uint8_t nf, + int64_t k) { + if (!tbl || tbl->type != RAY_TABLE || !col) return NULL; + int64_t nrows = ray_table_nrows(tbl); + if (k <= 0 || nrows <= 0) return NULL; + if (k >= nrows) return NULL; + int64_t ncols = ray_table_ncols(tbl); + for (int64_t c = 0; c < ncols; c++) { + ray_t* src = ray_table_get_col_idx(tbl, c); + if (src && src->type == RAY_LIST) return NULL; + } + + ray_t* idx = topk_indices_single(col, desc, nf, nrows, k); + if (!idx) return NULL; + return topk_gather_rows(tbl, idx, k); +} + +/* Multi-key top-K: comparator-based bounded heap across `n_keys` columns. + * Falls back to a comparator heap (no radix encoding) since multi-key + * radix encoding requires uniform-width packed keys. Returns NULL when + * the inputs aren't supported (n_keys==0, K>=nrows, LIST columns) so the + * caller can fall back to a full sort. Cost is O(n_rows * n_keys * log K + * + K log K) in comparisons — wins decisively when K << n_rows even with + * the per-compare overhead. All key columns must come from the same + * table; row indices are interpreted into each column at the same + * position. 
*/ +ray_t* ray_topk_table_multi(ray_t* tbl, ray_t** key_cols, uint8_t* descs, + uint8_t* nfs, uint8_t n_keys, int64_t k) { + if (!tbl || tbl->type != RAY_TABLE || !key_cols || n_keys == 0) return NULL; + int64_t nrows = ray_table_nrows(tbl); + if (k <= 0 || nrows <= 0 || k >= nrows) return NULL; + int64_t ncols = ray_table_ncols(tbl); + for (int64_t c = 0; c < ncols; c++) { + ray_t* src = ray_table_get_col_idx(tbl, c); + if (src && src->type == RAY_LIST) return NULL; + } + for (uint8_t i = 0; i < n_keys; i++) + if (!key_cols[i] || key_cols[i]->len < nrows) return NULL; + + ray_t* idx = topk_indices_cmp(key_cols, descs, nfs, n_keys, nrows, k); + if (!idx) return NULL; + if (RAY_IS_ERR(idx)) return idx; + return topk_gather_rows(tbl, idx, k); +} + ray_t* ray_sort_indices(ray_t** cols, uint8_t* descs, uint8_t* nulls_first, uint8_t n_cols, int64_t nrows) { return sort_indices_ex(cols, descs, nulls_first, n_cols, nrows, NULL, NULL); @@ -3126,6 +3415,58 @@ ray_t* exec_sort(ray_graph_t* g, ray_op_t* op, ray_t* tbl, int64_t limit) { uint8_t n_sort = ext->sort.n_cols; if (n_sort > 16) return ray_error("nyi", NULL); /* radix_encode_ctx_t limit */ + /* ---- Top-K bounded-heap shortcut ---- + * Triggered by the SORT+HEAD fusion (HEAD passes limit > 0). When + * K is well below nrows (K << n) and every sort key is a direct + * OP_SCAN of a column on `tbl`, run a heap-based partial selection + * in O(n log K + K log K) instead of the full O(n log n) sort. + * Single key → radix-encoded fast path; multi-key → comparator + * heap (still O(n log K) in compares, big win when K << n). + * Falls through to the full sort whenever the topk path returns + * NULL (unsupported type, computed-key sort, etc.). 
*/ + if (limit > 0 && n_sort >= 1 && limit < nrows && limit <= 8192 && + g && g->selection == NULL) { + ray_t* key_cols[16]; + int all_scan = 1; + for (uint8_t k = 0; k < n_sort; k++) { + ray_op_t* key_op = ext->sort.columns[k]; + ray_op_ext_t* key_ext = find_ext(g, key_op->id); + if (key_ext && key_ext->base.opcode == OP_SCAN) { + key_cols[k] = ray_table_get_col(tbl, key_ext->sym); + if (!key_cols[k]) { all_scan = 0; break; } + } else { + all_scan = 0; + break; + } + } + if (all_scan) { + if (n_sort == 1) { + uint8_t desc = ext->sort.desc ? ext->sort.desc[0] : 0; + uint8_t nf = ext->sort.nulls_first + ? ext->sort.nulls_first[0] + : !desc; + ray_t* topk_res = ray_topk_table(tbl, key_cols[0], desc, nf, limit); + if (topk_res && !RAY_IS_ERR(topk_res)) return topk_res; + if (topk_res && RAY_IS_ERR(topk_res)) ray_release(topk_res); + } else { + /* Default nulls-first to !desc per-key when caller + * didn't supply a vector. */ + uint8_t nfs[16]; + for (uint8_t k = 0; k < n_sort; k++) { + uint8_t d = ext->sort.desc ? ext->sort.desc[k] : 0; + nfs[k] = ext->sort.nulls_first + ? ext->sort.nulls_first[k] + : !d; + } + ray_t* topk_res = ray_topk_table_multi(tbl, key_cols, + ext->sort.desc, nfs, n_sort, limit); + if (topk_res && !RAY_IS_ERR(topk_res)) return topk_res; + if (topk_res && RAY_IS_ERR(topk_res)) ray_release(topk_res); + } + /* topk_res == NULL → unsupported config, fall through. */ + } + } + /* Resolve sort key vectors */ ray_t* sort_vecs[n_sort > 0 ? n_sort : 1]; uint8_t sort_owned[n_sort > 0 ? n_sort : 1]; diff --git a/src/ops/string.c b/src/ops/string.c index e9430340..dd013874 100644 --- a/src/ops/string.c +++ b/src/ops/string.c @@ -23,12 +23,44 @@ #include "ops/internal.h" #include "ops/glob.h" +#include "core/pool.h" /* ============================================================================ * OP_LIKE: glob pattern matching on STR / SYM columns. See ops/glob.[ch]. * Syntax: * (any), ? (one char), [abc] / [a-z] / [!abc] (character class). 
* ============================================================================ */ +/* Pattern-resolve worker for the SYM-LIKE fast path. Runs over a + * range of sym_ids; for each marked-as-seen sid, runs the matcher and + * writes the answer to lut[sid]. Pure read-only on the inputs after + * the seen-mark phase, so workers are independent. */ +typedef struct { + ray_t** sym_strings; + uint8_t* seen; + uint8_t* lut; + const ray_glob_compiled_t* pc; + bool use_simple; + const char* pat_str; + size_t pat_len; +} like_resolve_ctx_t; + +static void like_resolve_fn(void* ctx, uint32_t worker_id, + int64_t start, int64_t end) { + (void)worker_id; + like_resolve_ctx_t* x = (like_resolve_ctx_t*)ctx; + for (int64_t sid = start; sid < end; sid++) { + if (!x->seen[sid]) continue; + ray_t* str = x->sym_strings[sid]; + if (!str) { x->lut[sid] = 0; continue; } + const char* sp = ray_str_ptr(str); + size_t sl = ray_str_len(str); + x->lut[sid] = (x->use_simple + ? ray_glob_match_compiled(x->pc, sp, sl) + : ray_glob_match(sp, sl, x->pat_str, x->pat_len)) + ? 1 : 0; + } +} + ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { ray_t* input = exec_node(g, op->inputs[0]); ray_t* pat_v = exec_node(g, op->inputs[1]); @@ -39,6 +71,13 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { const char* pat_str = ray_str_ptr(pat_v); size_t pat_len = ray_str_len(pat_v); + /* Pre-compile pattern into the simple-shape form when possible — the + * substring/prefix/suffix branches drive memmem/memcmp directly, + * roughly an order of magnitude faster than the iterative matcher + * for the very common `*literal*` shape. 
*/ + ray_glob_compiled_t pc = ray_glob_compile(pat_str, pat_len); + bool use_simple = pc.shape != RAY_GLOB_SHAPE_NONE; + int64_t len = input->len; ray_t* result = ray_vec_new(RAY_BOOL, len); if (!result || RAY_IS_ERR(result)) { @@ -55,17 +94,125 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { for (int64_t i = 0; i < len; i++) { const char* sp = ray_str_t_ptr(&elems[i], pool); size_t sl = elems[i].len; - dst[i] = ray_glob_match(sp, sl, pat_str, pat_len) ? 1 : 0; + dst[i] = (use_simple + ? ray_glob_match_compiled(&pc, sp, sl) + : ray_glob_match(sp, sl, pat_str, pat_len)) ? 1 : 0; } } else if (RAY_IS_SYM(in_type)) { + /* Dictionary-cached fast path. + * + * Three-phase pipeline: + * (1) seen-mark — single sequential row scan that flips a + * byte in `seen[]` for every referenced sym_id. Cheap; + * just sets a byte per row. + * (2) parallel pattern resolve — partition the dict_n range + * across pool workers; for each sid where seen[sid]==1, + * run the matcher and store the answer in lut[sid]. + * (3) parallel row projection — every row reads lut[sid_i]. + * + * Splitting the resolve from the row scan lets phase (2) drive + * the pattern matcher (memmem on long URL strings) across the + * worker pool. ray_sym_count is the GLOBAL dictionary so for + * a low-card column like BrowserCountry phase (1) keeps the + * resolve work bounded to that column's actual sym_ids. */ const void* base = ray_data(input); - for (int64_t i = 0; i < len; i++) { - int64_t sym_id = ray_read_sym(base, i, in_type, input->attrs); - ray_t* s = ray_sym_str(sym_id); - if (!s) { dst[i] = 0; continue; } - const char* sp = ray_str_ptr(s); - size_t sl = ray_str_len(s); - dst[i] = ray_glob_match(sp, sl, pat_str, pat_len) ? 
1 : 0; + ray_t** sym_strings = NULL; + uint32_t dict_n = 0; + ray_sym_strings_borrow(&sym_strings, &dict_n); + ray_t* lut_hdr = NULL; + ray_t* seen_hdr = NULL; + uint8_t* lut = NULL; + uint8_t* seen = NULL; + if (dict_n > 0) { + lut = (uint8_t*)scratch_alloc (&lut_hdr, (size_t)dict_n); + seen = (uint8_t*)scratch_calloc(&seen_hdr, (size_t)dict_n); + } + if (lut && seen) { + int sym_w = (int)(input->attrs & RAY_SYM_W_MASK); + + /* Phase 1: mark used sym_ids. Width-specialised. */ + switch (sym_w) { + case RAY_SYM_W8: { + const uint8_t* d = (const uint8_t*)base; + for (int64_t i = 0; i < len; i++) { + uint64_t sid = d[i]; + if (sid < dict_n) seen[sid] = 1; + } + break; + } + case RAY_SYM_W16: { + const uint16_t* d = (const uint16_t*)base; + for (int64_t i = 0; i < len; i++) { + uint64_t sid = d[i]; + if (sid < dict_n) seen[sid] = 1; + } + break; + } + case RAY_SYM_W32: { + const uint32_t* d = (const uint32_t*)base; + for (int64_t i = 0; i < len; i++) { + uint64_t sid = d[i]; + if (sid < dict_n) seen[sid] = 1; + } + break; + } + case RAY_SYM_W64: + default: { + const int64_t* d = (const int64_t*)base; + for (int64_t i = 0; i < len; i++) { + int64_t sid = d[i]; + if ((uint64_t)sid < dict_n) seen[sid] = 1; + } + break; + } + } + + /* Phase 2: parallel pattern resolve over the dict range. */ + like_resolve_ctx_t rctx = { + .sym_strings = sym_strings, .seen = seen, .lut = lut, + .pc = &pc, .use_simple = use_simple, + .pat_str = pat_str, .pat_len = pat_len, + }; + ray_pool_t* pool = ray_pool_get(); + if (pool && (int64_t)dict_n >= 16384) { + ray_pool_dispatch(pool, like_resolve_fn, &rctx, (int64_t)dict_n); + } else { + like_resolve_fn(&rctx, 0, 0, (int64_t)dict_n); + } + + /* Phase 3: row projection (sequential — already a tight + * gather over a 1-byte LUT). Width-specialised. */ + #define LIKE_ROW_PASS(LOAD) \ + for (int64_t i = 0; i < len; i++) { \ + int64_t sid = (LOAD); \ + dst[i] = ((uint64_t)sid < (uint64_t)dict_n) ? 
lut[sid] : 0; \ + } + switch (sym_w) { + case RAY_SYM_W8: { const uint8_t* d = base; LIKE_ROW_PASS(d[i]) break; } + case RAY_SYM_W16: { const uint16_t* d = base; LIKE_ROW_PASS(d[i]) break; } + case RAY_SYM_W32: { const uint32_t* d = base; LIKE_ROW_PASS(d[i]) break; } + case RAY_SYM_W64: + default: { const int64_t* d = base; LIKE_ROW_PASS(d[i]) break; } + } + #undef LIKE_ROW_PASS + + scratch_free(lut_hdr); + scratch_free(seen_hdr); + } else { + /* OOM building the LUT: fall back to per-row scan. */ + if (lut_hdr) scratch_free(lut_hdr); + if (seen_hdr) scratch_free(seen_hdr); + for (int64_t i = 0; i < len; i++) { + int64_t sym_id = ray_read_sym(base, i, in_type, input->attrs); + ray_t* s = (sym_strings && (uint64_t)sym_id < (uint64_t)dict_n) + ? sym_strings[sym_id] : NULL; + if (!s) { dst[i] = 0; continue; } + const char* sp = ray_str_ptr(s); + size_t sl = ray_str_len(s); + dst[i] = (use_simple + ? ray_glob_match_compiled(&pc, sp, sl) + : ray_glob_match(sp, sl, pat_str, pat_len)) ? 1 : 0; + } } } else { memset(dst, 0, (size_t)len); @@ -105,12 +252,43 @@ ray_t* exec_ilike(ray_graph_t* g, ray_op_t* op) { dst[i] = ray_glob_match_ci(sp, sl, pat_str, pat_len) ? 1 : 0; } } else if (RAY_IS_SYM(in_type)) { + /* Dictionary-cached fast path — see exec_like. */ const void* base = ray_data(input); - for (int64_t i = 0; i < len; i++) { - int64_t sym_id = ray_read_sym(base, i, in_type, input->attrs); - ray_t* s = ray_sym_str(sym_id); - if (!s) { dst[i] = 0; continue; } - dst[i] = ray_glob_match_ci(ray_str_ptr(s), ray_str_len(s), pat_str, pat_len) ? 
1 : 0; + uint32_t dict_n = ray_sym_count(); + ray_t* lut_hdr = NULL; + ray_t* seen_hdr = NULL; + uint8_t* lut = NULL; + uint8_t* seen = NULL; + if (dict_n > 0) { + lut = (uint8_t*)scratch_alloc (&lut_hdr, (size_t)dict_n); + seen = (uint8_t*)scratch_calloc(&seen_hdr, (size_t)dict_n); + } + if (lut && seen) { + for (int64_t i = 0; i < len; i++) { + int64_t sid = ray_read_sym(base, i, in_type, input->attrs); + if ((uint64_t)sid >= (uint64_t)dict_n) { dst[i] = 0; continue; } + if (!seen[sid]) { + ray_t* s = ray_sym_str(sid); + if (!s) { lut[sid] = 0; } + else { + lut[sid] = ray_glob_match_ci(ray_str_ptr(s), ray_str_len(s), + pat_str, pat_len) ? 1 : 0; + } + seen[sid] = 1; + } + dst[i] = lut[sid]; + } + scratch_free(lut_hdr); + scratch_free(seen_hdr); + } else { + if (lut_hdr) scratch_free(lut_hdr); + if (seen_hdr) scratch_free(seen_hdr); + for (int64_t i = 0; i < len; i++) { + int64_t sym_id = ray_read_sym(base, i, in_type, input->attrs); + ray_t* s = ray_sym_str(sym_id); + if (!s) { dst[i] = 0; continue; } + dst[i] = ray_glob_match_ci(ray_str_ptr(s), ray_str_len(s), pat_str, pat_len) ? 1 : 0; + } } } else { memset(dst, 0, (size_t)len); diff --git a/src/ops/strop.c b/src/ops/strop.c index 9744398b..4ff123e9 100644 --- a/src/ops/strop.c +++ b/src/ops/strop.c @@ -22,6 +22,7 @@ */ #include "lang/internal.h" +#include "ops/internal.h" #include "table/sym.h" #include "ops/glob.h" @@ -202,6 +203,13 @@ ray_t* ray_like_fn(ray_t* x, ray_t* pattern) { const char* pat = ray_str_ptr(pattern); size_t pat_len = ray_str_len(pattern); + /* Pre-compile the pattern once. Most ClickBench LIKE shapes are + * `*literal*` (substring) which collapses to a memmem call — the + * libc-provided implementation is SIMD on glibc/Apple/BSD. When the + * shape is RAY_GLOB_SHAPE_NONE we keep the iterative matcher. 
*/ + ray_glob_compiled_t pc = ray_glob_compile(pat, pat_len); + bool use_simple = pc.shape != RAY_GLOB_SHAPE_NONE; + /* Atom: single match */ if (x->type == -RAY_STR || x->type == -RAY_SYM) { const char* s; size_t sl; @@ -214,7 +222,8 @@ ray_t* ray_like_fn(ray_t* x, ray_t* pattern) { s = ray_str_ptr(x); sl = ray_str_len(x); } - bool m = ray_glob_match(s, sl, pat, pat_len); + bool m = use_simple ? ray_glob_match_compiled(&pc, s, sl) + : ray_glob_match(s, sl, pat, pat_len); if (sym_str) ray_release(sym_str); return make_bool(m ? 1 : 0); } @@ -228,20 +237,118 @@ ray_t* ray_like_fn(ray_t* x, ray_t* pattern) { uint8_t* out = (uint8_t*)ray_data(result); if (x->type == RAY_SYM) { - int64_t* sym_ids = (int64_t*)ray_data(x); - for (int64_t i = 0; i < n; i++) { - ray_t* sym_str = ray_sym_str(sym_ids[i]); - const char* s = sym_str ? ray_str_ptr(sym_str) : ""; - size_t sl = sym_str ? ray_str_len(sym_str) : 0; - out[i] = ray_glob_match(s, sl, pat, pat_len) ? 1 : 0; - if (sym_str) ray_release(sym_str); + /* SYM column is dictionary-encoded with adaptive widths + * (W8/W16/W32/W64). Two bugs to avoid: + * (a) Reading the column as int64_t* is wrong for any + * width != W64 — must use ray_read_sym. + * (b) ray_sym_str returns a borrowed pointer; releasing + * it would decrement the global sym table entry. + * + * Fast path: a SYM column with N rows references at most + * D = ray_sym_count() distinct sym_ids. Build a + * sym_id → bool LUT with a "seen" bitmap so each sym_id + * runs the glob matcher at most once. For LIKE on URL + * (1.7M unique values, 5M rows) this turns an O(n_rows) + * pattern-scan into O(n_distinct + n_rows) — the second + * pass is a single byte load + table lookup per row. */ + const void* base = ray_data(x); + int8_t in_type = x->type; + uint8_t in_attrs = x->attrs; + + /* The global sym table can be much larger than the set of + * IDs this column references (e.g. BrowserCountry with 54 + * uniques in a process that's also loaded URL with 1.7M + * uniques). 
Lazy-resolve via the seen bitmap so we only + * match against sym_ids actually touched. ray_sym_strings_borrow + * snapshots the strings array under one lock so each lookup + * is a plain pointer load. */ + ray_t** sym_strings = NULL; + uint32_t dict_n = 0; + ray_sym_strings_borrow(&sym_strings, &dict_n); + ray_t* lut_hdr = NULL; + ray_t* seen_hdr = NULL; + uint8_t* lut = NULL; + uint8_t* seen = NULL; + if (dict_n > 0) { + lut = (uint8_t*)scratch_alloc (&lut_hdr, (size_t)dict_n); + seen = (uint8_t*)scratch_calloc(&seen_hdr, (size_t)dict_n); + } + if (lut && seen) { + /* First pass: discover the unique sym_ids referenced and + * resolve each pattern match exactly once. Second pass: + * width-specialised LUT projection so the per-row loop + * is a tight gather. */ + int sym_w = (int)(in_attrs & RAY_SYM_W_MASK); + #define DICT_PASS(LOAD) \ + for (int64_t i = 0; i < n; i++) { \ + int64_t sid = (LOAD); \ + if ((uint64_t)sid >= (uint64_t)dict_n) continue; \ + if (!seen[sid]) { \ + ray_t* s = sym_strings[sid]; \ + const char* sp = s ? ray_str_ptr(s) : ""; \ + size_t sl = s ? ray_str_len(s) : 0; \ + lut[sid] = (use_simple \ + ? ray_glob_match_compiled(&pc, sp, sl)\ + : ray_glob_match(sp, sl, pat, pat_len)) \ + ? 1 : 0; \ + seen[sid] = 1; \ + } \ + } + #define ROW_PASS(LOAD) \ + for (int64_t i = 0; i < n; i++) { \ + int64_t sid = (LOAD); \ + out[i] = ((uint64_t)sid < (uint64_t)dict_n) ? 
lut[sid] : 0; \ + } + switch (sym_w) { + case RAY_SYM_W8: { + const uint8_t* d = (const uint8_t*)base; + DICT_PASS(d[i]) ROW_PASS(d[i]) break; + } + case RAY_SYM_W16: { + const uint16_t* d = (const uint16_t*)base; + DICT_PASS(d[i]) ROW_PASS(d[i]) break; + } + case RAY_SYM_W32: { + const uint32_t* d = (const uint32_t*)base; + DICT_PASS(d[i]) ROW_PASS(d[i]) break; + } + case RAY_SYM_W64: + default: { + const int64_t* d = (const int64_t*)base; + DICT_PASS(d[i]) ROW_PASS(d[i]) break; + } + } + #undef DICT_PASS + #undef ROW_PASS + scratch_free(lut_hdr); + scratch_free(seen_hdr); + } else { + /* OOM building the LUT: fall back to per-row scan. Still + * uses ray_read_sym for adaptive-width correctness. */ + if (lut_hdr) scratch_free(lut_hdr); + if (seen_hdr) scratch_free(seen_hdr); + for (int64_t i = 0; i < n; i++) { + int64_t sid = ray_read_sym(base, i, in_type, in_attrs); + ray_t* s = (sym_strings && (uint64_t)sid < (uint64_t)dict_n) + ? sym_strings[sid] : NULL; + const char* sp = s ? ray_str_ptr(s) : ""; + size_t sl = s ? ray_str_len(s) : 0; + out[i] = (use_simple + ? ray_glob_match_compiled(&pc, sp, sl) + : ray_glob_match(sp, sl, pat, pat_len)) ? 1 : 0; + } } } else { /* RAY_STR vector */ for (int64_t i = 0; i < n; i++) { size_t slen; const char* s = ray_str_vec_get(x, i, &slen); - out[i] = (s && ray_glob_match(s, slen, pat, pat_len)) ? 1 : 0; + bool m = false; + if (s) { + m = use_simple ? ray_glob_match_compiled(&pc, s, slen) + : ray_glob_match(s, slen, pat, pat_len); + } + out[i] = m ? 1 : 0; } } return result; diff --git a/src/table/sym.c b/src/table/sym.c index 02d1e1a3..a788b3cd 100644 --- a/src/table/sym.c +++ b/src/table/sym.c @@ -833,6 +833,32 @@ uint32_t ray_sym_count(void) { return count; } +/* -------------------------------------------------------------------------- + * ray_sym_strings_borrow + * + * Single-shot snapshot of the sym→string table for hot read-only + * scanners (LIKE, dictionary projection, …). 
ray_sym_str takes a spin + * lock per call; iterating all 1.7M URL dict entries via ray_sym_str + * means 1.7M lock acquisitions. This routine takes the lock once, + * captures the array pointer + length, drops the lock, and lets the + * caller iterate lock-free. + * + * Validity: only safe during read-only phases (no concurrent + * ray_sym_intern). ray_sym_intern can realloc g_sym.strings, after + * which the returned pointer is dangling. Today's pipeline is one + * pass: bulk-intern at CSV load, then run queries against the frozen + * table — exactly the contract this borrow form needs. + * -------------------------------------------------------------------------- */ +void ray_sym_strings_borrow(ray_t*** out_strings, uint32_t* out_count) { + if (out_strings) *out_strings = NULL; + if (out_count) *out_count = 0; + if (!atomic_load_explicit(&g_sym_inited, memory_order_acquire)) return; + sym_lock(); + if (out_strings) *out_strings = g_sym.strings; + if (out_count) *out_count = g_sym.str_count; + sym_unlock(); +} + /* -------------------------------------------------------------------------- * ray_sym_ensure_cap -- pre-grow hash table and strings array * diff --git a/test/rfl/collection/at.rfl b/test/rfl/collection/at.rfl index ae879282..90571ff5 100644 --- a/test/rfl/collection/at.rfl +++ b/test/rfl/collection/at.rfl @@ -6,6 +6,10 @@ ;; vector of indices returns vector of elements (at [10 20 30 40 50] [0 2 4]) -- [10 30 50] +;; table row indices return a table, not a boxed list of row dicts +(type (at (table [a b] (list [1 2 3] [4 5 6])) [0 2])) -- 'TABLE +(at (at (table [a b] (list [1 2 3] [4 5 6])) [0 2]) 'a) -- [1 3] + ;; at 0 == first (set V (rand 50 1000)) (at V 0) -- (first V) diff --git a/test/rfl/integration/cross_type_workout.rfl b/test/rfl/integration/cross_type_workout.rfl index 4a78bd34..562947f4 100644 --- a/test/rfl/integration/cross_type_workout.rfl +++ b/test/rfl/integration/cross_type_workout.rfl @@ -199,6 +199,7 @@ ;; "corrupt" path with SYM 
that's tracked separately. Use only ;; numeric columns here. (set Tplain (table [id price qty] (list (at T 'id) (at T 'price) (at T 'qty)))) +(.sys.exec "rm -rf /tmp/cross_type_workout_splayed") (.db.splayed.set "/tmp/cross_type_workout_splayed/" Tplain) (set Sp (.db.splayed.get "/tmp/cross_type_workout_splayed/")) (count Sp) -- 200 diff --git a/test/rfl/integration/groupby_aggregators.rfl b/test/rfl/integration/groupby_aggregators.rfl index 6d1cc304..9a1832a6 100644 --- a/test/rfl/integration/groupby_aggregators.rfl +++ b/test/rfl/integration/groupby_aggregators.rfl @@ -59,10 +59,11 @@ (count (select {s: (sum v) from: T by: g where: (< v 500)})) -- 50 ;; ────────────── group-by no `by` clause: aggregate over whole table ────────────── -;; pure aggregations without grouping +;; pure aggregations without grouping → ONE row, not nrows broadcast. (set Whole (select {tot: (sum v) ct: (count v) avg_v: (avg v) from: T})) -(count Whole) -- 1000 +(count Whole) -- 1 (at (at Whole 'tot) 0) -- 499500 +(at (at Whole 'ct) 0) -- 1000 ;; ────────────── group-by SYM key ────────────── (set Tsym (table [k v] (list (take ['A 'B 'C 'D 'E] N) (til N)))) diff --git a/test/rfl/ops/group_coverage.rfl b/test/rfl/ops/group_coverage.rfl index 46131f0a..f823d99c 100644 --- a/test/rfl/ops/group_coverage.rfl +++ b/test/rfl/ops/group_coverage.rfl @@ -413,9 +413,12 @@ ;; ────────────── 43. Scalar agg with all-stat-aggs combination ────────────── ;; Multi-agg pack of var, var_pop, stddev, stddev_pop, sum, count, avg ;; in scalar mode (no by) — exercises full need_flags=SUM+SUMSQ+COUNT. +;; A scalar reduction collapses to ONE row (count v == nrows is the +;; row-count *value*, not the row count of the result table). 
(set Tall (table [v] (list (til 200)))) -(count (select {s: (sum v) c: (count v) av: (avg v) v: (var v) vp: (var_pop v) sd: (stddev v) sp: (stddev_pop v) from: Tall})) -- 200 +(count (select {s: (sum v) c: (count v) av: (avg v) v: (var v) vp: (var_pop v) sd: (stddev v) sp: (stddev_pop v) from: Tall})) -- 1 (at (at (select {s: (sum v) c: (count v) av: (avg v) v: (var v) vp: (var_pop v) sd: (stddev v) sp: (stddev_pop v) from: Tall}) 's) 0) -- 19900 +(at (at (select {s: (sum v) c: (count v) av: (avg v) v: (var v) vp: (var_pop v) sd: (stddev v) sp: (stddev_pop v) from: Tall}) 'c) 0) -- 200 (at (at (select {c: (count v) v: (var v) sd: (stddev v) from: Tall}) 'c) 0) -- 200 ;; ────────────── 44. Group var/stddev with mixed enough/insufficient ────────────── diff --git a/test/rfl/ops/query_coverage.rfl b/test/rfl/ops/query_coverage.rfl index ac045c2b..bb432960 100644 --- a/test/rfl/ops/query_coverage.rfl +++ b/test/rfl/ops/query_coverage.rfl @@ -149,6 +149,26 @@ (set TStr (table [Name v] (list (list "alpha" "beta" "alpha" "gamma" "beta") [10 20 30 40 50]))) (count (select {from: TStr by: Name})) -- 3 +;; COUNT(DISTINCT col) per group is a real aggregate, but `distinct` +;; must run on each group's slice rather than on the full column before +;; OP_GROUP. Numeric keys take the DAG group-boundary + per-group eval path. +(set TCD (table [g u] (list [1 1 2 2 2] [10 10 20 21 20]))) +(sum (at (select {u: (count (distinct u)) from: TCD by: g}) 'u)) -- 3 + +;; STR keys force the eval-level group fallback; the same count-distinct +;; expression must still be evaluated per group, not broadcast from the +;; whole table. +(set TCDS (table [k u] (list (as 'STR ["a" "a" "b" "b" ""]) [1 2 2 2 3]))) +(sum (at (select {u: (count (distinct u)) from: TCDS by: k}) 'u)) -- 4 + +;; Multi-key group-by with a materialised computed key plus a STR key: +;; by-dict pre-eval rewrites `{m: (...) s: S}` to a SYM-vector key list. 
+;; The DAG group path can't handle STR keys, so this takes the eval-level +;; composite-key fallback. +(set TG2S (table [ts s u] (list (as 'TIMESTAMP [0 60000000000 60000000000 120000000000]) (as 'STR ["a" "a" "b" "b"]) [10 11 12 13]))) +(count (select {c: (count u) from: TG2S by: {m: (minute ts) s: s}})) -- 4 +(sum (at (select {c: (count u) from: TG2S by: {m: (minute ts) s: s}}) 'c)) -- 4 + ;; ==================================================================== ;; GUID first-of-group fast path — query.c:1945-2099. Pure ;; `(select {from: t by: G})` with no agg/non-agg expressions takes diff --git a/test/rfl/system/read_csv.rfl b/test/rfl/system/read_csv.rfl index 946a745b..0258c233 100644 --- a/test/rfl/system/read_csv.rfl +++ b/test/rfl/system/read_csv.rfl @@ -15,5 +15,6 @@ (.sys.exec "awk 'BEGIN{print \"id,sym\"; for(i=0;i<20000;i++) printf(\"%d,s%d\\n\",i,i)}' > rf_test_syms.csv") -- 0 (count (.csv.read [I64 SYMBOL] "rf_test_syms.csv")) -- 20000 +(count (read-csv [I64 SYMBOL] "rf_test_syms.csv")) -- 20000 (.sys.exec "rm -f rf_test_syms.csv") -- 0 diff --git a/test/rfl/system/reserved_namespace.rfl b/test/rfl/system/reserved_namespace.rfl index 373c3b29..acceef7c 100644 --- a/test/rfl/system/reserved_namespace.rfl +++ b/test/rfl/system/reserved_namespace.rfl @@ -68,6 +68,9 @@ (nil? .ipc.send) -- false (nil? .csv.read) -- false (nil? .csv.write) -- false +;; Python compatibility aliases resolve to the same CSV builtins. +(nil? read-csv) -- false +(nil? write-csv) -- false ;; Old names must NOT resolve — we committed to no backward compat. gc !- name getenv !- name @@ -75,7 +78,6 @@ system !- name sysinfo !- name memstat !- name internals !- name -read-csv !- name ;; Negative: writes to `.*` are refused with `reserve`. 
(set .os.foo 1) !- reserve (set .sys.gc 0) !- reserve diff --git a/test/rfl/table/select.rfl b/test/rfl/table/select.rfl index 2f849506..e47b48a4 100644 --- a/test/rfl/table/select.rfl +++ b/test/rfl/table/select.rfl @@ -38,6 +38,17 @@ (at (at (select {m: (min size) from: trades where: (> price 200.0)}) 'm) 0) -- 40 (at (at (select {a: (avg size) from: trades where: (== sym 'AAPL)}) 'a) 0) -- 115.0 +;; ── scalar aggregation (no `by:`) collapses to ONE row, NOT N broadcast +;; copies of the same value. Regression test for the projection path +;; that used to compile `(sum c)` as a column expression and broadcast +;; the resulting scalar across the input row count. +(count (select {s: (sum size) from: trades})) -- 1 +(count (select {s: (sum size) from: trades where: (== sym 'AAPL)})) -- 1 +(count (select {s: (sum size) c: (count size) from: trades})) -- 1 +(count (select {a: (avg price) m: (max size) from: trades where: (> price 200)})) -- 1 +(at (at (select {s: (sum size) from: trades}) 's) 0) -- 1240 +(at (at (select {c: (count size) from: trades}) 'c) 0) -- 10 + ;; Larger fixture (>= RAY_PARALLEL_THRESHOLD) to exercise the parallel ;; reduction worker path of exec_reduction. (set big-T (table [v] (list (til 100000)))) diff --git a/test/test_csv.c b/test/test_csv.c index b910954b..a5dedbe2 100644 --- a/test/test_csv.c +++ b/test/test_csv.c @@ -26,6 +26,7 @@ #include #include "mem/heap.h" #include "io/csv.h" +#include "table/sym.h" #include #include @@ -1101,6 +1102,8 @@ static test_result_t test_csv_sym_narrowing(void) { ray_t* col = ray_table_get_col_idx(loaded, 0); TEST_ASSERT_EQ_I(col->type, RAY_SYM); /* Width is encoded in the lower 2 bits of attrs (RAY_SYM_W8 == 0). */ + TEST_ASSERT_EQ_I((int)(col->attrs & RAY_SYM_W_MASK), RAY_SYM_W8); + TEST_ASSERT_FALSE(col->attrs & RAY_ATTR_HAS_NULLS); /* Just sanity: rows exist and aren't null. 
*/ TEST_ASSERT_EQ_I(ray_table_nrows(loaded), 200); TEST_ASSERT_FALSE(ray_vec_is_null(col, 0)); @@ -1151,5 +1154,3 @@ const test_entry_t csv_entries[] = { { "csv/sym_narrowing", test_csv_sym_narrowing, NULL, NULL }, { NULL, NULL, NULL, NULL }, }; - -