Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion src/io/csv.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ typedef enum {
CSV_TYPE_DATE,
CSV_TYPE_TIME,
CSV_TYPE_TIMESTAMP,
CSV_TYPE_GUID
CSV_TYPE_GUID,
/* Narrow-int parse types — selected only via explicit schema (never from
* inference, so they do not appear in promote_csv_type). Each parses the
* field as int64 and narrows at write time to match the column width. */
CSV_TYPE_U8,
CSV_TYPE_I16,
CSV_TYPE_I32
} csv_type_t;

static csv_type_t detect_type(const char* f, size_t len) {
Expand Down Expand Up @@ -795,6 +801,9 @@ static void csv_parse_fn(void* arg, uint32_t worker_id,
for (; c < ctx->n_cols; c++) {
switch (ctx->col_types[c]) {
case CSV_TYPE_BOOL: ((uint8_t*)ctx->col_data[c])[row] = 0; break;
case CSV_TYPE_U8: ((uint8_t*)ctx->col_data[c])[row] = 0; break;
case CSV_TYPE_I16: ((int16_t*)ctx->col_data[c])[row] = 0; break;
case CSV_TYPE_I32: ((int32_t*)ctx->col_data[c])[row] = 0; break;
case CSV_TYPE_I64: ((int64_t*)ctx->col_data[c])[row] = 0; break;
case CSV_TYPE_F64: ((double*)ctx->col_data[c])[row] = 0.0; break;
case CSV_TYPE_DATE: ((int32_t*)ctx->col_data[c])[row] = 0; break;
Expand Down Expand Up @@ -846,6 +855,36 @@ static void csv_parse_fn(void* arg, uint32_t worker_id,
}
break;
}
case CSV_TYPE_U8: {
bool is_null;
int64_t v = fast_i64(fld, flen, &is_null);
((uint8_t*)ctx->col_data[c])[row] = (uint8_t)v;
if (is_null) {
ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7));
my_had_null[c] = true;
}
break;
}
case CSV_TYPE_I16: {
bool is_null;
int64_t v = fast_i64(fld, flen, &is_null);
((int16_t*)ctx->col_data[c])[row] = (int16_t)v;
if (is_null) {
ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7));
my_had_null[c] = true;
}
break;
}
case CSV_TYPE_I32: {
bool is_null;
int64_t v = fast_i64(fld, flen, &is_null);
((int32_t*)ctx->col_data[c])[row] = (int32_t)v;
if (is_null) {
ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7));
my_had_null[c] = true;
}
break;
}
case CSV_TYPE_F64: {
bool is_null;
double v = fast_f64(fld, flen, &is_null);
Expand Down Expand Up @@ -953,6 +992,9 @@ static void csv_parse_serial(const char* buf, size_t buf_size,
for (; c < n_cols; c++) {
switch (col_types[c]) {
case CSV_TYPE_BOOL: ((uint8_t*)col_data[c])[row] = 0; break;
case CSV_TYPE_U8: ((uint8_t*)col_data[c])[row] = 0; break;
case CSV_TYPE_I16: ((int16_t*)col_data[c])[row] = 0; break;
case CSV_TYPE_I32: ((int32_t*)col_data[c])[row] = 0; break;
case CSV_TYPE_I64: ((int64_t*)col_data[c])[row] = 0; break;
case CSV_TYPE_F64: ((double*)col_data[c])[row] = 0.0; break;
case CSV_TYPE_DATE: ((int32_t*)col_data[c])[row] = 0; break;
Expand Down Expand Up @@ -1004,6 +1046,36 @@ static void csv_parse_serial(const char* buf, size_t buf_size,
}
break;
}
case CSV_TYPE_U8: {
bool is_null;
int64_t v = fast_i64(fld, flen, &is_null);
((uint8_t*)col_data[c])[row] = (uint8_t)v;
if (is_null) {
col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7));
col_had_null[c] = true;
}
break;
}
case CSV_TYPE_I16: {
bool is_null;
int64_t v = fast_i64(fld, flen, &is_null);
((int16_t*)col_data[c])[row] = (int16_t)v;
if (is_null) {
col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7));
col_had_null[c] = true;
}
break;
}
case CSV_TYPE_I32: {
bool is_null;
int64_t v = fast_i64(fld, flen, &is_null);
((int32_t*)col_data[c])[row] = (int32_t)v;
if (is_null) {
col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7));
col_had_null[c] = true;
}
break;
}
case CSV_TYPE_F64: {
bool is_null;
double v = fast_f64(fld, flen, &is_null);
Expand Down Expand Up @@ -1298,6 +1370,9 @@ ray_t* ray_read_csv_opts(const char* path, char delimiter, bool header,
for (int c = 0; c < ncols; c++) {
switch (resolved_types[c]) {
case RAY_BOOL: parse_types[c] = CSV_TYPE_BOOL; break;
case RAY_U8: parse_types[c] = CSV_TYPE_U8; break;
case RAY_I16: parse_types[c] = CSV_TYPE_I16; break;
case RAY_I32: parse_types[c] = CSV_TYPE_I32; break;
case RAY_I64: parse_types[c] = CSV_TYPE_I64; break;
case RAY_F64: parse_types[c] = CSV_TYPE_F64; break;
case RAY_DATE: parse_types[c] = CSV_TYPE_DATE; break;
Expand Down
82 changes: 74 additions & 8 deletions src/ops/sort.c
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,28 @@ void radix_encode_fn(void* arg, uint32_t wid, int64_t start, int64_t end) {
}
case RAY_I16: {
const int16_t* d = (const int16_t*)c->data;
if (c->desc) {
bool has_nulls = c->col && (c->col->attrs & RAY_ATTR_HAS_NULLS);
bool nf = c->nulls_first;
bool desc = c->desc;
if (has_nulls) {
/* Shift non-null encoded values up by 1 so the null
* sentinel sits outside the [1, 0x10000] data range and
* cannot tie with INT16_MIN or INT16_MAX. Costs one
* extra radix-byte pass; sort_indices_ex bumps
* key_nbytes_max accordingly. */
uint64_t null_e = (nf ^ desc) ? 0 : 0x10001ULL;
if (desc) {
for (int64_t i = start; i < end; i++) {
if (ray_vec_is_null(c->col, i)) c->keys[i] = ~null_e;
else c->keys[i] = ~((uint64_t)((uint16_t)d[i] ^ ((uint16_t)1 << 15)) + 1);
}
} else {
for (int64_t i = start; i < end; i++) {
if (ray_vec_is_null(c->col, i)) c->keys[i] = null_e;
else c->keys[i] = (uint64_t)((uint16_t)d[i] ^ ((uint16_t)1 << 15)) + 1;
}
}
} else if (desc) {
for (int64_t i = start; i < end; i++)
c->keys[i] = ~((uint64_t)((uint16_t)d[i] ^ ((uint16_t)1 << 15)));
} else {
Expand All @@ -1031,7 +1052,26 @@ void radix_encode_fn(void* arg, uint32_t wid, int64_t start, int64_t end) {
}
case RAY_BOOL: case RAY_U8: {
const uint8_t* d = (const uint8_t*)c->data;
if (c->desc) {
bool has_nulls = c->col && (c->col->attrs & RAY_ATTR_HAS_NULLS);
bool nf = c->nulls_first;
bool desc = c->desc;
if (has_nulls) {
/* Shift non-null encoded values up by 1; null sentinel
* lives at 0 (NF) or 0x101 (NL), beyond every U8/BOOL
* data value. */
uint64_t null_e = (nf ^ desc) ? 0 : 0x101ULL;
if (desc) {
for (int64_t i = start; i < end; i++) {
if (ray_vec_is_null(c->col, i)) c->keys[i] = ~null_e;
else c->keys[i] = ~((uint64_t)d[i] + 1);
}
} else {
for (int64_t i = start; i < end; i++) {
if (ray_vec_is_null(c->col, i)) c->keys[i] = null_e;
else c->keys[i] = (uint64_t)d[i] + 1;
}
}
} else if (desc) {
for (int64_t i = start; i < end; i++)
c->keys[i] = ~(uint64_t)d[i];
} else {
Expand Down Expand Up @@ -2466,6 +2506,15 @@ static ray_t* sort_indices_ex(ray_t** cols, uint8_t* descs, uint8_t* nulls_first
if (can_radix && n_cols == 1) {
/* --- Single-key sort --- */
uint8_t key_nbytes_max = radix_key_bytes(cols[0]->type);
/* Narrow-int + has_nulls uses a +1-shifted encoding so
* the null sentinel sits one byte beyond the data
* range; reserve that extra byte for the radix pass. */
if ((cols[0]->attrs & RAY_ATTR_HAS_NULLS) &&
(cols[0]->type == RAY_BOOL || cols[0]->type == RAY_U8 ||
cols[0]->type == RAY_I16) &&
key_nbytes_max < 8) {
key_nbytes_max++;
}

/* Skip pool for small arrays - dispatch overhead dominates */
ray_pool_t* sk_pool = (nrows >= SMALL_POOL_THRESHOLD) ? pool : NULL;
Expand Down Expand Up @@ -3365,7 +3414,11 @@ ray_t* ray_sort(ray_t** cols, uint8_t* descs, uint8_t* nulls_first,
&sorted_keys, &keys_hdr);
if (!idx || RAY_IS_ERR(idx)) return idx;

if (sorted_keys && !RAY_IS_SYM(cols[0]->type)) {
bool c0_shifted = (cols[0]->attrs & RAY_ATTR_HAS_NULLS) &&
(cols[0]->type == RAY_BOOL ||
cols[0]->type == RAY_U8 ||
cols[0]->type == RAY_I16);
if (sorted_keys && !RAY_IS_SYM(cols[0]->type) && !c0_shifted) {
/* Decode path: sequential writes, no random access */
ray_t* result = ray_vec_new(cols[0]->type, nrows);
if (!result || RAY_IS_ERR(result)) {
Expand Down Expand Up @@ -3581,9 +3634,16 @@ ray_t* exec_sort(ray_graph_t* g, ray_op_t* op, ray_t* tbl, int64_t limit) {

/* Decode-gather optimisation: decode the sort key column directly from
* sorted radix keys (sequential writes) instead of random-access gather.
* Only for single-key, non-SYM sorts where radix keys are available. */
* Only for single-key, non-SYM sorts where radix keys are available.
* Narrow-int + has_nulls uses a +1-shifted encoding that radix_decode_into
* doesn't invert, so fall back to gather there. */
int8_t sk0_type = sort_vecs[0] ? sort_vecs[0]->type : 0;
bool sk0_shifted = sort_vecs[0] &&
(sort_vecs[0]->attrs & RAY_ATTR_HAS_NULLS) &&
(sk0_type == RAY_BOOL || sk0_type == RAY_U8 ||
sk0_type == RAY_I16);
int64_t sort_key_sym = -1;
if (sorted_keys && n_sort == 1 && !RAY_IS_SYM(sort_vecs[0]->type)) {
if (sorted_keys && n_sort == 1 && !RAY_IS_SYM(sk0_type) && !sk0_shifted) {
ray_op_ext_t* key_ext = find_ext(g, ext->sort.columns[0]->id);
if (key_ext && key_ext->base.opcode == OP_SCAN)
sort_key_sym = key_ext->sym;
Expand Down Expand Up @@ -3910,9 +3970,15 @@ ray_t* sort_table_by_keys(ray_t* tbl, ray_t* keys, uint8_t descending) {
/* Decode sort key column directly from sorted radix keys when
* available — sequential write, much faster than random-access
* gather. Only for single-key sorts where sort_indices_ex
* produced sorted_keys (non-packed path). */
* produced sorted_keys (non-packed path). Narrow-int + has_nulls
* uses a +1-shifted encoding that radix_decode_into doesn't invert,
* so fall back to gather in that case. */
int64_t decode_col_idx = -1;
if (sorted_keys && n_keys == 1 && !RAY_IS_SYM(key_cols[0]->type)) {
int8_t k0_type = key_cols[0]->type;
bool k0_shifted = (key_cols[0]->attrs & RAY_ATTR_HAS_NULLS) &&
(k0_type == RAY_BOOL || k0_type == RAY_U8 ||
k0_type == RAY_I16);
if (sorted_keys && n_keys == 1 && !RAY_IS_SYM(k0_type) && !k0_shifted) {
for (int64_t c = 0; c < ncols; c++) {
if (col_names[c] == key_ids[0] && new_cols[c]) {
decode_col_idx = c;
Expand All @@ -3922,7 +3988,7 @@ ray_t* sort_table_by_keys(ray_t* tbl, ray_t* keys, uint8_t descending) {
}
if (decode_col_idx >= 0) {
radix_decode_into(ray_data(new_cols[decode_col_idx]),
key_cols[0]->type, sorted_keys,
k0_type, sorted_keys,
nrows, descs[0]);
}

Expand Down
9 changes: 9 additions & 0 deletions src/ops/window.c
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,15 @@ ray_t* exec_window(ray_graph_t* g, ray_op_t* op, ray_t* tbl) {
if (can_radix && n_sort == 1) {
/* Single-key sort */
uint8_t key_nbytes = radix_key_bytes(sort_vecs[0]->type);
/* Narrow-int + has_nulls uses a +1-shifted encoding —
* keep the radix pass aligned with the wider key. */
if ((sort_vecs[0]->attrs & RAY_ATTR_HAS_NULLS) &&
(sort_vecs[0]->type == RAY_BOOL ||
sort_vecs[0]->type == RAY_U8 ||
sort_vecs[0]->type == RAY_I16) &&
key_nbytes < 8) {
key_nbytes++;
}
ray_pool_t* sk_pool = (nrows >= SMALL_POOL_THRESHOLD) ? pool : NULL;
ray_t *keys_hdr;
uint64_t* keys = (uint64_t*)scratch_alloc(&keys_hdr,
Expand Down
Loading
Loading