Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions src/core/block.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,8 @@
#include "../ops/ops.h"
#include "../table/sym.h"

/* Weak stub for ray_alloc — historically a fallback if no allocator is
* linked. Every build configuration in tree links src/mem/heap.c, whose
* strong ray_alloc always wins, so this body is dead code under llvm-cov.
* Compiled out so the symbol no longer drags coverage down; restore the
* #if 0 if a future build configuration ships without the buddy allocator. */
#if 0
/* Weak stub for ray_alloc — replaced by buddy allocator at link time.
* Uses ray_vm_alloc (mmap) — page-aligned and zero-filled. */
__attribute__((weak))
ray_t* ray_alloc(size_t size) {
if (size < 32) size = 32;
Expand All @@ -41,7 +37,6 @@ ray_t* ray_alloc(size_t size) {
if (!p) return ray_error("oom", NULL);
return (ray_t*)p;
}
#endif

size_t ray_block_size(ray_t* v) {
if (ray_is_atom(v)) return 32;
Expand Down
34 changes: 19 additions & 15 deletions src/core/ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -418,16 +418,12 @@ static ray_t* ipc_read_handshake(ray_poll_t* poll, ray_selector_t* sel);
static ray_t* ipc_read_creds(ray_poll_t* poll, ray_selector_t* sel);
static ray_t* ipc_read_header(ray_poll_t* poll, ray_selector_t* sel);
static ray_t* ipc_read_payload(ray_poll_t* poll, ray_selector_t* sel);
static ray_t* ipc_on_data(ray_poll_t* poll, ray_selector_t* sel, void* data);
static void ipc_on_close(ray_poll_t* poll, ray_selector_t* sel);

/* Wrappers matching ray_io_fn signature for socket recv/send */
/* Wrappers matching ray_io_fn signature for socket recv */
static int64_t ipc_recv_fn(int64_t fd, uint8_t* buf, int64_t len) {
return ray_sock_recv((ray_sock_t)fd, buf, (size_t)len);
}
static int64_t ipc_send_fn(int64_t fd, uint8_t* buf, int64_t len) {
return ray_sock_send((ray_sock_t)fd, buf, (size_t)len);
}

/* Accept callback — called when listener fd is readable */
static ray_t* ipc_accept(ray_poll_t* poll, ray_selector_t* sel)
Expand All @@ -449,9 +445,7 @@ static ray_t* ipc_accept(ray_poll_t* poll, ray_selector_t* sel)
reg.fd = (int64_t)new_fd;
reg.type = RAY_SEL_SOCKET;
reg.recv_fn = ipc_recv_fn;
reg.send_fn = ipc_send_fn;
reg.read_fn = ipc_read_handshake;
reg.data_fn = ipc_on_data;
reg.close_fn = ipc_on_close;
reg.data = cd;

Expand Down Expand Up @@ -505,8 +499,24 @@ static ray_t* ipc_read_creds(ray_poll_t* poll, ray_selector_t* sel)
if (!sel->rx.buf || sel->rx.buf->offset < 1) return NULL;
uint8_t cred_len = sel->rx.buf->data[0];

if (sel->rx.buf->offset < 1 + cred_len) {
ray_poll_rx_request(poll, sel, 1 + cred_len);
/* The handshake first asks for 1 byte (the cred_len prefix); after
* reading it we need to grow the rx buffer to 1 + cred_len without
* losing the byte we already have. ray_poll_rx_request resets the
* buffer when it grows, so do the grow in-place here. */
int64_t need = 1 + (int64_t)cred_len;
if (sel->rx.buf->size < need) {
ray_poll_buf_t* old = sel->rx.buf;
ray_poll_buf_t* nb = ray_poll_buf_new(need);
if (!nb) { ray_poll_deregister(poll, sel->id); return NULL; }
nb->data[0] = cred_len;
nb->offset = 1;
nb->size = need;
ray_poll_buf_free(old);
sel->rx.buf = nb;
return NULL;
}
if (sel->rx.buf->offset < need) {
sel->rx.buf->size = need;
return NULL;
}

Expand Down Expand Up @@ -581,12 +591,6 @@ static ray_t* ipc_read_payload(ray_poll_t* poll, ray_selector_t* sel)
return NULL;
}

static ray_t* ipc_on_data(ray_poll_t* poll, ray_selector_t* sel, void* data)
{
(void)poll; (void)sel; (void)data;
return NULL;
}

static void ipc_on_close(ray_poll_t* poll, ray_selector_t* sel)
{
(void)poll;
Expand Down
13 changes: 10 additions & 3 deletions src/ops/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -968,12 +968,19 @@ static inline void par_set_null(ray_t* vec, int64_t idx) {
(uint8_t)(1u << bit_idx), __ATOMIC_RELAXED);
}

/* Pre-allocate external nullmap so parallel threads can set bits safely. */
/* Pre-allocate external nullmap so parallel threads can set bits safely.
*
* Probe at idx>=128 (not idx=0): ray_vec_set_null_checked(vec, 0, true)
* stays in the inline-nullmap path because the inline 16-byte bitmap
* fits idx<128 — so it never promotes to ext_nullmap. par_set_null
* for idx>=128 would then race-crash on lazy ext alloc. Probing at
* len-1 forces the promotion path. */
static inline ray_err_t par_prepare_nullmap(ray_t* vec) {
if (vec->len <= 128) return RAY_OK;
ray_err_t err = ray_vec_set_null_checked(vec, 0, true);
int64_t probe = vec->len - 1; /* >= 128, forces ext promotion */
ray_err_t err = ray_vec_set_null_checked(vec, probe, true);
if (err != RAY_OK) return err;
ray_vec_set_null_checked(vec, 0, false);
ray_vec_set_null_checked(vec, probe, false);
vec->attrs &= (uint8_t)~RAY_ATTR_HAS_NULLS;
return RAY_OK;
}
Expand Down
15 changes: 11 additions & 4 deletions src/store/serde.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,16 @@ int64_t ray_ser_raw(uint8_t* buf, ray_t* obj) {
case RAY_TIME:
memcpy(buf, &obj->i32, 4);
return 1 + 1 + 4;
case RAY_F32:
memcpy(buf, &obj->i32, 4); /* same 4-byte slot */
case RAY_F32: {
/* F32 atoms store the value in obj->f64 (see ray_f32 in
* src/vec/atom.c). Earlier code read &obj->i32 hoping
* those bytes aliased the float — but f64 is 8 bytes, so
* the low half is just the lsb of the double bit pattern,
* not the float value. Narrow explicitly. */
float f = (float)obj->f64;
memcpy(buf, &f, 4);
return 1 + 1 + 4;
}
case RAY_I64:
case RAY_TIMESTAMP:
memcpy(buf, &obj->i64, 8);
Expand Down Expand Up @@ -539,8 +546,8 @@ ray_t* ray_de_raw(uint8_t* buf, int64_t* len) {
case RAY_F32:
if (*len < 4) return ray_error("domain", NULL);
{ float v; memcpy(&v, buf, 4); *len -= 4;
return is_null ? ray_typed_null(-RAY_F64)
: ray_f64((double)v); /* promote to f64 atom */ }
return is_null ? ray_typed_null(-RAY_F32)
: ray_f32(v); }
case RAY_I64:
if (*len < 8) return ray_error("domain", NULL);
{ int64_t v; memcpy(&v, buf, 8); *len -= 8;
Expand Down
16 changes: 13 additions & 3 deletions test/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ extern const test_entry_t atom_entries[];
extern const test_entry_t audit_entries[];
extern const test_entry_t block_entries[];
extern const test_entry_t buddy_entries[];
extern const test_entry_t compile_entries[];
extern const test_entry_t cow_entries[];
extern const test_entry_t csr_entries[];
extern const test_entry_t csv_entries[];
Expand All @@ -108,8 +109,12 @@ extern const test_entry_t format_entries[];
extern const test_entry_t fvec_entries[];
extern const test_entry_t graph_entries[];
extern const test_entry_t graph_builtin_entries[];
extern const test_entry_t group_extra_entries[];
extern const test_entry_t hash_entries[];
extern const test_entry_t heap_entries[];
extern const test_entry_t index_entries[];
extern const test_entry_t ipc_entries[];
extern const test_entry_t journal_entries[];
extern const test_entry_t lang_entries[];
extern const test_entry_t link_entries[];
extern const test_entry_t lftj_entries[];
Expand All @@ -127,6 +132,8 @@ extern const test_entry_t repl_entries[];
extern const test_entry_t rowsel_entries[];
extern const test_entry_t runtime_entries[];
extern const test_entry_t sel_entries[];
extern const test_entry_t sort_entries[];
extern const test_entry_t splay_entries[];
extern const test_entry_t store_entries[];
extern const test_entry_t str_entries[];
extern const test_entry_t sym_entries[];
Expand All @@ -139,19 +146,22 @@ extern const test_entry_t window_entries[];

static const test_entry_t* const compiled_groups[] = {
err_entries, arena_entries, atom_entries, audit_entries,
block_entries, buddy_entries, cow_entries, csr_entries,
block_entries, buddy_entries, compile_entries, cow_entries, csr_entries,
csv_entries, datalog_entries, dict_entries, dump_entries,
embedding_entries, exec_entries,
format_entries, fvec_entries, graph_entries, graph_builtin_entries,
group_extra_entries,
hash_entries,
heap_entries,
index_entries,
index_entries, ipc_entries,
journal_entries,
lang_entries, link_entries,
lftj_entries, list_entries, meta_entries, morsel_entries,
numparse_entries, opt_entries, partition_exec_entries,
pipe_entries, platform_entries,
pool_entries, progress_entries,
repl_entries, rowsel_entries, runtime_entries, sel_entries,
store_entries,
sort_entries, splay_entries, store_entries,
str_entries, sym_entries, sys_entries, table_entries,
term_entries,
types_entries, vec_entries, window_entries,
Expand Down
Loading
Loading