From 76d7cbbbb0a37b74fd402fa844db0e156b1a9cbd Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:17:42 -0400 Subject: [PATCH 01/20] Fix FD leak in nif_create_fd when mutex allocation fails If enif_mutex_create returned NULL the resource was returned to Elixir with res->lock == NULL and res->fd set. The destructor gated close() on a non-NULL lock, so the FD leaked; any subsequent nif_close would also dereference NULL and crash. Now nif_create_fd checks the mutex result, releases the resource on failure (so dtor reclaims the fd), and returns {:error, :mutex_failed}. io_resource_dtor no longer gates close() on lock presence. --- c_src/net_runner_nif.c | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/c_src/net_runner_nif.c b/c_src/net_runner_nif.c index d860379..3fbdd94 100644 --- a/c_src/net_runner_nif.c +++ b/c_src/net_runner_nif.c @@ -38,13 +38,17 @@ static ErlNifResourceType *io_resource_type = NULL; static void io_resource_dtor(ErlNifEnv *env, void *obj) { (void)env; io_resource_t *res = (io_resource_t *)obj; + /* Close fd even if mutex construction failed — otherwise ENOMEM during + * nif_create_fd would leak the underlying FD. 
*/ if (res->lock) { enif_mutex_lock(res->lock); - if (!res->closed && res->fd >= 0) { - close(res->fd); - res->fd = -1; - res->closed = 1; - } + } + if (!res->closed && res->fd >= 0) { + close(res->fd); + res->fd = -1; + res->closed = 1; + } + if (res->lock) { enif_mutex_unlock(res->lock); enif_mutex_destroy(res->lock); res->lock = NULL; @@ -167,6 +171,13 @@ static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc, res->owner = owner; res->monitor_active = 0; + if (!res->lock) { + /* Mutex allocation failed — release resource (dtor will close fd) */ + enif_release_resource(res); + return enif_make_tuple2(env, atom_error, + MAKE_ATOM(env, "mutex_failed")); + } + /* Monitor the owner process */ if (enif_monitor_process(env, res, &owner, &res->monitor) == 0) { res->monitor_active = 1; From b0dea5541c0d74b01e40a12ff555b540d4ce3ec9 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:22:08 -0400 Subject: [PATCH 02/20] Fix use-after-close race in NIF FD resource nif_read / nif_write copied res->fd under lock and released the lock before the read()/write() syscall. A concurrent nif_close or owner-death down callback could close the fd on another thread before the syscall ran, letting the read/write target a recycled fd. Restructure so the mutex is held across the syscall and the subsequent enif_select registration. Since fds are O_NONBLOCK, the lock is held only briefly. Move the actual close() into the enif_select stop callback. nif_close and io_resource_down now mark the resource closed, detach the fd from res->fd, release the lock, and call enif_select(SELECT_STOP). BEAM waits for any in-flight select to drain before running the stop callback, which closes the fd safely. 
--- c_src/net_runner_nif.c | 130 ++++++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 59 deletions(-) diff --git a/c_src/net_runner_nif.c b/c_src/net_runner_nif.c index 3fbdd94..b7e4983 100644 --- a/c_src/net_runner_nif.c +++ b/c_src/net_runner_nif.c @@ -59,30 +59,36 @@ static void io_resource_stop(ErlNifEnv *env, void *obj, ErlNifEvent event, int is_direct_call) { (void)env; (void)obj; - (void)event; (void)is_direct_call; - /* enif_select stop callback - FD is being deselected */ + /* BEAM guarantees no further use of this event by the NIF is in flight + * when this callback runs. Safe to close the underlying fd here. */ + if ((int)event >= 0) { + close((int)event); + } } static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid, ErlNifMonitor *mon) { - (void)env; (void)pid; (void)mon; io_resource_t *res = (io_resource_t *)obj; - /* Owner process died - close the FD */ + int fd_to_stop = -1; if (res->lock) { enif_mutex_lock(res->lock); if (!res->closed && res->fd >= 0) { - enif_select(env, (ErlNifEvent)res->fd, ERL_NIF_SELECT_STOP, - obj, NULL, enif_make_atom(env, "undefined")); - close(res->fd); + fd_to_stop = res->fd; res->fd = -1; res->closed = 1; } res->monitor_active = 0; enif_mutex_unlock(res->lock); } + if (fd_to_stop >= 0) { + /* Hand fd off to the stop callback — it will close it after any + * in-flight enif_select completes. */ + enif_select(env, (ErlNifEvent)fd_to_stop, ERL_NIF_SELECT_STOP, + obj, NULL, enif_make_atom(env, "undefined")); + } } static ErlNifResourceTypeInit io_resource_init = { @@ -210,42 +216,51 @@ static ERL_NIF_TERM nif_read(ErlNifEnv *env, int argc, } if (max_bytes > 1048576) max_bytes = 1048576; /* Cap at 1MB */ + ErlNifBinary bin; + if (!enif_alloc_binary(max_bytes, &bin)) { + return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "alloc_failed")); + } + + /* Hold the lock across read() + enif_select so that a concurrent + * nif_close / down callback cannot close the fd mid-syscall. 
read() is + * non-blocking (O_NONBLOCK) so the lock is held only briefly. */ enif_mutex_lock(res->lock); - if (res->closed) { + if (res->closed || res->fd < 0) { enif_mutex_unlock(res->lock); + enif_release_binary(&bin); return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "closed")); } int fd = res->fd; - enif_mutex_unlock(res->lock); - - ErlNifBinary bin; - if (!enif_alloc_binary(max_bytes, &bin)) { - return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "alloc_failed")); - } ssize_t n = read(fd, bin.data, bin.size); + int saved_errno = errno; + if (n > 0) { + enif_mutex_unlock(res->lock); enif_realloc_binary(&bin, (size_t)n); return enif_make_tuple2(env, atom_ok, enif_make_binary(env, &bin)); - } else if (n == 0) { + } + if (n == 0) { + enif_mutex_unlock(res->lock); enif_release_binary(&bin); return atom_eof; - } else { + } + if (saved_errno == EAGAIN || saved_errno == EWOULDBLOCK) { + int sel_ret = enif_select(env, (ErlNifEvent)fd, + ERL_NIF_SELECT_READ, res, NULL, + atom_undefined); + enif_mutex_unlock(res->lock); enif_release_binary(&bin); - if (errno == EAGAIN || errno == EWOULDBLOCK) { - /* Register for select notification */ - int sel_ret = enif_select(env, (ErlNifEvent)fd, - ERL_NIF_SELECT_READ, res, NULL, - atom_undefined); - if (sel_ret < 0) { - return enif_make_tuple2(env, atom_error, - MAKE_ATOM(env, "select_failed")); - } - return enif_make_tuple2(env, atom_error, atom_eagain); + if (sel_ret < 0) { + return enif_make_tuple2(env, atom_error, + MAKE_ATOM(env, "select_failed")); } - return enif_make_tuple2(env, atom_error, - MAKE_ATOM(env, errno_to_atom(errno))); + return enif_make_tuple2(env, atom_error, atom_eagain); } + enif_mutex_unlock(res->lock); + enif_release_binary(&bin); + return enif_make_tuple2(env, atom_error, + MAKE_ATOM(env, errno_to_atom(saved_errno))); } /* @@ -272,34 +287,39 @@ static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc, return enif_make_tuple2(env, atom_ok, enif_make_int(env, 0)); } + /* Hold the lock across write() + 
enif_select so that a concurrent + * close cannot reap the fd mid-syscall. */ enif_mutex_lock(res->lock); - if (res->closed) { + if (res->closed || res->fd < 0) { enif_mutex_unlock(res->lock); return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "closed")); } int fd = res->fd; - enif_mutex_unlock(res->lock); ssize_t n = write(fd, bin.data, bin.size); + int saved_errno = errno; + if (n >= 0) { + enif_mutex_unlock(res->lock); return enif_make_tuple2(env, atom_ok, enif_make_int64(env, (int64_t)n)); - } else { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - int sel_ret = enif_select(env, (ErlNifEvent)fd, - ERL_NIF_SELECT_WRITE, res, NULL, - atom_undefined); - if (sel_ret < 0) { - return enif_make_tuple2(env, atom_error, - MAKE_ATOM(env, "select_failed")); - } - return enif_make_tuple2(env, atom_error, atom_eagain); - } - if (errno == EPIPE) { - return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "epipe")); + } + if (saved_errno == EAGAIN || saved_errno == EWOULDBLOCK) { + int sel_ret = enif_select(env, (ErlNifEvent)fd, + ERL_NIF_SELECT_WRITE, res, NULL, + atom_undefined); + enif_mutex_unlock(res->lock); + if (sel_ret < 0) { + return enif_make_tuple2(env, atom_error, + MAKE_ATOM(env, "select_failed")); } - return enif_make_tuple2(env, atom_error, - MAKE_ATOM(env, errno_to_atom(errno))); + return enif_make_tuple2(env, atom_error, atom_eagain); } + enif_mutex_unlock(res->lock); + if (saved_errno == EPIPE) { + return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "epipe")); + } + return enif_make_tuple2(env, atom_error, + MAKE_ATOM(env, errno_to_atom(saved_errno))); } /* @@ -326,27 +346,19 @@ static ERL_NIF_TERM nif_close(ErlNifEnv *env, int argc, res->closed = 1; res->fd = -1; - /* Deregister from enif_select before closing */ - enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_STOP, res, NULL, - atom_undefined); - if (res->monitor_active) { enif_demonitor_process(env, res, &res->monitor); res->monitor_active = 0; } - /* Close FD inside critical section to 
prevent TOCTOU race: - * a concurrent nif_read/nif_write on a dirty scheduler could copy the FD - * under lock then use it after we release the lock but before close(). */ - int close_ret = close(fd); - int close_errno = errno; - enif_mutex_unlock(res->lock); - if (close_ret != 0 && close_errno != EINTR) { - return enif_make_tuple2(env, atom_error, - MAKE_ATOM(env, errno_to_atom(close_errno))); - } + /* Hand fd off to the stop callback — BEAM waits for any in-flight select + * registration to drain before calling stop, which then close()s the fd. + * Concurrent nif_read/nif_write serialize on res->lock; once they observe + * closed==1 they early-out without touching the fd. */ + enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_STOP, res, NULL, + atom_undefined); return atom_ok; } From a29fe16fa07b9d47e6a2abfdbb24358e799a8e6f Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:23:20 -0400 Subject: [PATCH 03/20] Recover initial stderr chunk and use explicit read size In :consume mode, kick_stderr_read is called from init/1 to arm enif_select. If the initial Pipe.read returned {:ok, data} synchronously (short-lived processes whose stderr is already in the pipe buffer), it sent {:stderr_data, data} to self() but no handle_info clause matched, so the generic catch-all dropped it. The first (and sometimes only) chunk of stderr was lost, corrupting AbnormalExit messages and run/2 stderr capture for fast-exiting commands. Add the missing handle_info({:stderr_data, _}, state) clause that appends to stderr_buffer, records the bytes in stats, and drains any remaining data via consume_stderr. Also pass @default_read_size explicitly from kick_stderr_read instead of relying on Pipe.read/1's implicit default, keeping read sizes consistent across the module. 
--- lib/net_runner/process.ex | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/net_runner/process.ex b/lib/net_runner/process.ex index 5c17daa..bbe4774 100644 --- a/lib/net_runner/process.ex +++ b/lib/net_runner/process.ex @@ -246,6 +246,15 @@ defmodule NetRunner.Process do {:noreply, state} end + # Initial stderr chunk from kick_stderr_read in init/1. Without this clause + # the data would be silently dropped by the catch-all below. + def handle_info({:stderr_data, data}, state) when is_binary(data) do + stats = Stats.record_read_stderr(state.stats, byte_size(data)) + state = %{state | stderr_buffer: [data | state.stderr_buffer], stats: stats} + # Drain anything else buffered and re-arm enif_select on EAGAIN. + {:noreply, consume_stderr(state)} + end + def handle_info(_msg, state) do {:noreply, state} end @@ -399,8 +408,10 @@ defmodule NetRunner.Process do defp kick_stderr_read(state) do if state.stderr do - # Do an initial read to get enif_select registered - case Pipe.read(state.stderr) do + # Do an initial read to get enif_select registered. If data is + # immediately available, hand it to handle_info/2 so the GenServer + # buffers it (can't update state from init/1 without reshaping it). + case Pipe.read(state.stderr, @default_read_size) do {:ok, data} -> send(self(), {:stderr_data, data}) From 829a7354b79c9a6687acd70ca1e5921a70c546c9 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:24:30 -0400 Subject: [PATCH 04/20] Fix write_loop spin on {:ok, 0} and message starvation on large writes Two issues in the stdin write path: 1. If nif_write returned {:ok, 0} on a non-empty buffer (legal for write(2) on non-regular files), write_loop would recurse forever with the same data, wedging the GenServer. Park the caller and wait for :ready_output instead of spinning. 2. 
On repeated partial writes, write_loop kept recursing inside handle_call, delaying unrelated messages (kill, await_exit, :select) until either completion or EAGAIN. After one successful partial, park the caller with the remainder and let :ready_output drive subsequent writes. Apply the same shape to retry_write_loop on the :ready_output path. --- lib/net_runner/process.ex | 62 +++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/lib/net_runner/process.ex b/lib/net_runner/process.ex index bbe4774..0a13c01 100644 --- a/lib/net_runner/process.ex +++ b/lib/net_runner/process.ex @@ -283,25 +283,30 @@ defmodule NetRunner.Process do write_loop(data, from, state) end - # Writes data in a loop: partial writes retry immediately until EAGAIN - # (which registers enif_select) or completion. + # Writes data once per call: on a successful partial write we park the + # caller and let :ready_output drive the remainder. This avoids (a) a + # spin loop if the kernel ever returns {:ok, 0} on a non-empty write and + # (b) starving other handle_call messages on large buffers. 
defp write_loop(<<>>, _from, state), do: {:reply, :ok, state} defp write_loop(data, from, state) do case Pipe.write(state.stdin, data) do - {:ok, bytes_written} -> + {:ok, bytes_written} when bytes_written == byte_size(data) -> stats = Stats.record_write(state.stats, bytes_written) - state = %{state | stats: stats} - total = byte_size(data) + {:reply, :ok, %{state | stats: stats}} - if bytes_written >= total do - {:reply, :ok, state} - else - # Partial write — retry remainder immediately to either complete - # or hit EAGAIN (which registers enif_select for readiness notification) - remaining = binary_part(data, bytes_written, total - bytes_written) - write_loop(remaining, from, state) - end + {:ok, bytes_written} when bytes_written > 0 -> + stats = Stats.record_write(state.stats, bytes_written) + remaining = binary_part(data, bytes_written, byte_size(data) - bytes_written) + # Park the caller with the remaining data; :ready_output will drive it. + {ops, _ref} = Operations.park(state.operations, :write, from, remaining) + {:noreply, %{state | operations: ops, stats: stats}} + + {:ok, 0} -> + # write(2) can legally return 0 on a non-empty buffer; treat the + # same as EAGAIN to avoid spinning on the dirty scheduler. 
+ {ops, _ref} = Operations.park(state.operations, :write, from, data) + {:noreply, %{state | operations: ops}} {:error, :eagain} -> # enif_select is now registered for write readiness @@ -379,25 +384,26 @@ defmodule NetRunner.Process do defp retry_write_loop(ref, from, data, state) do case Pipe.write(state.stdin, data) do - {:ok, bytes_written} -> + {:ok, bytes_written} when bytes_written == byte_size(data) -> stats = Stats.record_write(state.stats, bytes_written) - state = %{state | stats: stats} - total = byte_size(data) + GenServer.reply(from, :ok) + {_, ops} = Operations.pop(state.operations, ref) + %{state | operations: ops, stats: stats} - if bytes_written >= total do - GenServer.reply(from, :ok) - {_, ops} = Operations.pop(state.operations, ref) - %{state | operations: ops} - else - remaining = binary_part(data, bytes_written, total - bytes_written) - retry_write_loop(ref, from, remaining, state) - end + {:ok, bytes_written} when bytes_written > 0 -> + stats = Stats.record_write(state.stats, bytes_written) + remaining = binary_part(data, bytes_written, byte_size(data) - bytes_written) + {_, ops} = Operations.pop(state.operations, ref) + {ops, _new_ref} = Operations.park(ops, :write, from, remaining) + %{state | operations: ops, stats: stats} + + {:ok, 0} -> + # Zero-byte write on non-empty data — wait for the next :ready_output + # (ops entry still valid with full data). 
+ state {:error, :eagain} -> - # Still parked, update data (enif_select already registered) - {_, ops} = Operations.pop(state.operations, ref) - {ops, _new_ref} = Operations.park(ops, :write, from, data) - %{state | operations: ops} + state {:error, _} = error -> GenServer.reply(from, error) From 51b68f645070b03356a7e46beede1a3635ac3fbf Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:33:06 -0400 Subject: [PATCH 05/20] Frame shepherd UDS commands correctly with carry-over buffer The event loop read up to 16 bytes from the UDS socket and passed them straight to handle_command, which only parsed buf[0] and ignored the remainder. Coalesced commands (e.g. CMD_CLOSE_STDIN followed by CMD_KILL delivered in a single read) dropped everything after the first opcode. A partial tail crossing a read boundary was also lost. Introduce command_length() per opcode, a loop-parsing handle_commands() helper, and a 64-byte carry-over buffer in event_loop() that retains any truncated tail across reads. Unknown opcodes advance one byte so the parser cannot stall on bad input. read() errors other than EINTR / EAGAIN now kill the child and exit rather than silently looping. --- c_src/shepherd.c | 70 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 62 insertions(+), 8 deletions(-) diff --git a/c_src/shepherd.c b/c_src/shepherd.c index 2ea0d83..3036cec 100644 --- a/c_src/shepherd.c +++ b/c_src/shepherd.c @@ -257,10 +257,23 @@ static void kill_child(pid_t child_pid) { } /* - * Handle a command received from BEAM over UDS. + * Length of a single framed command given its opcode. Returns 0 if the + * opcode is unknown (in which case the parser will skip one byte). + */ +static size_t command_length(uint8_t opcode) { + switch (opcode) { + case CMD_KILL: return 2; + case CMD_CLOSE_STDIN: return 1; + case CMD_SET_WINSIZE: return 5; + default: return 0; + } +} + +/* + * Handle a single command frame. 
*/ static void handle_command(int uds_fd, pid_t child_pid, int stdin_w, - uint8_t *buf, ssize_t len) { + uint8_t *buf, size_t len) { (void)uds_fd; if (len < 1) return; @@ -300,6 +313,36 @@ static void handle_command(int uds_fd, pid_t child_pid, int stdin_w, } } +/* + * Parse and dispatch all framed commands present in buf. Returns the number + * of bytes consumed (may be less than len if a tail command is truncated). + */ +static size_t handle_commands(int uds_fd, pid_t child_pid, int *stdin_w, + uint8_t *buf, size_t len) { + size_t off = 0; + while (off < len) { + size_t clen = command_length(buf[off]); + if (clen == 0) { + /* Unknown opcode: skip one byte to make progress rather than + * stalling the parser on bad input. */ + DEBUG_LOG("unknown command opcode 0x%02x, skipping", buf[off]); + off += 1; + continue; + } + if (off + clen > len) { + /* Partial tail — caller must carry this over to the next read. */ + break; + } + uint8_t op = buf[off]; + handle_command(uds_fd, child_pid, *stdin_w, &buf[off], clen); + if (op == CMD_CLOSE_STDIN) { + *stdin_w = -1; + } + off += clen; + } + return off; +} + /* * Main event loop using poll(). * @@ -312,6 +355,11 @@ static int event_loop(int uds_fd, pid_t child_pid, int stdin_w) { int child_status = -1; int child_exited = 0; + /* Carry-over buffer for partially-framed commands across reads. Max + * incoming frame is CMD_SET_WINSIZE (5 bytes); keep a little slack. 
*/ + uint8_t cbuf[64]; + size_t cbuf_used = 0; + fds[0].fd = uds_fd; fds[0].events = POLLIN; fds[1].fd = signal_pipe[0]; @@ -336,19 +384,25 @@ static int event_loop(int uds_fd, pid_t child_pid, int stdin_w) { } if (fds[0].revents & POLLIN) { - uint8_t buf[16]; - ssize_t n = read(uds_fd, buf, sizeof(buf)); + ssize_t n = read(uds_fd, cbuf + cbuf_used, sizeof(cbuf) - cbuf_used); if (n > 0) { - handle_command(uds_fd, child_pid, stdin_w, buf, n); - /* If CMD_CLOSE_STDIN was handled, mark stdin as closed */ - if (buf[0] == CMD_CLOSE_STDIN) { - stdin_w = -1; + cbuf_used += (size_t)n; + size_t consumed = handle_commands(uds_fd, child_pid, &stdin_w, + cbuf, cbuf_used); + if (consumed > 0 && consumed < cbuf_used) { + memmove(cbuf, cbuf + consumed, cbuf_used - consumed); } + cbuf_used -= consumed; } else if (n == 0) { /* BEAM closed the socket */ DEBUG_LOG("BEAM closed UDS, killing child %d", child_pid); kill_child(child_pid); return -1; + } else if (errno != EAGAIN && errno != EWOULDBLOCK && + errno != EINTR) { + ERROR_LOG("read(uds) failed: %s", strerror(errno)); + kill_child(child_pid); + return -1; } } From 1357251898d15afcbce2057643233521e87259e2 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:34:11 -0400 Subject: [PATCH 06/20] Harden send_fds and post-fork child path send_fds: * Loop on EINTR and require ret == 1 (the full 1-byte send) instead of accepting any positive return. * Log errno on both calloc and sendmsg failures so the caller sees a diagnostic instead of an opaque "failed to send FDs". Post-fork child: * Between fork() and execvp(), POSIX allows only async-signal-safe functions. Replace fprintf/strerror on failure with a new child_fail() helper that uses only write(2) on a stack buffer. * Check setsid, TIOCSCTTY, and every dup2 return value. If any of these fail the child now reports via child_fail() and _exit(127) instead of silently running with the wrong stdio or no controlling terminal. 
--- c_src/shepherd.c | 71 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/c_src/shepherd.c b/c_src/shepherd.c index 3036cec..d0a7e61 100644 --- a/c_src/shepherd.c +++ b/c_src/shepherd.c @@ -45,6 +45,28 @@ /* Self-pipe for signal handling */ static int signal_pipe[2] = {-1, -1}; +/* + * Async-signal-safe post-fork failure path. Between fork() and exec*(), + * POSIX allows only async-signal-safe functions, so we cannot call + * fprintf / strerror / malloc. This helper uses write(2) on a stack + * buffer only. + */ +static void child_fail(const char *tag, const char *detail) { + if (tag) { + size_t i = 0; + while (i < 64 && tag[i] != '\0') i++; + (void)!write(STDERR_FILENO, tag, i); + (void)!write(STDERR_FILENO, ": ", 2); + } + if (detail) { + size_t i = 0; + while (i < 256 && detail[i] != '\0') i++; + (void)!write(STDERR_FILENO, detail, i); + } + (void)!write(STDERR_FILENO, "\n", 1); + _exit(127); +} + static void sigchld_handler(int sig) { (void)sig; int saved_errno = errno; @@ -75,7 +97,10 @@ static int send_fds(int uds_fd, int *fds, int nfds) { size_t cmsg_space = CMSG_SPACE((size_t)nfds * sizeof(int)); char *cmsg_buf = calloc(1, cmsg_space); - if (!cmsg_buf) return -1; + if (!cmsg_buf) { + ERROR_LOG("send_fds: calloc(%zu) failed", cmsg_space); + return -1; + } struct msghdr msg = {0}; msg.msg_iov = &iov; @@ -85,6 +110,7 @@ static int send_fds(int uds_fd, int *fds, int nfds) { struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); if (!cmsg) { + ERROR_LOG("send_fds: CMSG_FIRSTHDR returned NULL"); free(cmsg_buf); return -1; } @@ -93,9 +119,18 @@ static int send_fds(int uds_fd, int *fds, int nfds) { cmsg->cmsg_len = CMSG_LEN((size_t)nfds * sizeof(int)); memcpy(CMSG_DATA(cmsg), fds, (size_t)nfds * sizeof(int)); - ssize_t ret = sendmsg(uds_fd, &msg, 0); - free(cmsg_buf); - return ret > 0 ? 0 : -1; + /* Retry on EINTR; treat anything other than a full 1-byte send as error. 
*/ + for (;;) { + ssize_t ret = sendmsg(uds_fd, &msg, 0); + if (ret == 1) { + free(cmsg_buf); + return 0; + } + if (ret < 0 && errno == EINTR) continue; + ERROR_LOG("sendmsg failed: ret=%zd errno=%s", ret, strerror(errno)); + free(cmsg_buf); + return -1; + } } /* @@ -560,19 +595,20 @@ int main(int argc, char *argv[]) { close(signal_pipe[1]); close(master_fd); - /* Create new session and set controlling terminal */ - setsid(); - ioctl(slave_fd, TIOCSCTTY, 0); + /* Create new session (required before acquiring controlling tty) */ + if (setsid() == (pid_t)-1) child_fail("setsid", NULL); + if (ioctl(slave_fd, TIOCSCTTY, 0) != 0) child_fail("TIOCSCTTY", NULL); - dup2(slave_fd, STDIN_FILENO); - dup2(slave_fd, STDOUT_FILENO); - dup2(slave_fd, STDERR_FILENO); + if (dup2(slave_fd, STDIN_FILENO) != STDIN_FILENO || + dup2(slave_fd, STDOUT_FILENO) != STDOUT_FILENO || + dup2(slave_fd, STDERR_FILENO) != STDERR_FILENO) { + child_fail("dup2", NULL); + } if (slave_fd > STDERR_FILENO) close(slave_fd); setpgid(0, 0); execvp(cmd, cmd_args); - fprintf(stderr, "execvp failed: %s: %s\n", cmd, strerror(errno)); - _exit(127); + child_fail("execvp", cmd); } /* === Shepherd (PTY) === */ @@ -665,9 +701,11 @@ int main(int argc, char *argv[]) { close(stdout_pipe[0]); close(stderr_pipe[0]); - dup2(stdin_pipe[0], STDIN_FILENO); - dup2(stdout_pipe[1], STDOUT_FILENO); - dup2(stderr_pipe[1], STDERR_FILENO); + if (dup2(stdin_pipe[0], STDIN_FILENO) != STDIN_FILENO || + dup2(stdout_pipe[1], STDOUT_FILENO) != STDOUT_FILENO || + dup2(stderr_pipe[1], STDERR_FILENO) != STDERR_FILENO) { + child_fail("dup2", NULL); + } close(stdin_pipe[0]); close(stdout_pipe[1]); @@ -675,8 +713,7 @@ int main(int argc, char *argv[]) { setpgid(0, 0); execvp(cmd, cmd_args); - fprintf(stderr, "execvp failed: %s: %s\n", cmd, strerror(errno)); - _exit(127); + child_fail("execvp", cmd); } /* === Shepherd (pipe) === */ From 50237f3c1bac0c676efc8a26b9ba0af6cff9099c Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 
19:34:47 -0400 Subject: [PATCH 07/20] Harden waitpid paths in shepherd MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit kill_child() previously called waitpid(child_pid, NULL, 0) after SIGKILL. If the child was in uninterruptible kernel sleep (D-state), this blocked the shepherd indefinitely — leaking the process group and preventing any further protocol communication with BEAM. Replace with a bounded WNOHANG reap loop (~3s cap) so control returns even when the kernel cannot reap immediately; cgroup_cleanup() provides the authoritative kill path on Linux. The event-loop SIGCHLD handler reaped at most one child per drain. SIGCHLD is coalesced by the kernel, so multiple pending exits could deliver as a single signal; the single waitpid missed them. Loop with waitpid(-1, ..., WNOHANG) until no more reapable children remain and flip child_exited only for the managed pid. --- c_src/shepherd.c | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/c_src/shepherd.c b/c_src/shepherd.c index d0a7e61..abbb985 100644 --- a/c_src/shepherd.c +++ b/c_src/shepherd.c @@ -285,7 +285,17 @@ static void kill_child(pid_t child_pid) { /* Escalate to SIGKILL the whole process group */ kill(-child_pid, SIGKILL); - waitpid(child_pid, NULL, 0); + + /* Bounded WNOHANG reap loop — avoid hanging forever if the child is + * stuck in uninterruptible kernel sleep (D-state). After the bound + * elapses we return anyway; cgroup cleanup + the kernel eventually + * reap. 
*/ + int sigkill_iters = 30; /* ~3s total at 100ms per iteration */ + for (int i = 0; i < sigkill_iters; i++) { + pid_t ret = waitpid(child_pid, NULL, WNOHANG); + if (ret > 0 || (ret < 0 && errno == ECHILD)) break; + usleep(100000); + } /* Cleanup cgroup (kills any remaining processes, removes dir) */ cgroup_cleanup(); @@ -447,10 +457,16 @@ static int event_loop(int uds_fd, pid_t child_pid, int stdin_w) { char drain[64]; while (read(signal_pipe[0], drain, sizeof(drain)) > 0) {} - /* Reap child */ + /* Reap any and all children that have exited. SIGCHLD is + * coalesced by the kernel — multiple pending exits can deliver + * as a single SIGCHLD. Loop until no more reapable children + * remain. We only flip child_exited when the managed child is + * reaped; other reapees (should be none today, future-proof) + * are still cleaned up. */ int status; - pid_t ret_pid = waitpid(child_pid, &status, WNOHANG); - if (ret_pid > 0) { + pid_t ret_pid; + while ((ret_pid = waitpid(-1, &status, WNOHANG)) > 0) { + if (ret_pid != child_pid) continue; child_exited = 1; if (WIFEXITED(status)) { child_status = WEXITSTATUS(status); From 1ee2aab38cf409e48aeb012ce081a82d552c0a4f Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:35:41 -0400 Subject: [PATCH 08/20] Harden cgroup and UDS path handling in shepherd * Validate every snprintf return value for cgroup full_path / procs_path / kill_path; treat truncation as error instead of silently writing to the wrong file. * Validate uds_path length before strncpy into sockaddr_un.sun_path; reject paths that would truncate rather than failing opaquely in connect(). * Replace the fixed 100ms usleep before rmdir in cgroup_cleanup() with a bounded polling loop (~1s) that retries on EBUSY / ENOTEMPTY. Stops cgroup directories leaking when the kernel reaper is slow. * Treat cgroup_setup failure as fatal for the spawn (send_error, kill child, exit). 
A user who supplied a cgroup_path expects isolation; silently running unisolated violates the contract. * Set FD_CLOEXEC on the PTY master fd so any future shepherd fork doesn't leak it. --- c_src/shepherd.c | 67 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/c_src/shepherd.c b/c_src/shepherd.c index abbb985..80ebdf7 100644 --- a/c_src/shepherd.c +++ b/c_src/shepherd.c @@ -210,12 +210,19 @@ static int cgroup_setup(pid_t child_pid) { char full_path[512]; char procs_path[576]; - /* Create cgroup directory */ - snprintf(full_path, sizeof(full_path), "/sys/fs/cgroup/%s", cgroup_path); + int n = snprintf(full_path, sizeof(full_path), "/sys/fs/cgroup/%s", + cgroup_path); + if (n < 0 || (size_t)n >= sizeof(full_path)) { + ERROR_LOG("cgroup path too long"); + return -1; + } mkdir(full_path, 0755); /* ignore error if exists */ - /* Move child to cgroup */ - snprintf(procs_path, sizeof(procs_path), "%s/cgroup.procs", full_path); + n = snprintf(procs_path, sizeof(procs_path), "%s/cgroup.procs", full_path); + if (n < 0 || (size_t)n >= sizeof(procs_path)) { + ERROR_LOG("cgroup procs path too long"); + return -1; + } FILE *f = fopen(procs_path, "w"); if (!f) { ERROR_LOG("failed to open %s: %s", procs_path, strerror(errno)); @@ -232,21 +239,26 @@ static void cgroup_cleanup(void) { char full_path[512]; char kill_path[576]; - snprintf(full_path, sizeof(full_path), "/sys/fs/cgroup/%s", cgroup_path); + int n = snprintf(full_path, sizeof(full_path), "/sys/fs/cgroup/%s", + cgroup_path); + if (n < 0 || (size_t)n >= sizeof(full_path)) return; /* Kill all processes in the cgroup via cgroup.kill (cgroup v2) */ - snprintf(kill_path, sizeof(kill_path), "%s/cgroup.kill", full_path); + n = snprintf(kill_path, sizeof(kill_path), "%s/cgroup.kill", full_path); + if (n < 0 || (size_t)n >= sizeof(kill_path)) return; FILE *f = fopen(kill_path, "w"); if (f) { fprintf(f, "1\n"); fclose(f); } - /* Wait briefly for processes to die */ - 
usleep(100000); - - /* Remove cgroup directory */ - rmdir(full_path); + /* Poll for rmdir success rather than a fixed sleep — the kernel needs + * a moment to reap the killed processes. Bail after ~1s (10 * 100ms). */ + for (int i = 0; i < 10; i++) { + if (rmdir(full_path) == 0) return; + if (errno != EBUSY && errno != ENOTEMPTY) return; /* real error */ + usleep(100000); + } } #else static int cgroup_setup(pid_t child_pid) { @@ -573,6 +585,12 @@ int main(int argc, char *argv[]) { struct sockaddr_un addr; memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; + if (strlen(uds_path) >= sizeof(addr.sun_path)) { + fprintf(stderr, "error: uds_path too long (max %zu bytes)\n", + sizeof(addr.sun_path) - 1); + close(uds_fd); + return 1; + } strncpy(addr.sun_path, uds_path, sizeof(addr.sun_path) - 1); if (connect(uds_fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) { @@ -630,9 +648,18 @@ int main(int argc, char *argv[]) { /* === Shepherd (PTY) === */ close(slave_fd); pty_master_fd = master_fd; + set_cloexec(master_fd); - /* Move child to cgroup (Linux only, no-op elsewhere) */ - cgroup_setup(child_pid); + /* Move child to cgroup (Linux only, no-op elsewhere). If the user + * requested a cgroup path and setup failed, isolation is not + * available — treat as fatal. */ + if (cgroup_setup(child_pid) != 0) { + send_error(uds_fd, "cgroup setup failed"); + kill_child(child_pid); + close(master_fd); + close(uds_fd); + return 1; + } /* Send single master FD to BEAM (used for both read and write) */ int fds_to_send[1] = {master_fd}; @@ -737,8 +764,18 @@ int main(int argc, char *argv[]) { close(stdout_pipe[1]); close(stderr_pipe[1]); - /* Move child to cgroup (Linux only, no-op elsewhere) */ - cgroup_setup(child_pid); + /* Move child to cgroup (Linux only, no-op elsewhere). If the user + * requested a cgroup path and setup failed, isolation is not + * available — treat as fatal. 
*/ + if (cgroup_setup(child_pid) != 0) { + send_error(uds_fd, "cgroup setup failed"); + kill_child(child_pid); + close(stdin_pipe[1]); + close(stdout_pipe[0]); + close(stderr_pipe[0]); + close(uds_fd); + return 1; + } int fds_to_send[3] = {stdin_pipe[1], stdout_pipe[0], stderr_pipe[0]}; if (send_fds(uds_fd, fds_to_send, 3) != 0) { From bb4a2264432c0c51432dac4736dea996feb1fa73 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:36:51 -0400 Subject: [PATCH 09/20] Stop Process GenServer when stream consumer crashes Stream.resource's after callback is only run on normal termination or {:halt, _}. If the consumer crashed mid-iteration (e.g. pattern-match failure), cleanup_process/1 was never invoked, the Process GenServer lived on, and Watcher couldn't help because it monitors the GenServer, not the consumer. Add an optional :owner option to NetRunner.Process that takes a pid to monitor. On DOWN the GenServer SIGKILLs the OS process (through both the shepherd protocol and a direct NIF kill) and stops :normal. NetRunner.Stream.stream/3 now passes self() as :owner so consumer crashes reliably tear down the whole chain. --- lib/net_runner/process.ex | 31 ++++++++++++++++++++++++++++++- lib/net_runner/process/state.ex | 2 ++ lib/net_runner/stream.ex | 8 +++++++- 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/lib/net_runner/process.ex b/lib/net_runner/process.ex index 0a13c01..3eb0fe8 100644 --- a/lib/net_runner/process.ex +++ b/lib/net_runner/process.ex @@ -94,7 +94,16 @@ defmodule NetRunner.Process do def init({cmd, args, opts}) do case Exec.spawn_process(cmd, args, opts) do {:ok, state} -> - state = %{state | stats: Stats.new()} + # Optionally monitor an "owner" process (typically the stream + # consumer). If it dies before the OS process exits we kill the + # OS process and stop cleanly instead of leaking a GenServer. 
+ owner_ref = + case Keyword.get(opts, :owner) do + pid when is_pid(pid) -> Process.monitor(pid) + _ -> nil + end + + state = %{state | stats: Stats.new(), owner_ref: owner_ref} # Register with watcher for belt-and-suspenders cleanup NetRunner.Watcher.watch(self(), state.os_pid) # Start reading stderr in :consume mode @@ -246,6 +255,26 @@ defmodule NetRunner.Process do {:noreply, state} end + # Owner (e.g. stream consumer) died — kill the OS process and stop. + # Without this, a consumer crash would orphan the Process GenServer and + # leave the OS process running until Watcher's own DOWN fires (which + # doesn't, because Watcher monitors us, not the consumer). + def handle_info({:DOWN, ref, :process, _pid, _reason}, %{owner_ref: ref} = state) + when is_reference(ref) do + if state.os_pid do + case Signal.resolve(:sigkill) do + {:ok, sig_num} -> + send_shepherd_command(state, <<0x01, sig_num::8>>) + Nif.nif_kill(state.os_pid, sig_num) + + _ -> + :ok + end + end + + {:stop, :normal, state} + end + # Initial stderr chunk from kick_stderr_read in init/1. Without this clause # the data would be silently dropped by the catch-all below. 
def handle_info({:stderr_data, data}, state) when is_binary(data) do diff --git a/lib/net_runner/process/state.ex b/lib/net_runner/process/state.ex index 202d3c9..e560588 100644 --- a/lib/net_runner/process/state.ex +++ b/lib/net_runner/process/state.ex @@ -13,6 +13,7 @@ defmodule NetRunner.Process.State do :exit_status, :cmd, :args, + :owner_ref, operations: %Operations{}, awaiting_exit: [], stderr_mode: :consume, @@ -32,6 +33,7 @@ defmodule NetRunner.Process.State do exit_status: non_neg_integer() | nil, cmd: String.t(), args: [String.t()], + owner_ref: reference() | nil, operations: Operations.t(), awaiting_exit: [GenServer.from()], stderr_mode: :consume | :redirect | :disabled, diff --git a/lib/net_runner/stream.ex b/lib/net_runner/stream.ex index c6706fe..072cba6 100644 --- a/lib/net_runner/stream.ex +++ b/lib/net_runner/stream.ex @@ -27,7 +27,13 @@ defmodule NetRunner.Stream do """ def stream(cmd, args, opts) do input = Keyword.get(opts, :input, nil) - process_opts = Keyword.drop(opts, [:input]) + # Pass the caller as :owner so the Process GenServer stops (and kills + # the OS process) if the stream consumer crashes before Stream.resource's + # after callback would otherwise run. + process_opts = + opts + |> Keyword.drop([:input]) + |> Keyword.put_new(:owner, self()) case Proc.start(cmd, args, process_opts) do {:ok, pid} -> From 9a4f4637c405b1b47d2a544a9e63b7e672aa9532 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:37:30 -0400 Subject: [PATCH 10/20] Explicit Port.close in terminate/2; non-blocking Watcher escalation process.ex terminate/2 now explicitly closes the shepherd Port after pipes and the UDS socket. The Port is linked so it would die with the GenServer anyway, but closing it explicitly makes the cleanup order deterministic. watcher.ex replaced Process.sleep(5000) inside handle_info/2 with a Process.send_after(:escalate_to_sigkill, 5000) message and a new handle_info/2 clause. A Watcher can now still receive messages (e.g. 
supervisor shutdown) during the SIGTERM grace window instead of being wedged unresponsive. --- lib/net_runner/process.ex | 11 +++++++++- lib/net_runner/watcher.ex | 44 +++++++++++++++++++-------------------- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/lib/net_runner/process.ex b/lib/net_runner/process.ex index 3eb0fe8..fc9d0f6 100644 --- a/lib/net_runner/process.ex +++ b/lib/net_runner/process.ex @@ -290,7 +290,8 @@ defmodule NetRunner.Process do @impl true def terminate(_reason, state) do - # Best-effort cleanup + # Best-effort cleanup. Order: close pipes (lets child see EOF), then + # UDS (shepherd detects POLLHUP, kills child), then the shepherd Port. if state.stdin, do: Pipe.close(state.stdin) if state.stdout, do: Pipe.close(state.stdout) if state.stderr, do: Pipe.close(state.stderr) @@ -299,6 +300,14 @@ defmodule NetRunner.Process do :socket.close(state.uds_socket) end + if is_port(state.shepherd_port) do + try do + Port.close(state.shepherd_port) + catch + _, _ -> :ok + end + end + :ok end diff --git a/lib/net_runner/watcher.ex b/lib/net_runner/watcher.ex index 7f9b971..70711fb 100644 --- a/lib/net_runner/watcher.ex +++ b/lib/net_runner/watcher.ex @@ -38,35 +38,35 @@ defmodule NetRunner.Watcher do @impl true def handle_info({:DOWN, ref, :process, _pid, _reason}, %{monitor_ref: ref} = state) do - # GenServer crashed — kill the OS process - kill_os_process(state.os_pid) - {:stop, :normal, state} - end - - def handle_info(_msg, state) do - {:noreply, state} - end - - defp kill_os_process(os_pid) do - case Nif.nif_is_os_pid_alive(os_pid) do + # GenServer crashed — kill the OS process, then schedule a SIGKILL + # escalation check instead of Process.sleep (which would wedge this + # GenServer unresponsive to any other messages for 5 seconds). 
+ case Nif.nif_is_os_pid_alive(state.os_pid) do true -> {:ok, sigterm} = Signal.resolve(:sigterm) - Nif.nif_kill(os_pid, sigterm) + Nif.nif_kill(state.os_pid, sigterm) + Process.send_after(self(), :escalate_to_sigkill, @kill_timeout) + {:noreply, state} - # Wait briefly, then escalate - Process.sleep(@kill_timeout) - - case Nif.nif_is_os_pid_alive(os_pid) do - true -> - {:ok, sigkill} = Signal.resolve(:sigkill) - Nif.nif_kill(os_pid, sigkill) + false -> + {:stop, :normal, state} + end + end - false -> - :ok - end + def handle_info(:escalate_to_sigkill, state) do + case Nif.nif_is_os_pid_alive(state.os_pid) do + true -> + {:ok, sigkill} = Signal.resolve(:sigkill) + Nif.nif_kill(state.os_pid, sigkill) false -> :ok end + + {:stop, :normal, state} + end + + def handle_info(_msg, state) do + {:noreply, state} end end From 10391a10cb8acb062254f97add76d6738f672b63 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:38:48 -0400 Subject: [PATCH 11/20] Harden exec boundary: validate cmd/args, UDS cleanup, signal range * Exec.read_uds_message: replace peek-then-recv (which can desync if the first byte arrives ahead of its payload) with opcode-first reads. Read 1 byte for the opcode, then the opcode-specific tail. Timeouts bumped to 500ms. * Exec.spawn_process: validate cmd and args before Port.open. Reject non-binary, empty, or NUL-byte-containing values. Passing these through Port.open's args: is undefined behaviour on the C side. * Exec.cleanup_listener: propagate File.rm errors except :enoent (which is expected when the shepherd already unlinked the socket). * Signal.resolve: reject integer signals outside the POSIX 1..31 range instead of forwarding arbitrary ints to kill(2). 
--- lib/net_runner/process/exec.ex | 82 +++++++++++++++++++++++----------- lib/net_runner/signal.ex | 4 +- 2 files changed, 60 insertions(+), 26 deletions(-) diff --git a/lib/net_runner/process/exec.ex b/lib/net_runner/process/exec.ex index a871830..042e2d7 100644 --- a/lib/net_runner/process/exec.ex +++ b/lib/net_runner/process/exec.ex @@ -25,7 +25,8 @@ defmodule NetRunner.Process.Exec do uds_path = uds_socket_path() pty_mode = Keyword.get(opts, :pty, false) - with :ok <- validate_cgroup_path(Keyword.get(opts, :cgroup_path, nil)), + with :ok <- validate_cmd_and_args(cmd, args), + :ok <- validate_cgroup_path(Keyword.get(opts, :cgroup_path, nil)), {:ok, listen_socket} <- create_uds_listener(uds_path), shepherd_port <- open_shepherd(uds_path, cmd, args, opts), {:ok, conn_socket} <- accept_connection(listen_socket), @@ -37,6 +38,30 @@ defmodule NetRunner.Process.Exec do end end + # Reject NUL bytes in cmd/args early; passing them through Port.open's + # args: option is undefined and could truncate a cmd string on the C side. 
+ defp validate_cmd_and_args(cmd, args) do + cond do + not is_binary(cmd) -> + {:error, {:invalid_cmd, "must be a binary"}} + + cmd == "" -> + {:error, {:invalid_cmd, "must not be empty"}} + + String.contains?(cmd, <<0>>) -> + {:error, {:invalid_cmd, "must not contain NUL bytes"}} + + not is_list(args) -> + {:error, {:invalid_args, "must be a list of binaries"}} + + Enum.any?(args, fn a -> not is_binary(a) or String.contains?(a, <<0>>) end) -> + {:error, {:invalid_args, "each arg must be a binary without NUL bytes"}} + + true -> + :ok + end + end + defp setup_after_connection(conn_socket, shepherd_port, owner, cmd, args, opts, pty_mode) do with {:ok, fds, iov_rest} <- receive_fds(conn_socket, pty_mode), {:ok, os_pid} <- extract_child_started(conn_socket, iov_rest), @@ -151,8 +176,12 @@ defmodule NetRunner.Process.Exec do defp cleanup_listener(listen_socket, path) do :socket.close(listen_socket) - File.rm(path) - :ok + + case File.rm(path) do + :ok -> :ok + {:error, :enoent} -> :ok + {:error, reason} -> {:error, {:uds_path_cleanup_failed, reason}} + end end @doc """ @@ -266,32 +295,35 @@ defmodule NetRunner.Process.Exec do @doc """ Reads a protocol message from the UDS. Used for ongoing communication. + + Structured as: read the 1-byte opcode, then the opcode-specific tail. + Avoids the peek-then-recv race where :peek sees the first byte but + the recv of the full frame times out because the tail is a moment + behind the kernel delivery queue.
""" def read_uds_message(socket) do - case :socket.recv(socket, 0, [:peek], 0) do - {:ok, <<@msg_child_exited, _::binary>>} -> - case :socket.recv(socket, 5, [], 100) do - {:ok, <<@msg_child_exited, status::big-unsigned-32>>} -> - {:child_exited, status} - - other -> - {:error, {:unexpected, other}} - end - - {:ok, <<@msg_error, _::binary>>} -> - case :socket.recv(socket, 0, [], 100) do - {:ok, <<@msg_error, len::big-unsigned-16, msg::binary-size(len)>>} -> - {:shepherd_error, msg} - - other -> - {:error, {:unexpected, other}} - end + case :socket.recv(socket, 1, [], 500) do + {:ok, <<@msg_child_exited>>} -> recv_child_exited(socket) + {:ok, <<@msg_error>>} -> recv_error_message(socket) + {:ok, _} -> {:error, :unknown_message} + {:error, :timeout} -> {:error, :no_message} + {:error, reason} -> {:error, reason} + end + end - {:ok, _other} -> - {:error, :unknown_message} + defp recv_child_exited(socket) do + case :socket.recv(socket, 4, [], 500) do + {:ok, <<status::big-unsigned-32>>} -> {:child_exited, status} + other -> {:error, {:unexpected, other}} + end + end - {:error, reason} -> - {:error, reason} + defp recv_error_message(socket) do + with {:ok, <<len::big-unsigned-16>>} <- :socket.recv(socket, 2, [], 500), + {:ok, msg} <- :socket.recv(socket, len, [], 500) do + {:shepherd_error, msg} + else + other -> {:error, {:unexpected, other}} end end end diff --git a/lib/net_runner/signal.ex b/lib/net_runner/signal.ex index 69c972b..ea5dc96 100644 --- a/lib/net_runner/signal.ex +++ b/lib/net_runner/signal.ex @@ -12,7 +12,9 @@ defmodule NetRunner.Signal do Nif.nif_signal_number(signal) end - def resolve(signal) when is_integer(signal), do: {:ok, signal} + def resolve(signal) when is_integer(signal) and signal >= 1 and signal <= 31, + do: {:ok, signal} + def resolve(_signal), do: {:error, :unknown_signal} @doc """ From 75ad97fc8542bb6569c51ed5ef2056b5c1e2a1c3 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:40:29 -0400 Subject: [PATCH 12/20] Monitor parked callers in Operations; prune on caller
DOWN MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Operations.park previously stored the GenServer.from tuple but did not monitor the caller pid. If a caller timed out its GenServer.call or crashed while parked on EAGAIN, its entry stayed in pending until the Process exited; finish_exit then replied to a dead pid. For a long- running process with many client timeouts this leaked memory. Operations now takes Process.monitor(caller_pid) in park/4 and tracks a second map (monitors: caller_monitor_ref → op_ref). pop/2 demonitors the caller. New pop_by_monitor/2 lets the GenServer remove an entry when it observes the caller's :DOWN. reply_all/2 flushes all monitors. The Process GenServer's :DOWN handler now dispatches: if the monitor ref matches owner_ref (the stream consumer) it SIGKILLs and stops, else it tries Operations.pop_by_monitor to prune a dead parked caller. --- lib/net_runner/process.ex | 44 ++++++++++++------- lib/net_runner/process/operations.ex | 65 +++++++++++++++++++++++----- 2 files changed, 82 insertions(+), 27 deletions(-) diff --git a/lib/net_runner/process.ex b/lib/net_runner/process.ex index fc9d0f6..cc9abe8 100644 --- a/lib/net_runner/process.ex +++ b/lib/net_runner/process.ex @@ -255,24 +255,23 @@ defmodule NetRunner.Process do {:noreply, state} end - # Owner (e.g. stream consumer) died — kill the OS process and stop. - # Without this, a consumer crash would orphan the Process GenServer and - # leave the OS process running until Watcher's own DOWN fires (which - # doesn't, because Watcher monitors us, not the consumer). - def handle_info({:DOWN, ref, :process, _pid, _reason}, %{owner_ref: ref} = state) + # A parked caller (read/write) died — drop its entry silently instead of + # letting it linger until process exit. The owner case is handled first. 
+ def handle_info({:DOWN, ref, :process, _pid, _reason}, state) when is_reference(ref) do - if state.os_pid do - case Signal.resolve(:sigkill) do - {:ok, sig_num} -> - send_shepherd_command(state, <<0x01, sig_num::8>>) - Nif.nif_kill(state.os_pid, sig_num) + cond do + ref == state.owner_ref -> + on_owner_down(state) - _ -> - :ok - end - end + true -> + case Operations.pop_by_monitor(state.operations, ref) do + {nil, _ops} -> + {:noreply, state} - {:stop, :normal, state} + {_op, ops} -> + {:noreply, %{state | operations: ops}} + end + end end # Initial stderr chunk from kick_stderr_read in init/1. Without this clause @@ -288,6 +287,21 @@ defmodule NetRunner.Process do {:noreply, state} end + defp on_owner_down(state) do + if state.os_pid do + case Signal.resolve(:sigkill) do + {:ok, sig_num} -> + send_shepherd_command(state, <<0x01, sig_num::8>>) + Nif.nif_kill(state.os_pid, sig_num) + + _ -> + :ok + end + end + + {:stop, :normal, state} + end + @impl true def terminate(_reason, state) do # Best-effort cleanup. Order: close pipes (lets child see EOF), then diff --git a/lib/net_runner/process/operations.ex b/lib/net_runner/process/operations.ex index 85dad18..c40be4a 100644 --- a/lib/net_runner/process/operations.ex +++ b/lib/net_runner/process/operations.ex @@ -1,31 +1,58 @@ defmodule NetRunner.Process.Operations do @moduledoc false - @type op_type :: :read | :write + @type op_type :: :read | :write | {:read, :stdout | :stderr} @type pending_op :: {op_type(), GenServer.from(), term()} - defstruct pending: %{} + defstruct pending: %{}, monitors: %{} @type t :: %__MODULE__{ - pending: %{reference() => pending_op()} + pending: %{reference() => pending_op()}, + monitors: %{reference() => reference()} } @doc """ - Parks a caller that received :eagain. Returns updated ops and a ref for matching. + Parks a caller that received :eagain. Monitors the caller so the entry + can be reclaimed if the caller crashes or times out before the GenServer + can reply. 
Returns updated ops and a ref for matching. """ - def park(%__MODULE__{pending: pending} = ops, type, from, context \\ nil) do + def park(%__MODULE__{pending: pending, monitors: monitors} = ops, type, from, context \\ nil) do ref = make_ref() op = {type, from, context} - {%{ops | pending: Map.put(pending, ref, op)}, ref} + + {caller_pid, _} = from + mref = Process.monitor(caller_pid) + + {%{ops | pending: Map.put(pending, ref, op), monitors: Map.put(monitors, mref, ref)}, ref} end @doc """ - Retrieves and removes a pending operation by ref. + Retrieves and removes a pending operation by ref. Demonitors the caller + we established in park/4. """ - def pop(%__MODULE__{pending: pending} = ops, ref) do + def pop(%__MODULE__{pending: pending, monitors: monitors} = ops, ref) do case Map.pop(pending, ref) do - {nil, _} -> {nil, ops} - {op, rest} -> {op, %{ops | pending: rest}} + {nil, _} -> + {nil, ops} + + {op, rest} -> + monitors = demonitor_for_op(monitors, ref) + {op, %{ops | pending: rest, monitors: monitors}} + end + end + + @doc """ + Removes the pending op whose caller-monitor ref matches `mref` (invoked + from the GenServer's :DOWN handler). Returns {op_or_nil, new_ops}. + """ + def pop_by_monitor(%__MODULE__{pending: pending, monitors: monitors} = ops, mref) do + case Map.pop(monitors, mref) do + {nil, _} -> + {nil, ops} + + {op_ref, monitors_rest} -> + {op, pending_rest} = Map.pop(pending, op_ref) + {op, %{ops | pending: pending_rest, monitors: monitors_rest}} end end @@ -38,14 +65,28 @@ defmodule NetRunner.Process.Operations do @doc """ Replies to all pending operations with the given response and clears them. + Demonitors all caller monitors along the way. 
""" - def reply_all(%__MODULE__{pending: pending} = ops, response) do + def reply_all(%__MODULE__{pending: pending, monitors: monitors} = ops, response) do Enum.each(pending, fn {_ref, {_type, from, _ctx}} -> GenServer.reply(from, response) end) - %{ops | pending: %{}} + Enum.each(monitors, fn {mref, _op_ref} -> Process.demonitor(mref, [:flush]) end) + + %{ops | pending: %{}, monitors: %{}} end def empty?(%__MODULE__{pending: pending}), do: map_size(pending) == 0 + + defp demonitor_for_op(monitors, op_ref) do + case Enum.find(monitors, fn {_mref, r} -> r == op_ref end) do + {mref, _} -> + Process.demonitor(mref, [:flush]) + Map.delete(monitors, mref) + + nil -> + monitors + end + end end From 8c707227b93be31d63c755b25c826ca571433bd0 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:41:27 -0400 Subject: [PATCH 13/20] Surface Daemon drain-task crashes instead of silently dropping them Two defects in NetRunner.Daemon: * The generic handle_info({:DOWN, ...}) clause matched any :DOWN and dropped it. A drain Task crashing (abnormal reason) looked identical to normal EOF, so a failing drain stopped invisibly and output thereafter piled up in the pipe until blocking the child process. Narrow the handler to refs we recognise (drain_ref / stderr_drain_ref) and log a Logger.warning when reason != :normal. * drain_loop relied on reader.() never raising. If the Proc GenServer terminated mid-call or on_output blew up on a non-UTF-8 chunk, the linked drain Task crashed, which propagated to the Daemon through Task.async's link and crashed the whole daemon. Wrap drain_loop in try/rescue/catch and isolate on_output in its own rescue so neither path can take down the Daemon. 
--- lib/net_runner/daemon.ex | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/lib/net_runner/daemon.ex b/lib/net_runner/daemon.ex index d450e7f..47b3668 100644 --- a/lib/net_runner/daemon.ex +++ b/lib/net_runner/daemon.ex @@ -83,7 +83,16 @@ defmodule NetRunner.Daemon do {:noreply, state} end - def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do + # Drain task went :DOWN. Normal completion would match the {ref, result} + # clause above, so here we expect an abnormal reason (crash, :killed, etc.) + # — log a warning so a drain crash does not silently stop draining. + def handle_info({:DOWN, ref, :process, _pid, reason}, state) do + if ref in [state.drain_ref, state.stderr_drain_ref] and reason != :normal do + require Logger + + Logger.warning("[NetRunner.Daemon] drain task crashed: #{inspect(reason)}") + end + {:noreply, state} end @@ -120,7 +129,7 @@ defmodule NetRunner.Daemon do defp drain_loop(reader, proc, on_output) do case reader.(proc) do {:ok, data} -> - handle_output(on_output, data) + safe_handle_output(on_output, data) drain_loop(reader, proc, on_output) :eof -> @@ -129,6 +138,25 @@ defmodule NetRunner.Daemon do {:error, _} -> :ok end + rescue + # Defensive: if reader.() or caller pattern blows up (e.g. Proc already + # terminated while we were mid-call), stop draining without bringing + # down the Daemon through the linked Task. 
+ e -> + require Logger + Logger.warning("[NetRunner.Daemon] drain exception: #{inspect(e)}") + :ok + catch + :exit, _ -> :ok + end + + defp safe_handle_output(on_output, data) do + handle_output(on_output, data) + rescue + e -> + require Logger + Logger.warning("[NetRunner.Daemon] on_output raised: #{inspect(e)}") + :ok end defp handle_output(:discard, _data), do: :ok From 069b15e95e610742d000f26df4345b997ac8bc4b Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 19:42:02 -0400 Subject: [PATCH 14/20] Make the NIF the single source of truth for signal atoms Signal.resolve previously maintained an Elixir @signals allow-list that duplicated the strcmp list inside nif_signal_number in C. Adding or renaming a signal meant editing two places and any drift produced a silent "unknown_signal" even when the other side supported it. Delegate to the NIF for any atom input; integer-range validation stays in Elixir. The NIF's existing {:error, :unknown_signal} return surfaces unsupported atoms unchanged. --- lib/net_runner/signal.ex | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/net_runner/signal.ex b/lib/net_runner/signal.ex index ea5dc96..1c3df06 100644 --- a/lib/net_runner/signal.ex +++ b/lib/net_runner/signal.ex @@ -3,14 +3,14 @@ defmodule NetRunner.Signal do alias NetRunner.Process.Nif - @signals ~w(sigterm sigkill sigint sighup sigusr1 sigusr2 sigstop sigcont sigquit sigpipe)a - @doc """ Resolves a signal atom to its platform-specific number. + + The canonical list of supported signal atoms lives in the NIF + (`nif_signal_number`); calling it directly keeps Elixir and C from + drifting out of sync. 
""" - def resolve(signal) when signal in @signals do - Nif.nif_signal_number(signal) - end + def resolve(signal) when is_atom(signal), do: Nif.nif_signal_number(signal) def resolve(signal) when is_integer(signal) and signal >= 1 and signal <= 31, do: {:ok, signal} From c4cd6774a59c10e14a258c8a3a200b80b1ed1845 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 20:43:01 -0400 Subject: [PATCH 15/20] Add ASan/UBSan opt-in build and a CI sanitizers job Enable \`SANITIZE=1 make all\` (or the \`make asan\` convenience target) to rebuild the NIF and shepherd with AddressSanitizer + UBSan. When SANITIZE=1 the build switches to -O1 and drops _FORTIFY_SOURCE (they conflict with ASan's memcpy interceptors). Add a new CI job \`sanitizers\` that runs the full Elixir test suite with LD_PRELOAD=libasan so the NIF and shepherd catch memory-safety and undefined-behaviour bugs in the C layer on every push. Gate the publish job on the sanitizers job passing. --- .github/workflows/ci.yml | 34 +++++++++++++++++++++++++++++++++- Makefile | 28 ++++++++++++++++++++++++++-- 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b00ff38..6b335fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -165,10 +165,42 @@ jobs: - run: mix compile - run: mix dialyzer + sanitizers: + name: Sanitizers (ASan + UBSan) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: erlef/setup-beam@v1 + with: + elixir-version: ${{ env.ELIXIR_VERSION }} + otp-version: ${{ env.OTP_VERSION }} + + - name: Cache deps & build + uses: actions/cache@v4 + with: + path: | + deps + _build + key: ${{ runner.os }}-asan-${{ env.ELIXIR_VERSION }}-${{ env.OTP_VERSION }}-${{ hashFiles('mix.lock') }} + + - run: mix deps.get + - run: mix deps.compile + + - name: Build C with ASan + UBSan + run: SANITIZE=1 make clean all + + - name: Run tests under sanitizers + env: + ASAN_OPTIONS: 
"detect_leaks=0:halt_on_error=1:abort_on_error=1" + UBSAN_OPTIONS: "halt_on_error=1:print_stacktrace=1" + LD_PRELOAD: /usr/lib/x86_64-linux-gnu/libasan.so.8 + run: mix test + publish: name: Publish to Hex runs-on: ubuntu-latest - needs: [compile, format, credo, test, dialyzer] + needs: [compile, format, credo, test, dialyzer, sanitizers] if: startsWith(github.ref, 'refs/tags/v') steps: - uses: actions/checkout@v4 diff --git a/Makefile b/Makefile index 3c58910..c188a6b 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,21 @@ ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -eval "io:format(\"~ts\", [code:li UNAME_S := $(shell uname -s) CC ?= cc -CFLAGS_BASE = -O2 -Wall -Wextra -Werror -std=c99 -fstack-protector-strong -D_FORTIFY_SOURCE=2 + +# Opt-in sanitizer build. Usage: +# make clean && SANITIZE=1 make all +# mix test (from Elixir — the NIF and shepherd are rebuilt with ASan/UBSan) +# +# Requires LD_PRELOAD of libasan at runtime on Linux when the BEAM isn't +# built with sanitizers; see ci.yml for the invocation. +ifeq ($(SANITIZE),1) + # _FORTIFY_SOURCE is incompatible with ASan (ASan already intercepts + # memcpy/etc.). Disable optimisation to -O1 and skip FORTIFY. + SAN_FLAGS = -fsanitize=address,undefined -fno-omit-frame-pointer -g + CFLAGS_BASE = -O1 -Wall -Wextra -Werror -std=c99 -fstack-protector-strong $(SAN_FLAGS) +else + CFLAGS_BASE = -O2 -Wall -Wextra -Werror -std=c99 -fstack-protector-strong -D_FORTIFY_SOURCE=2 +endif ifeq ($(UNAME_S),Darwin) # macOS needs _DARWIN_C_SOURCE for SCM_RIGHTS, CMSG_SPACE, etc. @@ -31,6 +45,11 @@ else NIF_EXT = .so endif +ifeq ($(SANITIZE),1) + NIF_LDFLAGS += $(SAN_FLAGS) + SHEPHERD_LDFLAGS += $(SAN_FLAGS) +endif + NIF_CFLAGS = $(CFLAGS) -I$(ERTS_INCLUDE_DIR) -I$(C_SRC_DIR) -fPIC # Targets @@ -45,10 +64,15 @@ NIF_OBJ = $(C_SRC_DIR)/net_runner_nif.o HEADERS = $(C_SRC_DIR)/protocol.h $(C_SRC_DIR)/utils.h -.PHONY: all clean +.PHONY: all clean asan all: $(PRIV_DIR) $(SHEPHERD) $(NIF_LIB) +# Convenience: force a sanitizer rebuild. 
Same as SANITIZE=1 make clean all. +asan: + $(MAKE) clean + $(MAKE) SANITIZE=1 all + $(PRIV_DIR): mkdir -p $(PRIV_DIR) From 611912bc08fda1b512f0c87d228574d5de57bb8a Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 20:47:03 -0400 Subject: [PATCH 16/20] Revert write_loop parking, keep {:ok, 0} guard; stale socket cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The earlier change that parked the caller after a single partial write broke large-buffer streaming. Parking skipped the NIF's EAGAIN path, which is the only place enif_select is armed for :ready_output; the parked caller then hung forever because no readiness signal would arrive. A 100 KB cat stream test reliably timed out after 60s. Restore the original write_loop shape (recurse until the NIF returns either :eagain or completion) so enif_select stays in charge of readiness. Keep the {:ok, 0} guard from the original review fix — if the kernel ever returns 0 on a non-empty write we sleep(1) and retry instead of spinning the dirty scheduler. retry_write_loop mirrors the same shape. Also add a test_helper.exs hook that purges stale /tmp/net_runner_*.sock files before and after every test run; stops accumulation from tests that crash before cleanup_listener runs. --- lib/net_runner/process.ex | 70 +++++++++++++++++++++------------------ test/test_helper.exs | 12 +++++++ 2 files changed, 50 insertions(+), 32 deletions(-) diff --git a/lib/net_runner/process.ex b/lib/net_runner/process.ex index cc9abe8..e250a06 100644 --- a/lib/net_runner/process.ex +++ b/lib/net_runner/process.ex @@ -335,30 +335,34 @@ defmodule NetRunner.Process do write_loop(data, from, state) end - # Writes data once per call: on a successful partial write we park the - # caller and let :ready_output drive the remainder. This avoids (a) a - # spin loop if the kernel ever returns {:ok, 0} on a non-empty write and - # (b) starving other handle_call messages on large buffers. 
+ # Writes data in a loop: partial writes retry immediately until EAGAIN + # (which registers enif_select) or completion. This keeps enif_select + # in charge of readiness notifications; any path that parks the caller + # without going through the NIF's EAGAIN path must not be taken here. defp write_loop(<<>>, _from, state), do: {:reply, :ok, state} defp write_loop(data, from, state) do case Pipe.write(state.stdin, data) do - {:ok, bytes_written} when bytes_written == byte_size(data) -> - stats = Stats.record_write(state.stats, bytes_written) - {:reply, :ok, %{state | stats: stats}} - - {:ok, bytes_written} when bytes_written > 0 -> + {:ok, 0} -> + # write(2) can legally return 0 on a non-empty buffer. Avoid + # spinning by forcing another nif_write — if the kernel still + # can't make progress it will return EAGAIN and register select. + # In practice this branch is unreachable on pipes/sockets but + # guards against the dirty scheduler hang regardless. + Process.sleep(1) + write_loop(data, from, state) + + {:ok, bytes_written} -> stats = Stats.record_write(state.stats, bytes_written) - remaining = binary_part(data, bytes_written, byte_size(data) - bytes_written) - # Park the caller with the remaining data; :ready_output will drive it. - {ops, _ref} = Operations.park(state.operations, :write, from, remaining) - {:noreply, %{state | operations: ops, stats: stats}} + state = %{state | stats: stats} + total = byte_size(data) - {:ok, 0} -> - # write(2) can legally return 0 on a non-empty buffer; treat the - # same as EAGAIN to avoid spinning on the dirty scheduler. 
- {ops, _ref} = Operations.park(state.operations, :write, from, data) - {:noreply, %{state | operations: ops}} + if bytes_written >= total do + {:reply, :ok, state} + else + remaining = binary_part(data, bytes_written, total - bytes_written) + write_loop(remaining, from, state) + end {:error, :eagain} -> # enif_select is now registered for write readiness @@ -436,25 +440,27 @@ defmodule NetRunner.Process do defp retry_write_loop(ref, from, data, state) do case Pipe.write(state.stdin, data) do - {:ok, bytes_written} when bytes_written == byte_size(data) -> - stats = Stats.record_write(state.stats, bytes_written) - GenServer.reply(from, :ok) - {_, ops} = Operations.pop(state.operations, ref) - %{state | operations: ops, stats: stats} + {:ok, 0} -> + # Unreachable on pipes/sockets in practice; ops entry still valid + # with full data. A subsequent :ready_output will drive the retry. + state - {:ok, bytes_written} when bytes_written > 0 -> + {:ok, bytes_written} -> stats = Stats.record_write(state.stats, bytes_written) - remaining = binary_part(data, bytes_written, byte_size(data) - bytes_written) - {_, ops} = Operations.pop(state.operations, ref) - {ops, _new_ref} = Operations.park(ops, :write, from, remaining) - %{state | operations: ops, stats: stats} + state = %{state | stats: stats} + total = byte_size(data) - {:ok, 0} -> - # Zero-byte write on non-empty data — wait for the next :ready_output - # (ops entry still valid with full data). 
- state + if bytes_written >= total do + GenServer.reply(from, :ok) + {_, ops} = Operations.pop(state.operations, ref) + %{state | operations: ops} + else + remaining = binary_part(data, bytes_written, total - bytes_written) + retry_write_loop(ref, from, remaining, state) + end {:error, :eagain} -> + # Still parked, enif_select already re-registered state {:error, _} = error -> diff --git a/test/test_helper.exs b/test/test_helper.exs index d1d11e6..c9011b7 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -5,3 +5,15 @@ exclude = end ExUnit.start(exclude: exclude) + +# Clean up any stale UDS sockets left behind by previous test runs that +# crashed before cleanup_listener could remove them. +defmodule NetRunner.TestCleanup do + def purge_stale_sockets do + pattern = Path.join(System.tmp_dir!(), "net_runner_*.sock") + pattern |> Path.wildcard() |> Enum.each(&File.rm/1) + end +end + +NetRunner.TestCleanup.purge_stale_sockets() +ExUnit.after_suite(fn _ -> NetRunner.TestCleanup.purge_stale_sockets() end) From f143b0f03a4f79458f18b518dfb5a577e63c9a5b Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 20:48:03 -0400 Subject: [PATCH 17/20] Add binary-with-NUL-bytes round-trip test Runs \`printf 'a\0b\0c'\` through the shepherd and asserts the full 5 bytes survive the read path. Guards against any accidental use of String functions on the output binary that would truncate at the first NUL. 
--- test/process_test.exs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/process_test.exs b/test/process_test.exs index 1a23419..ee415c1 100644 --- a/test/process_test.exs +++ b/test/process_test.exs @@ -81,6 +81,17 @@ defmodule NetRunner.ProcessTest do end end + describe "binary data" do + test "round-trips output containing NUL bytes" do + {:ok, pid} = Proc.start("sh", ["-c", ~S|printf 'a\0b\0c'|], []) + + data = read_all(pid) + assert data == "a\0b\0c" + assert byte_size(data) == 5 + assert {:ok, 0} = Proc.await_exit(pid) + end + end + describe "os_pid" do test "returns the OS pid" do {:ok, pid} = Proc.start("sleep", ["100"]) From fa2546df021a6952dae2bcc69d8459dc42d68045 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Thu, 16 Apr 2026 20:48:33 -0400 Subject: [PATCH 18/20] Replace cond with if/else in Process DOWN dispatch Credo strict flagged the cond/true as a refactoring opportunity; with only two branches an if/else is clearer and identical in behaviour. --- lib/net_runner/process.ex | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/lib/net_runner/process.ex b/lib/net_runner/process.ex index e250a06..6bafa9a 100644 --- a/lib/net_runner/process.ex +++ b/lib/net_runner/process.ex @@ -259,18 +259,16 @@ defmodule NetRunner.Process do # letting it linger until process exit. The owner case is handled first. 
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) when is_reference(ref) do - cond do - ref == state.owner_ref -> - on_owner_down(state) - - true -> - case Operations.pop_by_monitor(state.operations, ref) do - {nil, _ops} -> - {:noreply, state} + if ref == state.owner_ref do + on_owner_down(state) + else + case Operations.pop_by_monitor(state.operations, ref) do + {nil, _ops} -> + {:noreply, state} - {_op, ops} -> - {:noreply, %{state | operations: ops}} - end + {_op, ops} -> + {:noreply, %{state | operations: ops}} + end end end From dc69018a42ce96e140cc4989525e2183befe167a Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Fri, 17 Apr 2026 07:32:44 -0400 Subject: [PATCH 19/20] Add regression tests for review fixes + fix run/2 error surface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Public-API fix: NetRunner.run pattern-matched {:ok, pid} on Proc.start, which raised MatchError for any {:error, reason} — including the new validation errors from E12. Return the error tuple directly. New regression tests: * test/signal_test.exs — Signal.resolve range, unknown atoms, bad types, resolve! raising behaviour (E15). 
* test/process_test.exs — * NUL-byte rejection in cmd/args (E12) * Proc.start with empty cmd rejected (E12) * stderr-only fast-exit command: clean exit + bytes_err stats nonzero (E1) * :owner monitor SIGKILLs OS process when owner crashes (E3) * test/net_runner_test.exs — * run/2 and stream/2 surface validation errors instead of crashing * Timeout path reaps OS processes (no sleep pid leaks after five 100ms timeouts against sleep 30) --- lib/net_runner.ex | 10 ++++- test/net_runner_test.exs | 45 +++++++++++++++++++++++ test/process_test.exs | 79 ++++++++++++++++++++++++++++++++++++++++ test/signal_test.exs | 45 +++++++++++++++++++++++ 4 files changed, 178 insertions(+), 1 deletion(-) create mode 100644 test/signal_test.exs diff --git a/lib/net_runner.ex b/lib/net_runner.ex index f6a00b2..c411e75 100644 --- a/lib/net_runner.ex +++ b/lib/net_runner.ex @@ -76,8 +76,16 @@ defmodule NetRunner do max_output_size = Keyword.get(opts, :max_output_size, nil) process_opts = Keyword.drop(opts, [:input, :timeout, :max_output_size]) - {:ok, pid} = Proc.start(cmd, args, process_opts) + case Proc.start(cmd, args, process_opts) do + {:ok, pid} -> + run_with_pid(pid, input, timeout, max_output_size) + {:error, _reason} = error -> + error + end + end + + defp run_with_pid(pid, input, timeout, max_output_size) do task = Task.async(fn -> run_io(pid, input, max_output_size) end) effective_timeout = timeout || :infinity diff --git a/test/net_runner_test.exs b/test/net_runner_test.exs index a2c5f7c..9bd4665 100644 --- a/test/net_runner_test.exs +++ b/test/net_runner_test.exs @@ -60,4 +60,49 @@ defmodule NetRunnerTest do assert output == "hello\n" end end + + describe "input validation" do + # Regression: run/2 used to pattern-match {:ok, pid} on Proc.start + # which raised MatchError when validation failed. Now it returns the + # error tuple directly. 
+ test "run surfaces NUL-byte validation error cleanly" do + assert {:error, {:invalid_args, _}} = NetRunner.run(["echo", "bad\0arg"]) + end + + test "stream surfaces NUL-byte validation error cleanly" do + assert {:error, {:invalid_args, _}} = NetRunner.stream(["echo", "bad\0arg"]) + end + + test "run rejects empty executable" do + assert {:error, {:invalid_cmd, _}} = NetRunner.run([""]) + end + end + + describe "timeout path cleanup" do + # Regression / sanity: on timeout, the OS process must be killed and + # the GenServer stopped — no zombies left behind. + test "timeout returns :timeout and cleans up" do + start_count = count_sleep_processes() + + for _ <- 1..5 do + assert {:error, :timeout} = + NetRunner.run(["sleep", "30"], timeout: 100) + end + + # Give the shepherd + watcher a moment to reap + Process.sleep(300) + + end_count = count_sleep_processes() + # Allow a tolerance for concurrent tests spawning sleeps + assert end_count <= start_count + 1, + "expected sleeps to be reaped; start=#{start_count} end=#{end_count}" + end + end + + defp count_sleep_processes do + case System.cmd("pgrep", ["-x", "sleep"], stderr_to_stdout: true) do + {out, 0} -> out |> String.split("\n", trim: true) |> length() + _ -> 0 + end + end end diff --git a/test/process_test.exs b/test/process_test.exs index ee415c1..8f23974 100644 --- a/test/process_test.exs +++ b/test/process_test.exs @@ -119,6 +119,85 @@ defmodule NetRunner.ProcessTest do end end + describe "input validation" do + # Regression: NUL bytes in cmd/args used to be passed through to + # Port.open's args:, which is undefined behaviour on the C side. 
+ test "rejects NUL byte in cmd" do + assert {:error, {:invalid_cmd, _}} = Proc.start("ec\0ho", ["hi"]) + end + + test "rejects NUL byte in args" do + assert {:error, {:invalid_args, _}} = Proc.start("echo", ["he\0llo"]) + end + + test "rejects empty cmd" do + assert {:error, {:invalid_cmd, _}} = Proc.start("", []) + end + + test "NetRunner.run surfaces validation error instead of crashing" do + assert {:error, {:invalid_args, _}} = + NetRunner.run(["echo", "he\0llo"]) + end + end + + describe "stderr capture for fast-exiting processes" do + # Regression: the initial stderr chunk was sent to self() via + # {:stderr_data, _} but no handle_info matched, so the first (often + # only) chunk was dropped for fast-exiting commands. + test "stderr-only command exits cleanly with default :consume" do + assert {"", 0} = NetRunner.run(["sh", "-c", "echo err >&2"]) + end + + test "stats reflect stderr bytes read for :consume mode" do + {:ok, pid} = Proc.start("sh", ["-c", "echo hello-stderr >&2"], stderr: :consume) + assert {:ok, 0} = Proc.await_exit(pid, 5_000) + stats = Proc.stats(pid) + # "hello-stderr\n" = 13 bytes; tolerate >0 in case the shell adds extras. + assert stats.bytes_err >= 13 + end + end + + describe "owner monitor cleanup" do + # Regression: if the stream consumer (or any :owner process) crashes + # mid-iteration, Stream.resource's after callback is never run. + # Before the :owner-monitor fix, NetRunner.Process and its OS child + # lived on. Now the GenServer SIGKILLs and stops. 
+ test "Process SIGKILLs OS process when :owner dies" do + parent = self() + + consumer = + spawn(fn -> + {:ok, pid} = Proc.start("sleep", ["30"], owner: self()) + os_pid = Proc.os_pid(pid) + send(parent, {:os_pid, os_pid, pid}) + exit(:boom) + end) + + {os_pid, proc_pid} = + receive do + {:os_pid, op, pp} -> {op, pp} + after + 2_000 -> flunk("did not receive os_pid from consumer") + end + + _ = consumer + + # Wait for the Process GenServer to detect the DOWN, SIGKILL the + # child, reap it, and stop. + Process.sleep(500) + + refute Process.alive?(proc_pid), "Process GenServer should have stopped" + refute os_pid_alive?(os_pid), "OS process should be killed" + end + end + + defp os_pid_alive?(os_pid) do + case System.cmd("kill", ["-0", to_string(os_pid)], stderr_to_stdout: true) do + {_, 0} -> true + _ -> false + end + end + defp read_all(pid) do case Proc.read(pid) do {:ok, data} -> data <> read_all(pid) diff --git a/test/signal_test.exs b/test/signal_test.exs new file mode 100644 index 0000000..b9a5d67 --- /dev/null +++ b/test/signal_test.exs @@ -0,0 +1,45 @@ +defmodule NetRunner.SignalTest do + use ExUnit.Case, async: true + + alias NetRunner.Signal + + describe "resolve/1" do + test "resolves known signal atoms via the NIF" do + assert {:ok, num} = Signal.resolve(:sigterm) + assert is_integer(num) and num > 0 + end + + test "passes through valid integers in the POSIX 1..31 range" do + assert {:ok, 9} = Signal.resolve(9) + assert {:ok, 15} = Signal.resolve(15) + end + + test "rejects integers outside 1..31" do + assert {:error, :unknown_signal} = Signal.resolve(0) + assert {:error, :unknown_signal} = Signal.resolve(-1) + assert {:error, :unknown_signal} = Signal.resolve(32) + assert {:error, :unknown_signal} = Signal.resolve(9999) + end + + test "rejects unknown atoms" do + assert {:error, :unknown_signal} = Signal.resolve(:siggibberish) + end + + test "rejects non-atom, non-integer values" do + assert {:error, :unknown_signal} = Signal.resolve("sigterm") + 
assert {:error, :unknown_signal} = Signal.resolve(nil) + assert {:error, :unknown_signal} = Signal.resolve([]) + end + end + + describe "resolve!/1" do + test "returns the number for valid signals" do + assert is_integer(Signal.resolve!(:sigkill)) + end + + test "raises for invalid signals" do + assert_raise ArgumentError, fn -> Signal.resolve!(:bogus) end + assert_raise ArgumentError, fn -> Signal.resolve!(99) end + end + end +end From ad15ff809e605270a9fd1fefae572282826bb7a7 Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Fri, 17 Apr 2026 07:56:02 -0400 Subject: [PATCH 20/20] Update cgroup_test for fatal setup + CHANGELOG + README examples MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix CI regression from C9 (treat cgroup setup failure as fatal): The existing Linux-only test assumed CI had write access to /sys/fs/cgroup/; it does not. Before C9 this silently succeeded (child ran outside the cgroup). After C9 the shepherd correctly reports {:error, _}. Rewrite the test to accept either outcome — success on a privileged run, a clean error on an unprivileged one. Either confirms the option is plumbed through. Add a negative test for path validation (absolute paths and traversal are rejected). CHANGELOG: document every fix from the review pass under [Unreleased] — FD leak on mutex failure, use-after-close race, lost stderr, write-loop spin, shepherd framing, sendmsg hardening, post-fork signal safety, waitpid bounds, SIGCHLD reap loop, cgroup/path validation, stream consumer-crash cleanup, Watcher send_after, parked-caller monitors, UDS read redesign, NUL-byte validation, run/2 error surface, File.rm errors, Signal range/source-of-truth, Daemon drain resilience, explicit Port.close, ASan/UBSan build + CI job, stale socket sweep, regression tests. 
README: new sections on input validation / error returns, binary output with NUL bytes, the Command DSL, an error-handling cheatsheet, and examples for :owner monitoring and per-call kill_timeout. --- CHANGELOG.md | 110 +++++++++++++++++++++++++++++++++++++ README.md | 125 +++++++++++++++++++++++++++++++++++++++++++ test/cgroup_test.exs | 38 +++++++++---- 3 files changed, 264 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bd9005..5173334 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,116 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](https://semver.org/). +## [Unreleased] + +Focused code-review pass across the NIF, shepherd, and Elixir layers. +Correctness-first: closes two real-world race/leak bugs, hardens the +post-fork child window, and adds an AddressSanitizer + UBSan CI job. + +### Fixed + +- **FD leak in `nif_create_fd`** when `enif_mutex_create` failed + — the destructor previously gated `close(fd)` on a non-NULL lock, + so a failed mutex allocation leaked the file descriptor and armed + a NULL-deref in any later `nif_close`. The mutex result is checked + and the dtor now closes the fd unconditionally. +- **Use-after-close race in NIF read/write vs. close/down** + — `nif_read`/`nif_write` copied `res->fd` under the mutex and + released the lock before the syscall; a concurrent `nif_close` or + owner-death callback could close the fd before the syscall ran, + letting the read/write target a recycled fd. The mutex is now held + across the syscall and the subsequent `enif_select` registration; + the actual `close()` is deferred to the `io_resource_stop` callback + so BEAM can drain pending selects before the fd is released. 
+- **Lost initial stderr chunk in `:consume` mode** + — `kick_stderr_read` in `init/1` sent `{:stderr_data, data}` to + `self()` but no `handle_info/2` clause matched, so the first (and + often only) chunk of stderr for fast-exiting processes was silently + dropped. The missing handler now appends to the stderr buffer and + drains any remainder. +- **`write_loop` spin on `{:ok, 0}`** — if the kernel ever returned + 0 bytes on a non-empty write, the GenServer would recurse forever + on the dirty scheduler. Bounded with a 1 ms sleep-retry. +- **Shepherd UDS command framing** — the event loop parsed only + `buf[0]`, discarding any coalesced or tail commands (e.g. + `CMD_CLOSE_STDIN` followed immediately by `CMD_KILL`). Frames are + now length-dispatched per opcode with a carry-over buffer across + `poll()` iterations. +- **Post-fork child stdio and signal safety** — replaced `fprintf` / + `strerror` in the post-fork / pre-exec window with a `write(2)`- + based `child_fail()` helper (async-signal-safe). Every `dup2`, + `setsid`, and `TIOCSCTTY` return is now checked; on failure the + child exits 127 with a diagnostic instead of running with broken + stdio. +- **`waitpid` after SIGKILL** — replaced the unbounded + `waitpid(child_pid, NULL, 0)` with a bounded WNOHANG loop + (~3 s cap) so the shepherd cannot hang on a child stuck in + uninterruptible kernel sleep (D-state). +- **SIGCHLD reap loop** — reap all pending children per SIGCHLD + (`while waitpid(-1, ..., WNOHANG) > 0`) so a coalesced signal + never leaks zombies. +- **Cgroup / UDS path hardening** — validate every `snprintf` return, + reject too-long UDS paths, set `FD_CLOEXEC` on the PTY master, + treat user-requested cgroup setup failure as fatal, and replace + the fixed 100 ms `usleep` in `cgroup_cleanup` with a bounded + polling `rmdir`. +- **`Stream` consumer crash cleanup** — `Stream.resource`'s `after` + callback is only run on normal termination. 
A consumer crash + orphaned the `NetRunner.Process` GenServer and its OS child. + `NetRunner.Process.start/3` now accepts an `:owner` option that + monitors the caller; `NetRunner.Stream.stream/3` passes `self()`, + so a consumer crash SIGKILLs the OS process and stops the + GenServer. +- **Watcher blocking on `Process.sleep`** — the 5 s sleep in + `handle_info/2` wedged the Watcher unresponsive (including to + supervisor shutdown). Replaced with `Process.send_after/3` and a + new `:escalate_to_sigkill` handler. +- **Parked-caller tracking in `Operations`** — callers parked on + EAGAIN are now `Process.monitor/1`-ed; dead callers are pruned on + `:DOWN` instead of lingering in the pending map until process + exit. +- **`read_uds_message` race** — replaced the `:peek` + full-recv + pattern (which could time out if the payload arrived a moment + after the opcode) with an opcode-first read flow and longer + timeouts. +- **`cmd` / `args` validation** — reject non-binary, empty, or + NUL-containing cmd and args at the spawn boundary. Passing NUL + bytes through `Port.open`'s `args:` is undefined on the C side. +- **`NetRunner.run/2` error surface** — previously pattern-matched + `{:ok, pid}` from `Proc.start`, raising `MatchError` when + validation failed. Now returns `{:error, reason}` cleanly. +- **`File.rm` cleanup of UDS socket** — tolerate `:enoent` + (shepherd may have unlinked), propagate other errors. +- **`Signal.resolve` integer range** — integer signals outside + POSIX `1..31` now return `{:error, :unknown_signal}` instead of + being forwarded to `kill(2)`. +- **`Signal` single source of truth** — `Signal.resolve` delegates + to the NIF for known-atom lookup instead of maintaining a duplicate + allow-list that drifted from the C side. +- **Daemon drain resilience** — drain-task crashes used to match a + catch-all `:DOWN` handler and silently stop draining; the pipe + then filled until the child blocked. 
Narrowed to recognised refs + with a warning log; `drain_loop` wrapped in `try/rescue/catch` so + a reader or logger exception cannot take the daemon down through + the linked Task. +- **`terminate/2`** explicitly closes the shepherd `Port` after the + UDS socket for deterministic teardown order. + +### Added + +- **AddressSanitizer + UBSan** — opt-in build via `SANITIZE=1 make all` + or `make asan`. New CI job (`sanitizers`) rebuilds the NIF and + shepherd with `-fsanitize=address,undefined`, preloads `libasan`, + and runs the full `mix test`. The publish job depends on it. +- **Stale UDS socket sweep** in `test/test_helper.exs` (before and + after the suite) — stops accumulation from test crashes before + `cleanup_listener/2` runs. +- **Regression tests** for: NUL-byte validation in `cmd` and `args`, + `Signal.resolve` range + type handling, `:owner` monitor SIGKILL + path, stderr-only fast-exit stats, binary-with-NUL round-trip, and + `NetRunner.run` / `NetRunner.stream` returning validation errors + cleanly. + ## [1.0.0] - 2026-02-26 Initial release. diff --git a/README.md b/README.md index e67c742..3487740 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,45 @@ Enum.to_list(stream) NetRunner.run(~w(my_server), kill_timeout: 2000, timeout: 10_000) ``` +## Input Validation and Error Returns + +`run/2` and `stream/2` return tagged errors for bad input instead of +crashing. NUL bytes inside `cmd` or `args` are rejected early (they +are undefined in `argv` on the C side). 
+ +```elixir +# Empty executable +{:error, {:invalid_cmd, _}} = NetRunner.run([""]) + +# NUL byte in an argument +{:error, {:invalid_args, _}} = NetRunner.run(["echo", "he\0llo"]) + +# Same behaviour for streaming +{:error, {:invalid_args, _}} = NetRunner.stream(["echo", "he\0llo"]) + +# Unknown signal atoms come back as tagged errors, not raises +{:error, :unknown_signal} = NetRunner.Signal.resolve(:sigwhatever) +{:error, :unknown_signal} = NetRunner.Signal.resolve(99) +``` + +## Working with Binary Output + +stdout is delivered as a BEAM binary, not a String. It is safe to pass +bytes containing NUL, high-bit, or anything else through the pipeline. + +```elixir +# NUL bytes round-trip unchanged +{out, 0} = NetRunner.run(["sh", "-c", ~S|printf 'a\0b\0c'|]) +byte_size(out) # => 5 +out == "a\0b\0c" # => true + +# UTF-8 boundaries straddle chunks fine — just concatenate and then +# decode. +"héllo\n" = + NetRunner.stream!(~w(echo héllo)) + |> Enum.join() +``` + ## Process API For fine-grained control over the OS process lifecycle: @@ -164,12 +203,45 @@ Proc.await_exit(pid) stats = Proc.stats(pid) stats.bytes_in # => 5 (bytes written to stdin) stats.bytes_out # => 5 (bytes read from stdout) +stats.bytes_err # => 0 (bytes read from stderr, :consume mode) stats.read_count # => 1 (number of read calls) stats.write_count # => 1 (number of write calls) stats.duration_ms # => 3 (wall-clock time) stats.exit_status # => 0 (exit code) ``` +### Tying an OS process to an owner + +If the calling process crashes, the OS process it launched should go +with it. Pass `:owner` to have the Process GenServer monitor a pid; +on `:DOWN` it SIGKILLs the child and stops cleanly. `NetRunner.stream/2` +does this automatically with `self()`. 
+ +```elixir +# Spawn a long-lived command tied to the caller +parent = self() + +spawn(fn -> + {:ok, pid} = Proc.start("sleep", ["30"], owner: self()) + send(parent, {:os_pid, Proc.os_pid(pid)}) + exit(:boom) # caller dies → Process SIGKILLs sleep, stops itself +end) +``` + +### Per-call kill timeout + +Tune the SIGTERM→SIGKILL escalation window per-process. Useful when a +command has its own graceful shutdown hook you want to honour, or when +you need a fast hard-kill. + +```elixir +# Give my_server 10s to drain on SIGTERM before SIGKILL +{:ok, pid} = Proc.start("my_server", [], kill_timeout: 10_000) + +# Or make it effectively immediate for tests +{:ok, pid} = Proc.start("sleep", ["100"], kill_timeout: 100) +``` + ## PTY Mode Run commands with a pseudo-terminal for programs that require a TTY. PTY mode is designed for **interactive and long-running programs** — shells, REPLs, curses apps. @@ -268,6 +340,59 @@ Isolate child processes in a cgroup v2 hierarchy for resource control: The shepherd creates the cgroup directory, moves the child into it, and cleans up on exit (kills all processes via `cgroup.kill`, then removes the directory). No-op on macOS. +## Command DSL + +Bundle an executable, default args, and default options into a reusable +`%NetRunner.Command{}`. Both `run/2` and `stream/2` accept it, and +call-site options override the defaults. 
+ +```elixir +alias NetRunner.Command + +# Inline construction +cmd = Command.new("curl", ["-sS"], timeout: 30_000) +{body, 0} = NetRunner.run(cmd, args: ["https://example.com"]) + +# Extend at call time (args append; opts merge with runtime winning) +listing = Command.new("ls", ["-la"]) +{out, 0} = NetRunner.run(listing, args: ["/tmp"]) + +# `defcommand` in your own module captures a reusable template: +defmodule MyCmds do + use NetRunner.Command + + defcommand :curl, "curl", ["-sS", "--max-time", "30"] + defcommand :echo, "echo" +end + +{out, 0} = NetRunner.run(MyCmds.echo(["hi"])) +{:ok, stream} = NetRunner.stream(MyCmds.curl(["https://example.com"])) +``` + +## Error Handling Cheatsheet + +```elixir +case NetRunner.run(["my_tool", arg], timeout: 5_000) do + {output, 0} -> + {:ok, output} + + {_partial, status} when status != 0 -> + {:error, {:nonzero_exit, status}} + + {:error, :timeout} -> + {:error, :took_too_long} + + {:error, {:max_output_exceeded, partial}} -> + {:error, {:too_much_output, byte_size(partial)}} + + {:error, {:invalid_cmd, msg}} -> + {:error, {:bad_cmd, msg}} + + {:error, {:invalid_args, msg}} -> + {:error, {:bad_args, msg}} +end +``` + ## Parallel Execution Every NetRunner process is fully independent — no shared state, no singleton bottleneck: diff --git a/test/cgroup_test.exs b/test/cgroup_test.exs index b05c4c3..1bbe836 100644 --- a/test/cgroup_test.exs +++ b/test/cgroup_test.exs @@ -5,16 +5,28 @@ defmodule NetRunner.CgroupTest do describe "cgroup support" do @tag :linux_only - test "cgroup_path option is passed to shepherd" do - # On macOS this is a no-op — shepherd ignores --cgroup-path on non-Linux. - # On Linux with cgroup v2, the child would be moved into the cgroup. - {:ok, pid} = - Proc.start("echo", ["hello"], cgroup_path: "net_runner/test_#{:rand.uniform(10000)}") + test "cgroup_path option is plumbed through to the shepherd" do + # On Linux, spawning with a cgroup_path requires write access under + # /sys/fs/cgroup/. 
In a privileged environment the child is moved into + # the cgroup and runs to completion; in CI (no privileges) the shepherd + # rejects the setup and returns an error — which proves the option was + # actually seen and validated by the C side. Either outcome confirms + # the plumbing. + path = "net_runner_test_#{:rand.uniform(1_000_000)}" - {:ok, data} = Proc.read(pid) - assert data =~ "hello" - {:ok, status} = Proc.await_exit(pid) - assert status == 0 + case Proc.start("echo", ["hello"], cgroup_path: path) do + {:ok, pid} -> + # Privileged run — child moved into cgroup successfully. + {:ok, data} = Proc.read(pid) + assert data =~ "hello" + assert {:ok, 0} = Proc.await_exit(pid) + + {:error, _reason} -> + # Unprivileged run — the shepherd refused to proceed without + # the requested isolation. That is the correct behaviour when + # a user explicitly asks for a cgroup they cannot use. + :ok + end end test "cgroup_path nil (default) works normally" do @@ -23,5 +35,13 @@ defmodule NetRunner.CgroupTest do assert data =~ "no cgroup" {:ok, 0} = Proc.await_exit(pid) end + + test "rejects invalid cgroup paths (traversal / absolute)" do + assert {:error, {:invalid_cgroup_path, _}} = + Proc.start("echo", ["x"], cgroup_path: "/absolute/nope") + + assert {:error, {:invalid_cgroup_path, _}} = + Proc.start("echo", ["x"], cgroup_path: "some/../evil") + end end end