diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b00ff38..6b335fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -165,10 +165,42 @@ jobs: - run: mix compile - run: mix dialyzer + sanitizers: + name: Sanitizers (ASan + UBSan) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: erlef/setup-beam@v1 + with: + elixir-version: ${{ env.ELIXIR_VERSION }} + otp-version: ${{ env.OTP_VERSION }} + + - name: Cache deps & build + uses: actions/cache@v4 + with: + path: | + deps + _build + key: ${{ runner.os }}-asan-${{ env.ELIXIR_VERSION }}-${{ env.OTP_VERSION }}-${{ hashFiles('mix.lock') }} + + - run: mix deps.get + - run: mix deps.compile + + - name: Build C with ASan + UBSan + run: SANITIZE=1 make clean all + + - name: Run tests under sanitizers + env: + ASAN_OPTIONS: "detect_leaks=0:halt_on_error=1:abort_on_error=1" + UBSAN_OPTIONS: "halt_on_error=1:print_stacktrace=1" + LD_PRELOAD: /usr/lib/x86_64-linux-gnu/libasan.so.8 + run: mix test + publish: name: Publish to Hex runs-on: ubuntu-latest - needs: [compile, format, credo, test, dialyzer] + needs: [compile, format, credo, test, dialyzer, sanitizers] if: startsWith(github.ref, 'refs/tags/v') steps: - uses: actions/checkout@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bd9005..5173334 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,116 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](https://semver.org/). +## [Unreleased] + +Focused code-review pass across the NIF, shepherd, and Elixir layers. +Correctness-first: closes two real-world race/leak bugs, hardens the +post-fork child window, and adds an AddressSanitizer + UBSan CI job. 
+ +### Fixed + +- **FD leak in `nif_create_fd`** when `enif_mutex_create` failed + — the destructor previously gated `close(fd)` on a non-NULL lock, + so a failed mutex allocation leaked the file descriptor and armed + a NULL-deref in any later `nif_close`. The mutex result is checked + and the dtor now closes the fd unconditionally. +- **Use-after-close race in NIF read/write vs. close/down** + — `nif_read`/`nif_write` copied `res->fd` under the mutex and + released the lock before the syscall; a concurrent `nif_close` or + owner-death callback could close the fd before the syscall ran, + letting the read/write target a recycled fd. The mutex is now held + across the syscall and the subsequent `enif_select` registration; + the actual `close()` is deferred to the `io_resource_stop` callback + so BEAM can drain pending selects before the fd is released. +- **Lost initial stderr chunk in `:consume` mode** + — `kick_stderr_read` in `init/1` sent `{:stderr_data, data}` to + `self()` but no `handle_info/2` clause matched, so the first (and + often only) chunk of stderr for fast-exiting processes was silently + dropped. The missing handler now appends to the stderr buffer and + drains any remainder. +- **`write_loop` spin on `{:ok, 0}`** — if the kernel ever returned + 0 bytes on a non-empty write, the GenServer would recurse forever + on the dirty scheduler. Bounded with a 1 ms sleep-retry. +- **Shepherd UDS command framing** — the event loop parsed only + `buf[0]`, discarding any coalesced or tail commands (e.g. + `CMD_CLOSE_STDIN` followed immediately by `CMD_KILL`). Frames are + now length-dispatched per opcode with a carry-over buffer across + `poll()` iterations. +- **Post-fork child stdio and signal safety** — replaced `fprintf` / + `strerror` in the post-fork / pre-exec window with a `write(2)`- + based `child_fail()` helper (async-signal-safe). 
Every `dup2`, + `setsid`, and `TIOCSCTTY` return is now checked; on failure the + child exits 127 with a diagnostic instead of running with broken + stdio. +- **`waitpid` after SIGKILL** — replaced the unbounded + `waitpid(child_pid, NULL, 0)` with a bounded WNOHANG loop + (~3 s cap) so the shepherd cannot hang on a child stuck in + uninterruptible kernel sleep (D-state). +- **SIGCHLD reap loop** — reap all pending children per SIGCHLD + (`while waitpid(-1, ..., WNOHANG) > 0`) so a coalesced signal + never leaks zombies. +- **Cgroup / UDS path hardening** — validate every `snprintf` return, + reject too-long UDS paths, set `FD_CLOEXEC` on the PTY master, + treat user-requested cgroup setup failure as fatal, and replace + the fixed 100 ms `usleep` in `cgroup_cleanup` with a bounded + polling `rmdir`. +- **`Stream` consumer crash cleanup** — `Stream.resource`'s `after` + callback is only run on normal termination. A consumer crash + orphaned the `NetRunner.Process` GenServer and its OS child. + `NetRunner.Process.start/3` now accepts an `:owner` option that + monitors the caller; `NetRunner.Stream.stream/3` passes `self()`, + so a consumer crash SIGKILLs the OS process and stops the + GenServer. +- **Watcher blocking on `Process.sleep`** — the 5 s sleep in + `handle_info/2` wedged the Watcher unresponsive (including to + supervisor shutdown). Replaced with `Process.send_after/3` and a + new `:escalate_to_sigkill` handler. +- **Parked-caller tracking in `Operations`** — callers parked on + EAGAIN are now `Process.monitor/1`-ed; dead callers are pruned on + `:DOWN` instead of lingering in the pending map until process + exit. +- **`read_uds_message` race** — replaced the `:peek` + full-recv + pattern (which could time out if the payload arrived a moment + after the opcode) with an opcode-first read flow and longer + timeouts. +- **`cmd` / `args` validation** — reject non-binary, empty, or + NUL-containing cmd and args at the spawn boundary. 
Passing NUL + bytes through `Port.open`'s `args:` is undefined on the C side. +- **`NetRunner.run/2` error surface** — previously pattern-matched + `{:ok, pid}` from `Proc.start`, raising `MatchError` when + validation failed. Now returns `{:error, reason}` cleanly. +- **`File.rm` cleanup of UDS socket** — tolerate `:enoent` + (shepherd may have unlinked), propagate other errors. +- **`Signal.resolve` integer range** — integer signals outside + POSIX `1..31` now return `{:error, :unknown_signal}` instead of + being forwarded to `kill(2)`. +- **`Signal` single source of truth** — `Signal.resolve` delegates + to the NIF for known-atom lookup instead of maintaining a duplicate + allow-list that drifted from the C side. +- **Daemon drain resilience** — drain-task crashes used to match a + catch-all `:DOWN` handler and silently stop draining; the pipe + then filled until the child blocked. Narrowed to recognised refs + with a warning log; `drain_loop` wrapped in `try/rescue/catch` so + a reader or logger exception cannot take the daemon down through + the linked Task. +- **`terminate/2`** explicitly closes the shepherd `Port` after the + UDS socket for deterministic teardown order. + +### Added + +- **AddressSanitizer + UBSan** — opt-in build via `SANITIZE=1 make all` + or `make asan`. New CI job (`sanitizers`) rebuilds the NIF and + shepherd with `-fsanitize=address,undefined`, preloads `libasan`, + and runs the full `mix test`. The publish job depends on it. +- **Stale UDS socket sweep** in `test/test_helper.exs` (before and + after the suite) — stops accumulation from test crashes before + `cleanup_listener/2` runs. +- **Regression tests** for: NUL-byte validation in `cmd` and `args`, + `Signal.resolve` range + type handling, `:owner` monitor SIGKILL + path, stderr-only fast-exit stats, binary-with-NUL round-trip, and + `NetRunner.run` / `NetRunner.stream` returning validation errors + cleanly. + ## [1.0.0] - 2026-02-26 Initial release. 
diff --git a/Makefile b/Makefile index 3c58910..c188a6b 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,21 @@ ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -eval "io:format(\"~ts\", [code:li UNAME_S := $(shell uname -s) CC ?= cc -CFLAGS_BASE = -O2 -Wall -Wextra -Werror -std=c99 -fstack-protector-strong -D_FORTIFY_SOURCE=2 + +# Opt-in sanitizer build. Usage: +# make clean && SANITIZE=1 make all +# mix test (from Elixir — the NIF and shepherd are rebuilt with ASan/UBSan) +# +# Requires LD_PRELOAD of libasan at runtime on Linux when the BEAM isn't +# built with sanitizers; see ci.yml for the invocation. +ifeq ($(SANITIZE),1) + # _FORTIFY_SOURCE is incompatible with ASan (ASan already intercepts + # memcpy/etc.). Lower optimisation to -O1 and skip FORTIFY. + SAN_FLAGS = -fsanitize=address,undefined -fno-omit-frame-pointer -g + CFLAGS_BASE = -O1 -Wall -Wextra -Werror -std=c99 -fstack-protector-strong $(SAN_FLAGS) +else + CFLAGS_BASE = -O2 -Wall -Wextra -Werror -std=c99 -fstack-protector-strong -D_FORTIFY_SOURCE=2 +endif ifeq ($(UNAME_S),Darwin) # macOS needs _DARWIN_C_SOURCE for SCM_RIGHTS, CMSG_SPACE, etc. @@ -31,6 +45,11 @@ else NIF_EXT = .so endif +ifeq ($(SANITIZE),1) + NIF_LDFLAGS += $(SAN_FLAGS) + SHEPHERD_LDFLAGS += $(SAN_FLAGS) +endif + NIF_CFLAGS = $(CFLAGS) -I$(ERTS_INCLUDE_DIR) -I$(C_SRC_DIR) -fPIC # Targets @@ -45,10 +64,15 @@ NIF_OBJ = $(C_SRC_DIR)/net_runner_nif.o HEADERS = $(C_SRC_DIR)/protocol.h $(C_SRC_DIR)/utils.h -.PHONY: all clean +.PHONY: all clean asan all: $(PRIV_DIR) $(SHEPHERD) $(NIF_LIB) +# Convenience: force a sanitizer rebuild. Same as SANITIZE=1 make clean all.
+asan: + $(MAKE) clean + $(MAKE) SANITIZE=1 all + $(PRIV_DIR): mkdir -p $(PRIV_DIR) diff --git a/README.md b/README.md index e67c742..3487740 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,45 @@ Enum.to_list(stream) NetRunner.run(~w(my_server), kill_timeout: 2000, timeout: 10_000) ``` +## Input Validation and Error Returns + +`run/2` and `stream/2` return tagged errors for bad input instead of +crashing. NUL bytes inside `cmd` or `args` are rejected early (they +are undefined in `argv` on the C side). + +```elixir +# Empty executable +{:error, {:invalid_cmd, _}} = NetRunner.run([""]) + +# NUL byte in an argument +{:error, {:invalid_args, _}} = NetRunner.run(["echo", "he\0llo"]) + +# Same behaviour for streaming +{:error, {:invalid_args, _}} = NetRunner.stream(["echo", "he\0llo"]) + +# Unknown signal atoms and out-of-range integers return tagged errors instead of raising +{:error, :unknown_signal} = NetRunner.Signal.resolve(:sigwhatever) +{:error, :unknown_signal} = NetRunner.Signal.resolve(99) +``` + +## Working with Binary Output + +stdout is delivered as a raw binary with no UTF-8 validation. It is safe to pass +bytes containing NUL, high-bit, or anything else through the pipeline. + +```elixir +# NUL bytes round-trip unchanged +{out, 0} = NetRunner.run(["sh", "-c", ~S|printf 'a\0b\0c'|]) +byte_size(out) # => 5 +out == "a\0b\0c" # => true + +# Multi-byte UTF-8 characters may straddle chunk boundaries — concatenate +# the chunks first, then decode.
+"héllo\n" = + NetRunner.stream!(~w(echo héllo)) + |> Enum.join() +``` + ## Process API For fine-grained control over the OS process lifecycle: @@ -164,12 +203,45 @@ Proc.await_exit(pid) stats = Proc.stats(pid) stats.bytes_in # => 5 (bytes written to stdin) stats.bytes_out # => 5 (bytes read from stdout) +stats.bytes_err # => 0 (bytes read from stderr, :consume mode) stats.read_count # => 1 (number of read calls) stats.write_count # => 1 (number of write calls) stats.duration_ms # => 3 (wall-clock time) stats.exit_status # => 0 (exit code) ``` +### Tying an OS process to an owner + +If the calling process crashes, the OS process it launched should go +with it. Pass `:owner` to have the Process GenServer monitor a pid; +on `:DOWN` it SIGKILLs the child and stops cleanly. `NetRunner.stream/2` +does this automatically with `self()`. + +```elixir +# Spawn a long-lived command tied to the caller +parent = self() + +spawn(fn -> + {:ok, pid} = Proc.start("sleep", ["30"], owner: self()) + send(parent, {:os_pid, Proc.os_pid(pid)}) + exit(:boom) # caller dies → Process SIGKILLs sleep, stops itself +end) +``` + +### Per-call kill timeout + +Tune the SIGTERM→SIGKILL escalation window per-process. Useful when a +command has its own graceful shutdown hook you want to honour, or when +you need a fast hard-kill. + +```elixir +# Give my_server 10s to drain on SIGTERM before SIGKILL +{:ok, pid} = Proc.start("my_server", [], kill_timeout: 10_000) + +# Or make it effectively immediate for tests +{:ok, pid} = Proc.start("sleep", ["100"], kill_timeout: 100) +``` + ## PTY Mode Run commands with a pseudo-terminal for programs that require a TTY. PTY mode is designed for **interactive and long-running programs** — shells, REPLs, curses apps. @@ -268,6 +340,59 @@ Isolate child processes in a cgroup v2 hierarchy for resource control: The shepherd creates the cgroup directory, moves the child into it, and cleans up on exit (kills all processes via `cgroup.kill`, then removes the directory). 
No-op on macOS. +## Command DSL + +Bundle an executable, default args, and default options into a reusable +`%NetRunner.Command{}`. Both `run/2` and `stream/2` accept it, and +call-site options override the defaults. + +```elixir +alias NetRunner.Command + +# Inline construction +cmd = Command.new("curl", ["-sS"], timeout: 30_000) +{body, 0} = NetRunner.run(cmd, args: ["https://example.com"]) + +# Extend at call time (args append; opts merge with runtime winning) +listing = Command.new("ls", ["-la"]) +{out, 0} = NetRunner.run(listing, args: ["/tmp"]) + +# `defcommand` in your own module captures a reusable template: +defmodule MyCmds do + use NetRunner.Command + + defcommand :curl, "curl", ["-sS", "--max-time", "30"] + defcommand :echo, "echo" +end + +{out, 0} = NetRunner.run(MyCmds.echo(["hi"])) +{:ok, stream} = NetRunner.stream(MyCmds.curl(["https://example.com"])) +``` + +## Error Handling Cheatsheet + +```elixir +case NetRunner.run(["my_tool", arg], timeout: 5_000) do + {output, 0} -> + {:ok, output} + + {_partial, status} when status != 0 -> + {:error, {:nonzero_exit, status}} + + {:error, :timeout} -> + {:error, :took_too_long} + + {:error, {:max_output_exceeded, partial}} -> + {:error, {:too_much_output, byte_size(partial)}} + + {:error, {:invalid_cmd, msg}} -> + {:error, {:bad_cmd, msg}} + + {:error, {:invalid_args, msg}} -> + {:error, {:bad_args, msg}} +end +``` + ## Parallel Execution Every NetRunner process is fully independent — no shared state, no singleton bottleneck: diff --git a/c_src/net_runner_nif.c b/c_src/net_runner_nif.c index d860379..b7e4983 100644 --- a/c_src/net_runner_nif.c +++ b/c_src/net_runner_nif.c @@ -38,13 +38,17 @@ static ErlNifResourceType *io_resource_type = NULL; static void io_resource_dtor(ErlNifEnv *env, void *obj) { (void)env; io_resource_t *res = (io_resource_t *)obj; + /* Close fd even if mutex construction failed — otherwise ENOMEM during + * nif_create_fd would leak the underlying FD. 
*/ if (res->lock) { enif_mutex_lock(res->lock); - if (!res->closed && res->fd >= 0) { - close(res->fd); - res->fd = -1; - res->closed = 1; - } + } + if (!res->closed && res->fd >= 0) { + close(res->fd); + res->fd = -1; + res->closed = 1; + } + if (res->lock) { enif_mutex_unlock(res->lock); enif_mutex_destroy(res->lock); res->lock = NULL; @@ -55,30 +59,36 @@ static void io_resource_stop(ErlNifEnv *env, void *obj, ErlNifEvent event, int is_direct_call) { (void)env; (void)obj; - (void)event; (void)is_direct_call; - /* enif_select stop callback - FD is being deselected */ + /* BEAM guarantees no further use of this event by the NIF is in flight + * when this callback runs. Safe to close the underlying fd here. */ + if ((int)event >= 0) { + close((int)event); + } } static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid, ErlNifMonitor *mon) { - (void)env; (void)pid; (void)mon; io_resource_t *res = (io_resource_t *)obj; - /* Owner process died - close the FD */ + int fd_to_stop = -1; if (res->lock) { enif_mutex_lock(res->lock); if (!res->closed && res->fd >= 0) { - enif_select(env, (ErlNifEvent)res->fd, ERL_NIF_SELECT_STOP, - obj, NULL, enif_make_atom(env, "undefined")); - close(res->fd); + fd_to_stop = res->fd; res->fd = -1; res->closed = 1; } res->monitor_active = 0; enif_mutex_unlock(res->lock); } + if (fd_to_stop >= 0) { + /* Hand fd off to the stop callback — it will close it after any + * in-flight enif_select completes. 
*/ + enif_select(env, (ErlNifEvent)fd_to_stop, ERL_NIF_SELECT_STOP, + obj, NULL, enif_make_atom(env, "undefined")); + } } static ErlNifResourceTypeInit io_resource_init = { @@ -167,6 +177,13 @@ static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc, res->owner = owner; res->monitor_active = 0; + if (!res->lock) { + /* Mutex allocation failed — release resource (dtor will close fd) */ + enif_release_resource(res); + return enif_make_tuple2(env, atom_error, + MAKE_ATOM(env, "mutex_failed")); + } + /* Monitor the owner process */ if (enif_monitor_process(env, res, &owner, &res->monitor) == 0) { res->monitor_active = 1; @@ -199,42 +216,51 @@ static ERL_NIF_TERM nif_read(ErlNifEnv *env, int argc, } if (max_bytes > 1048576) max_bytes = 1048576; /* Cap at 1MB */ + ErlNifBinary bin; + if (!enif_alloc_binary(max_bytes, &bin)) { + return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "alloc_failed")); + } + + /* Hold the lock across read() + enif_select so that a concurrent + * nif_close / down callback cannot close the fd mid-syscall. read() is + * non-blocking (O_NONBLOCK) so the lock is held only briefly. 
*/ enif_mutex_lock(res->lock); - if (res->closed) { + if (res->closed || res->fd < 0) { enif_mutex_unlock(res->lock); + enif_release_binary(&bin); return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "closed")); } int fd = res->fd; - enif_mutex_unlock(res->lock); - - ErlNifBinary bin; - if (!enif_alloc_binary(max_bytes, &bin)) { - return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "alloc_failed")); - } ssize_t n = read(fd, bin.data, bin.size); + int saved_errno = errno; + if (n > 0) { + enif_mutex_unlock(res->lock); enif_realloc_binary(&bin, (size_t)n); return enif_make_tuple2(env, atom_ok, enif_make_binary(env, &bin)); - } else if (n == 0) { + } + if (n == 0) { + enif_mutex_unlock(res->lock); enif_release_binary(&bin); return atom_eof; - } else { + } + if (saved_errno == EAGAIN || saved_errno == EWOULDBLOCK) { + int sel_ret = enif_select(env, (ErlNifEvent)fd, + ERL_NIF_SELECT_READ, res, NULL, + atom_undefined); + enif_mutex_unlock(res->lock); enif_release_binary(&bin); - if (errno == EAGAIN || errno == EWOULDBLOCK) { - /* Register for select notification */ - int sel_ret = enif_select(env, (ErlNifEvent)fd, - ERL_NIF_SELECT_READ, res, NULL, - atom_undefined); - if (sel_ret < 0) { - return enif_make_tuple2(env, atom_error, - MAKE_ATOM(env, "select_failed")); - } - return enif_make_tuple2(env, atom_error, atom_eagain); + if (sel_ret < 0) { + return enif_make_tuple2(env, atom_error, + MAKE_ATOM(env, "select_failed")); } - return enif_make_tuple2(env, atom_error, - MAKE_ATOM(env, errno_to_atom(errno))); + return enif_make_tuple2(env, atom_error, atom_eagain); } + enif_mutex_unlock(res->lock); + enif_release_binary(&bin); + return enif_make_tuple2(env, atom_error, + MAKE_ATOM(env, errno_to_atom(saved_errno))); } /* @@ -261,34 +287,39 @@ static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc, return enif_make_tuple2(env, atom_ok, enif_make_int(env, 0)); } + /* Hold the lock across write() + enif_select so that a concurrent + * close cannot reap the fd 
mid-syscall. */ enif_mutex_lock(res->lock); - if (res->closed) { + if (res->closed || res->fd < 0) { enif_mutex_unlock(res->lock); return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "closed")); } int fd = res->fd; - enif_mutex_unlock(res->lock); ssize_t n = write(fd, bin.data, bin.size); + int saved_errno = errno; + if (n >= 0) { + enif_mutex_unlock(res->lock); return enif_make_tuple2(env, atom_ok, enif_make_int64(env, (int64_t)n)); - } else { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - int sel_ret = enif_select(env, (ErlNifEvent)fd, - ERL_NIF_SELECT_WRITE, res, NULL, - atom_undefined); - if (sel_ret < 0) { - return enif_make_tuple2(env, atom_error, - MAKE_ATOM(env, "select_failed")); - } - return enif_make_tuple2(env, atom_error, atom_eagain); - } - if (errno == EPIPE) { - return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "epipe")); + } + if (saved_errno == EAGAIN || saved_errno == EWOULDBLOCK) { + int sel_ret = enif_select(env, (ErlNifEvent)fd, + ERL_NIF_SELECT_WRITE, res, NULL, + atom_undefined); + enif_mutex_unlock(res->lock); + if (sel_ret < 0) { + return enif_make_tuple2(env, atom_error, + MAKE_ATOM(env, "select_failed")); } - return enif_make_tuple2(env, atom_error, - MAKE_ATOM(env, errno_to_atom(errno))); + return enif_make_tuple2(env, atom_error, atom_eagain); } + enif_mutex_unlock(res->lock); + if (saved_errno == EPIPE) { + return enif_make_tuple2(env, atom_error, MAKE_ATOM(env, "epipe")); + } + return enif_make_tuple2(env, atom_error, + MAKE_ATOM(env, errno_to_atom(saved_errno))); } /* @@ -315,27 +346,19 @@ static ERL_NIF_TERM nif_close(ErlNifEnv *env, int argc, res->closed = 1; res->fd = -1; - /* Deregister from enif_select before closing */ - enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_STOP, res, NULL, - atom_undefined); - if (res->monitor_active) { enif_demonitor_process(env, res, &res->monitor); res->monitor_active = 0; } - /* Close FD inside critical section to prevent TOCTOU race: - * a concurrent nif_read/nif_write on a 
dirty scheduler could copy the FD - * under lock then use it after we release the lock but before close(). */ - int close_ret = close(fd); - int close_errno = errno; - enif_mutex_unlock(res->lock); - if (close_ret != 0 && close_errno != EINTR) { - return enif_make_tuple2(env, atom_error, - MAKE_ATOM(env, errno_to_atom(close_errno))); - } + /* Hand fd off to the stop callback — BEAM waits for any in-flight select + * registration to drain before calling stop, which then close()s the fd. + * Concurrent nif_read/nif_write serialize on res->lock; once they observe + * closed==1 they early-out without touching the fd. */ + enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_STOP, res, NULL, + atom_undefined); return atom_ok; } diff --git a/c_src/shepherd.c b/c_src/shepherd.c index 2ea0d83..80ebdf7 100644 --- a/c_src/shepherd.c +++ b/c_src/shepherd.c @@ -45,6 +45,28 @@ /* Self-pipe for signal handling */ static int signal_pipe[2] = {-1, -1}; +/* + * Async-signal-safe post-fork failure path. Between fork() and exec*(), + * POSIX allows only async-signal-safe functions, so we cannot call + * fprintf / strerror / malloc. This helper uses write(2) on a stack + * buffer only. 
+ */ +static void child_fail(const char *tag, const char *detail) { + if (tag) { + size_t i = 0; + while (i < 64 && tag[i] != '\0') i++; + (void)!write(STDERR_FILENO, tag, i); + (void)!write(STDERR_FILENO, ": ", 2); + } + if (detail) { + size_t i = 0; + while (i < 256 && detail[i] != '\0') i++; + (void)!write(STDERR_FILENO, detail, i); + } + (void)!write(STDERR_FILENO, "\n", 1); + _exit(127); +} + static void sigchld_handler(int sig) { (void)sig; int saved_errno = errno; @@ -75,7 +97,10 @@ static int send_fds(int uds_fd, int *fds, int nfds) { size_t cmsg_space = CMSG_SPACE((size_t)nfds * sizeof(int)); char *cmsg_buf = calloc(1, cmsg_space); - if (!cmsg_buf) return -1; + if (!cmsg_buf) { + ERROR_LOG("send_fds: calloc(%zu) failed", cmsg_space); + return -1; + } struct msghdr msg = {0}; msg.msg_iov = &iov; @@ -85,6 +110,7 @@ static int send_fds(int uds_fd, int *fds, int nfds) { struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); if (!cmsg) { + ERROR_LOG("send_fds: CMSG_FIRSTHDR returned NULL"); free(cmsg_buf); return -1; } @@ -93,9 +119,18 @@ static int send_fds(int uds_fd, int *fds, int nfds) { cmsg->cmsg_len = CMSG_LEN((size_t)nfds * sizeof(int)); memcpy(CMSG_DATA(cmsg), fds, (size_t)nfds * sizeof(int)); - ssize_t ret = sendmsg(uds_fd, &msg, 0); - free(cmsg_buf); - return ret > 0 ? 0 : -1; + /* Retry on EINTR; treat anything other than a full 1-byte send as error. 
*/ + for (;;) { + ssize_t ret = sendmsg(uds_fd, &msg, 0); + if (ret == 1) { + free(cmsg_buf); + return 0; + } + if (ret < 0 && errno == EINTR) continue; + ERROR_LOG("sendmsg failed: ret=%zd errno=%s", ret, strerror(errno)); + free(cmsg_buf); + return -1; + } } /* @@ -175,12 +210,19 @@ static int cgroup_setup(pid_t child_pid) { char full_path[512]; char procs_path[576]; - /* Create cgroup directory */ - snprintf(full_path, sizeof(full_path), "/sys/fs/cgroup/%s", cgroup_path); + int n = snprintf(full_path, sizeof(full_path), "/sys/fs/cgroup/%s", + cgroup_path); + if (n < 0 || (size_t)n >= sizeof(full_path)) { + ERROR_LOG("cgroup path too long"); + return -1; + } mkdir(full_path, 0755); /* ignore error if exists */ - /* Move child to cgroup */ - snprintf(procs_path, sizeof(procs_path), "%s/cgroup.procs", full_path); + n = snprintf(procs_path, sizeof(procs_path), "%s/cgroup.procs", full_path); + if (n < 0 || (size_t)n >= sizeof(procs_path)) { + ERROR_LOG("cgroup procs path too long"); + return -1; + } FILE *f = fopen(procs_path, "w"); if (!f) { ERROR_LOG("failed to open %s: %s", procs_path, strerror(errno)); @@ -197,21 +239,26 @@ static void cgroup_cleanup(void) { char full_path[512]; char kill_path[576]; - snprintf(full_path, sizeof(full_path), "/sys/fs/cgroup/%s", cgroup_path); + int n = snprintf(full_path, sizeof(full_path), "/sys/fs/cgroup/%s", + cgroup_path); + if (n < 0 || (size_t)n >= sizeof(full_path)) return; /* Kill all processes in the cgroup via cgroup.kill (cgroup v2) */ - snprintf(kill_path, sizeof(kill_path), "%s/cgroup.kill", full_path); + n = snprintf(kill_path, sizeof(kill_path), "%s/cgroup.kill", full_path); + if (n < 0 || (size_t)n >= sizeof(kill_path)) return; FILE *f = fopen(kill_path, "w"); if (f) { fprintf(f, "1\n"); fclose(f); } - /* Wait briefly for processes to die */ - usleep(100000); - - /* Remove cgroup directory */ - rmdir(full_path); + /* Poll for rmdir success rather than a fixed sleep — the kernel needs + * a moment to reap the killed 
processes. Bail after ~1s (10 * 100ms). */ + for (int i = 0; i < 10; i++) { + if (rmdir(full_path) == 0) return; + if (errno != EBUSY && errno != ENOTEMPTY) return; /* real error */ + usleep(100000); + } } #else static int cgroup_setup(pid_t child_pid) { @@ -250,17 +297,40 @@ static void kill_child(pid_t child_pid) { /* Escalate to SIGKILL the whole process group */ kill(-child_pid, SIGKILL); - waitpid(child_pid, NULL, 0); + + /* Bounded WNOHANG reap loop — avoid hanging forever if the child is + * stuck in uninterruptible kernel sleep (D-state). After the bound + * elapses we return anyway; cgroup cleanup + the kernel eventually + * reap. */ + int sigkill_iters = 30; /* ~3s total at 100ms per iteration */ + for (int i = 0; i < sigkill_iters; i++) { + pid_t ret = waitpid(child_pid, NULL, WNOHANG); + if (ret > 0 || (ret < 0 && errno == ECHILD)) break; + usleep(100000); + } /* Cleanup cgroup (kills any remaining processes, removes dir) */ cgroup_cleanup(); } /* - * Handle a command received from BEAM over UDS. + * Length of a single framed command given its opcode. Returns 0 if the + * opcode is unknown (in which case the parser will skip one byte). + */ +static size_t command_length(uint8_t opcode) { + switch (opcode) { + case CMD_KILL: return 2; + case CMD_CLOSE_STDIN: return 1; + case CMD_SET_WINSIZE: return 5; + default: return 0; + } +} + +/* + * Handle a single command frame. */ static void handle_command(int uds_fd, pid_t child_pid, int stdin_w, - uint8_t *buf, ssize_t len) { + uint8_t *buf, size_t len) { (void)uds_fd; if (len < 1) return; @@ -300,6 +370,36 @@ static void handle_command(int uds_fd, pid_t child_pid, int stdin_w, } } +/* + * Parse and dispatch all framed commands present in buf. Returns the number + * of bytes consumed (may be less than len if a tail command is truncated). 
+ */ +static size_t handle_commands(int uds_fd, pid_t child_pid, int *stdin_w, + uint8_t *buf, size_t len) { + size_t off = 0; + while (off < len) { + size_t clen = command_length(buf[off]); + if (clen == 0) { + /* Unknown opcode: skip one byte to make progress rather than + * stalling the parser on bad input. */ + DEBUG_LOG("unknown command opcode 0x%02x, skipping", buf[off]); + off += 1; + continue; + } + if (off + clen > len) { + /* Partial tail — caller must carry this over to the next read. */ + break; + } + uint8_t op = buf[off]; + handle_command(uds_fd, child_pid, *stdin_w, &buf[off], clen); + if (op == CMD_CLOSE_STDIN) { + *stdin_w = -1; + } + off += clen; + } + return off; +} + /* * Main event loop using poll(). * @@ -312,6 +412,11 @@ static int event_loop(int uds_fd, pid_t child_pid, int stdin_w) { int child_status = -1; int child_exited = 0; + /* Carry-over buffer for partially-framed commands across reads. Max + * incoming frame is CMD_SET_WINSIZE (5 bytes); keep a little slack. 
*/ + uint8_t cbuf[64]; + size_t cbuf_used = 0; + fds[0].fd = uds_fd; fds[0].events = POLLIN; fds[1].fd = signal_pipe[0]; @@ -336,19 +441,25 @@ static int event_loop(int uds_fd, pid_t child_pid, int stdin_w) { } if (fds[0].revents & POLLIN) { - uint8_t buf[16]; - ssize_t n = read(uds_fd, buf, sizeof(buf)); + ssize_t n = read(uds_fd, cbuf + cbuf_used, sizeof(cbuf) - cbuf_used); if (n > 0) { - handle_command(uds_fd, child_pid, stdin_w, buf, n); - /* If CMD_CLOSE_STDIN was handled, mark stdin as closed */ - if (buf[0] == CMD_CLOSE_STDIN) { - stdin_w = -1; + cbuf_used += (size_t)n; + size_t consumed = handle_commands(uds_fd, child_pid, &stdin_w, + cbuf, cbuf_used); + if (consumed > 0 && consumed < cbuf_used) { + memmove(cbuf, cbuf + consumed, cbuf_used - consumed); } + cbuf_used -= consumed; } else if (n == 0) { /* BEAM closed the socket */ DEBUG_LOG("BEAM closed UDS, killing child %d", child_pid); kill_child(child_pid); return -1; + } else if (errno != EAGAIN && errno != EWOULDBLOCK && + errno != EINTR) { + ERROR_LOG("read(uds) failed: %s", strerror(errno)); + kill_child(child_pid); + return -1; } } @@ -358,10 +469,16 @@ static int event_loop(int uds_fd, pid_t child_pid, int stdin_w) { char drain[64]; while (read(signal_pipe[0], drain, sizeof(drain)) > 0) {} - /* Reap child */ + /* Reap any and all children that have exited. SIGCHLD is + * coalesced by the kernel — multiple pending exits can deliver + * as a single SIGCHLD. Loop until no more reapable children + * remain. We only flip child_exited when the managed child is + * reaped; other reapees (should be none today, future-proof) + * are still cleaned up. 
*/ int status; - pid_t ret_pid = waitpid(child_pid, &status, WNOHANG); - if (ret_pid > 0) { + pid_t ret_pid; + while ((ret_pid = waitpid(-1, &status, WNOHANG)) > 0) { + if (ret_pid != child_pid) continue; child_exited = 1; if (WIFEXITED(status)) { child_status = WEXITSTATUS(status); @@ -468,6 +585,12 @@ int main(int argc, char *argv[]) { struct sockaddr_un addr; memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; + if (strlen(uds_path) >= sizeof(addr.sun_path)) { + fprintf(stderr, "error: uds_path too long (max %zu bytes)\n", + sizeof(addr.sun_path) - 1); + close(uds_fd); + return 1; + } strncpy(addr.sun_path, uds_path, sizeof(addr.sun_path) - 1); if (connect(uds_fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) { @@ -506,27 +629,37 @@ int main(int argc, char *argv[]) { close(signal_pipe[1]); close(master_fd); - /* Create new session and set controlling terminal */ - setsid(); - ioctl(slave_fd, TIOCSCTTY, 0); + /* Create new session (required before acquiring controlling tty) */ + if (setsid() == (pid_t)-1) child_fail("setsid", NULL); + if (ioctl(slave_fd, TIOCSCTTY, 0) != 0) child_fail("TIOCSCTTY", NULL); - dup2(slave_fd, STDIN_FILENO); - dup2(slave_fd, STDOUT_FILENO); - dup2(slave_fd, STDERR_FILENO); + if (dup2(slave_fd, STDIN_FILENO) != STDIN_FILENO || + dup2(slave_fd, STDOUT_FILENO) != STDOUT_FILENO || + dup2(slave_fd, STDERR_FILENO) != STDERR_FILENO) { + child_fail("dup2", NULL); + } if (slave_fd > STDERR_FILENO) close(slave_fd); setpgid(0, 0); execvp(cmd, cmd_args); - fprintf(stderr, "execvp failed: %s: %s\n", cmd, strerror(errno)); - _exit(127); + child_fail("execvp", cmd); } /* === Shepherd (PTY) === */ close(slave_fd); pty_master_fd = master_fd; + set_cloexec(master_fd); - /* Move child to cgroup (Linux only, no-op elsewhere) */ - cgroup_setup(child_pid); + /* Move child to cgroup (Linux only, no-op elsewhere). If the user + * requested a cgroup path and setup failed, isolation is not + * available — treat as fatal. 
*/ + if (cgroup_setup(child_pid) != 0) { + send_error(uds_fd, "cgroup setup failed"); + kill_child(child_pid); + close(master_fd); + close(uds_fd); + return 1; + } /* Send single master FD to BEAM (used for both read and write) */ int fds_to_send[1] = {master_fd}; @@ -611,9 +744,11 @@ int main(int argc, char *argv[]) { close(stdout_pipe[0]); close(stderr_pipe[0]); - dup2(stdin_pipe[0], STDIN_FILENO); - dup2(stdout_pipe[1], STDOUT_FILENO); - dup2(stderr_pipe[1], STDERR_FILENO); + if (dup2(stdin_pipe[0], STDIN_FILENO) != STDIN_FILENO || + dup2(stdout_pipe[1], STDOUT_FILENO) != STDOUT_FILENO || + dup2(stderr_pipe[1], STDERR_FILENO) != STDERR_FILENO) { + child_fail("dup2", NULL); + } close(stdin_pipe[0]); close(stdout_pipe[1]); @@ -621,8 +756,7 @@ int main(int argc, char *argv[]) { setpgid(0, 0); execvp(cmd, cmd_args); - fprintf(stderr, "execvp failed: %s: %s\n", cmd, strerror(errno)); - _exit(127); + child_fail("execvp", cmd); } /* === Shepherd (pipe) === */ @@ -630,8 +764,18 @@ int main(int argc, char *argv[]) { close(stdout_pipe[1]); close(stderr_pipe[1]); - /* Move child to cgroup (Linux only, no-op elsewhere) */ - cgroup_setup(child_pid); + /* Move child to cgroup (Linux only, no-op elsewhere). If the user + * requested a cgroup path and setup failed, isolation is not + * available — treat as fatal. 
*/ + if (cgroup_setup(child_pid) != 0) { + send_error(uds_fd, "cgroup setup failed"); + kill_child(child_pid); + close(stdin_pipe[1]); + close(stdout_pipe[0]); + close(stderr_pipe[0]); + close(uds_fd); + return 1; + } int fds_to_send[3] = {stdin_pipe[1], stdout_pipe[0], stderr_pipe[0]}; if (send_fds(uds_fd, fds_to_send, 3) != 0) { diff --git a/lib/net_runner.ex b/lib/net_runner.ex index f6a00b2..c411e75 100644 --- a/lib/net_runner.ex +++ b/lib/net_runner.ex @@ -76,8 +76,16 @@ defmodule NetRunner do max_output_size = Keyword.get(opts, :max_output_size, nil) process_opts = Keyword.drop(opts, [:input, :timeout, :max_output_size]) - {:ok, pid} = Proc.start(cmd, args, process_opts) + case Proc.start(cmd, args, process_opts) do + {:ok, pid} -> + run_with_pid(pid, input, timeout, max_output_size) + {:error, _reason} = error -> + error + end + end + + defp run_with_pid(pid, input, timeout, max_output_size) do task = Task.async(fn -> run_io(pid, input, max_output_size) end) effective_timeout = timeout || :infinity diff --git a/lib/net_runner/daemon.ex b/lib/net_runner/daemon.ex index d450e7f..47b3668 100644 --- a/lib/net_runner/daemon.ex +++ b/lib/net_runner/daemon.ex @@ -83,7 +83,16 @@ defmodule NetRunner.Daemon do {:noreply, state} end - def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do + # Drain task went :DOWN. Normal completion would match the {ref, result} + # clause above, so here we expect an abnormal reason (crash, :killed, etc.) + # — log a warning so a drain crash does not silently stop draining. 
+ def handle_info({:DOWN, ref, :process, _pid, reason}, state) do + if ref in [state.drain_ref, state.stderr_drain_ref] and reason != :normal do + require Logger + + Logger.warning("[NetRunner.Daemon] drain task crashed: #{inspect(reason)}") + end + {:noreply, state} end @@ -120,7 +129,7 @@ defmodule NetRunner.Daemon do defp drain_loop(reader, proc, on_output) do case reader.(proc) do {:ok, data} -> - handle_output(on_output, data) + safe_handle_output(on_output, data) drain_loop(reader, proc, on_output) :eof -> @@ -129,6 +138,25 @@ defmodule NetRunner.Daemon do {:error, _} -> :ok end + rescue + # Defensive: if reader.() or caller pattern blows up (e.g. Proc already + # terminated while we were mid-call), stop draining without bringing + # down the Daemon through the linked Task. + e -> + require Logger + Logger.warning("[NetRunner.Daemon] drain exception: #{inspect(e)}") + :ok + catch + :exit, _ -> :ok + end + + defp safe_handle_output(on_output, data) do + handle_output(on_output, data) + rescue + e -> + require Logger + Logger.warning("[NetRunner.Daemon] on_output raised: #{inspect(e)}") + :ok end defp handle_output(:discard, _data), do: :ok diff --git a/lib/net_runner/process.ex b/lib/net_runner/process.ex index 5c17daa..6bafa9a 100644 --- a/lib/net_runner/process.ex +++ b/lib/net_runner/process.ex @@ -94,7 +94,16 @@ defmodule NetRunner.Process do def init({cmd, args, opts}) do case Exec.spawn_process(cmd, args, opts) do {:ok, state} -> - state = %{state | stats: Stats.new()} + # Optionally monitor an "owner" process (typically the stream + # consumer). If it dies before the OS process exits we kill the + # OS process and stop cleanly instead of leaking a GenServer. 
+ owner_ref = + case Keyword.get(opts, :owner) do + pid when is_pid(pid) -> Process.monitor(pid) + _ -> nil + end + + state = %{state | stats: Stats.new(), owner_ref: owner_ref} # Register with watcher for belt-and-suspenders cleanup NetRunner.Watcher.watch(self(), state.os_pid) # Start reading stderr in :consume mode @@ -246,13 +255,55 @@ defmodule NetRunner.Process do {:noreply, state} end + # A parked caller (read/write) died — drop its entry silently instead of + # letting it linger until process exit. The owner case is handled first. + def handle_info({:DOWN, ref, :process, _pid, _reason}, state) + when is_reference(ref) do + if ref == state.owner_ref do + on_owner_down(state) + else + case Operations.pop_by_monitor(state.operations, ref) do + {nil, _ops} -> + {:noreply, state} + + {_op, ops} -> + {:noreply, %{state | operations: ops}} + end + end + end + + # Initial stderr chunk from kick_stderr_read in init/1. Without this clause + # the data would be silently dropped by the catch-all below. + def handle_info({:stderr_data, data}, state) when is_binary(data) do + stats = Stats.record_read_stderr(state.stats, byte_size(data)) + state = %{state | stderr_buffer: [data | state.stderr_buffer], stats: stats} + # Drain anything else buffered and re-arm enif_select on EAGAIN. + {:noreply, consume_stderr(state)} + end + def handle_info(_msg, state) do {:noreply, state} end + defp on_owner_down(state) do + if state.os_pid do + case Signal.resolve(:sigkill) do + {:ok, sig_num} -> + send_shepherd_command(state, <<0x01, sig_num::8>>) + Nif.nif_kill(state.os_pid, sig_num) + + _ -> + :ok + end + end + + {:stop, :normal, state} + end + @impl true def terminate(_reason, state) do - # Best-effort cleanup + # Best-effort cleanup. Order: close pipes (lets child see EOF), then + # UDS (shepherd detects POLLHUP, kills child), then the shepherd Port. 
if state.stdin, do: Pipe.close(state.stdin) if state.stdout, do: Pipe.close(state.stdout) if state.stderr, do: Pipe.close(state.stderr) @@ -261,6 +312,14 @@ defmodule NetRunner.Process do :socket.close(state.uds_socket) end + if is_port(state.shepherd_port) do + try do + Port.close(state.shepherd_port) + catch + _, _ -> :ok + end + end + :ok end @@ -275,11 +334,22 @@ defmodule NetRunner.Process do end # Writes data in a loop: partial writes retry immediately until EAGAIN - # (which registers enif_select) or completion. + # (which registers enif_select) or completion. This keeps enif_select + # in charge of readiness notifications; any path that parks the caller + # without going through the NIF's EAGAIN path must not be taken here. defp write_loop(<<>>, _from, state), do: {:reply, :ok, state} defp write_loop(data, from, state) do case Pipe.write(state.stdin, data) do + {:ok, 0} -> + # write(2) can legally return 0 on a non-empty buffer. Avoid + # spinning by forcing another nif_write — if the kernel still + # can't make progress it will return EAGAIN and register select. + # In practice this branch is unreachable on pipes/sockets but + # guards against the dirty scheduler hang regardless. + Process.sleep(1) + write_loop(data, from, state) + {:ok, bytes_written} -> stats = Stats.record_write(state.stats, bytes_written) state = %{state | stats: stats} @@ -288,8 +358,6 @@ defmodule NetRunner.Process do if bytes_written >= total do {:reply, :ok, state} else - # Partial write — retry remainder immediately to either complete - # or hit EAGAIN (which registers enif_select for readiness notification) remaining = binary_part(data, bytes_written, total - bytes_written) write_loop(remaining, from, state) end @@ -370,6 +438,11 @@ defmodule NetRunner.Process do defp retry_write_loop(ref, from, data, state) do case Pipe.write(state.stdin, data) do + {:ok, 0} -> + # Unreachable on pipes/sockets in practice; ops entry still valid + # with full data. 
A subsequent :ready_output will drive the retry. + state + {:ok, bytes_written} -> stats = Stats.record_write(state.stats, bytes_written) state = %{state | stats: stats} @@ -385,10 +458,8 @@ defmodule NetRunner.Process do end {:error, :eagain} -> - # Still parked, update data (enif_select already registered) - {_, ops} = Operations.pop(state.operations, ref) - {ops, _new_ref} = Operations.park(ops, :write, from, data) - %{state | operations: ops} + # Still parked, enif_select already re-registered + state {:error, _} = error -> GenServer.reply(from, error) @@ -399,8 +470,10 @@ defmodule NetRunner.Process do defp kick_stderr_read(state) do if state.stderr do - # Do an initial read to get enif_select registered - case Pipe.read(state.stderr) do + # Do an initial read to get enif_select registered. If data is + # immediately available, hand it to handle_info/2 so the GenServer + # buffers it (can't update state from init/1 without reshaping it). + case Pipe.read(state.stderr, @default_read_size) do {:ok, data} -> send(self(), {:stderr_data, data}) diff --git a/lib/net_runner/process/exec.ex b/lib/net_runner/process/exec.ex index a871830..042e2d7 100644 --- a/lib/net_runner/process/exec.ex +++ b/lib/net_runner/process/exec.ex @@ -25,7 +25,8 @@ defmodule NetRunner.Process.Exec do uds_path = uds_socket_path() pty_mode = Keyword.get(opts, :pty, false) - with :ok <- validate_cgroup_path(Keyword.get(opts, :cgroup_path, nil)), + with :ok <- validate_cmd_and_args(cmd, args), + :ok <- validate_cgroup_path(Keyword.get(opts, :cgroup_path, nil)), {:ok, listen_socket} <- create_uds_listener(uds_path), shepherd_port <- open_shepherd(uds_path, cmd, args, opts), {:ok, conn_socket} <- accept_connection(listen_socket), @@ -37,6 +38,30 @@ defmodule NetRunner.Process.Exec do end end + # Reject NUL bytes in cmd/args early; passing them through Port.open's + # args: option is undefined and could truncate a cmd string on the C side. 
+ defp validate_cmd_and_args(cmd, args) do + cond do + not is_binary(cmd) -> + {:error, {:invalid_cmd, "must be a binary"}} + + cmd == "" -> + {:error, {:invalid_cmd, "must not be empty"}} + + String.contains?(cmd, <<0>>) -> + {:error, {:invalid_cmd, "must not contain NUL bytes"}} + + not is_list(args) -> + {:error, {:invalid_args, "must be a list of binaries"}} + + Enum.any?(args, fn a -> not is_binary(a) or String.contains?(a, <<0>>) end) -> + {:error, {:invalid_args, "each arg must be a binary without NUL bytes"}} + + true -> + :ok + end + end + defp setup_after_connection(conn_socket, shepherd_port, owner, cmd, args, opts, pty_mode) do with {:ok, fds, iov_rest} <- receive_fds(conn_socket, pty_mode), {:ok, os_pid} <- extract_child_started(conn_socket, iov_rest), @@ -151,8 +176,12 @@ defmodule NetRunner.Process.Exec do defp cleanup_listener(listen_socket, path) do :socket.close(listen_socket) - File.rm(path) - :ok + + case File.rm(path) do + :ok -> :ok + {:error, :enoent} -> :ok + {:error, reason} -> {:error, {:uds_path_cleanup_failed, reason}} + end end @doc """ @@ -266,32 +295,35 @@ defmodule NetRunner.Process.Exec do @doc """ Reads a protocol message from the UDS. Used for ongoing communication. + + Structured as: read the 1-byte opcode, then the opcode-specific tail. + Avoids the peek-then-recv race where :peek sees the first byte but + the recv of the full frame times out because the tail is a moment + behind the kernel delivery queue.
""" def read_uds_message(socket) do - case :socket.recv(socket, 0, [:peek], 0) do - {:ok, <<@msg_child_exited, _::binary>>} -> - case :socket.recv(socket, 5, [], 100) do - {:ok, <<@msg_child_exited, status::big-unsigned-32>>} -> - {:child_exited, status} - - other -> - {:error, {:unexpected, other}} - end - - {:ok, <<@msg_error, _::binary>>} -> - case :socket.recv(socket, 0, [], 100) do - {:ok, <<@msg_error, len::big-unsigned-16, msg::binary-size(len)>>} -> - {:shepherd_error, msg} - - other -> - {:error, {:unexpected, other}} - end + case :socket.recv(socket, 1, [], 500) do + {:ok, <<@msg_child_exited>>} -> recv_child_exited(socket) + {:ok, <<@msg_error>>} -> recv_error_message(socket) + {:ok, _} -> {:error, :unknown_message} + {:error, :timeout} -> {:error, :no_message} + {:error, reason} -> {:error, reason} + end + end - {:ok, _other} -> - {:error, :unknown_message} + defp recv_child_exited(socket) do + case :socket.recv(socket, 4, [], 500) do + {:ok, <<status::big-unsigned-32>>} -> {:child_exited, status} + other -> {:error, {:unexpected, other}} + end + end - {:error, reason} -> - {:error, reason} + defp recv_error_message(socket) do + with {:ok, <<len::big-unsigned-16>>} <- :socket.recv(socket, 2, [], 500), + {:ok, msg} <- :socket.recv(socket, len, [], 500) do + {:shepherd_error, msg} + else + other -> {:error, {:unexpected, other}} end end end diff --git a/lib/net_runner/process/operations.ex b/lib/net_runner/process/operations.ex index 85dad18..c40be4a 100644 --- a/lib/net_runner/process/operations.ex +++ b/lib/net_runner/process/operations.ex @@ -1,31 +1,58 @@ defmodule NetRunner.Process.Operations do @moduledoc false - @type op_type :: :read | :write + @type op_type :: :read | :write | {:read, :stdout | :stderr} @type pending_op :: {op_type(), GenServer.from(), term()} - defstruct pending: %{} + defstruct pending: %{}, monitors: %{} @type t :: %__MODULE__{ - pending: %{reference() => pending_op()} + pending: %{reference() => pending_op()}, + monitors: %{reference() => reference()} } @doc """ - Parks a
caller that received :eagain. Returns updated ops and a ref for matching. + Parks a caller that received :eagain. Monitors the caller so the entry + can be reclaimed if the caller crashes or times out before the GenServer + can reply. Returns updated ops and a ref for matching. """ - def park(%__MODULE__{pending: pending} = ops, type, from, context \\ nil) do + def park(%__MODULE__{pending: pending, monitors: monitors} = ops, type, from, context \\ nil) do ref = make_ref() op = {type, from, context} - {%{ops | pending: Map.put(pending, ref, op)}, ref} + + {caller_pid, _} = from + mref = Process.monitor(caller_pid) + + {%{ops | pending: Map.put(pending, ref, op), monitors: Map.put(monitors, mref, ref)}, ref} end @doc """ - Retrieves and removes a pending operation by ref. + Retrieves and removes a pending operation by ref. Demonitors the caller + we established in park/4. """ - def pop(%__MODULE__{pending: pending} = ops, ref) do + def pop(%__MODULE__{pending: pending, monitors: monitors} = ops, ref) do case Map.pop(pending, ref) do - {nil, _} -> {nil, ops} - {op, rest} -> {op, %{ops | pending: rest}} + {nil, _} -> + {nil, ops} + + {op, rest} -> + monitors = demonitor_for_op(monitors, ref) + {op, %{ops | pending: rest, monitors: monitors}} + end + end + + @doc """ + Removes the pending op whose caller-monitor ref matches `mref` (invoked + from the GenServer's :DOWN handler). Returns {op_or_nil, new_ops}. + """ + def pop_by_monitor(%__MODULE__{pending: pending, monitors: monitors} = ops, mref) do + case Map.pop(monitors, mref) do + {nil, _} -> + {nil, ops} + + {op_ref, monitors_rest} -> + {op, pending_rest} = Map.pop(pending, op_ref) + {op, %{ops | pending: pending_rest, monitors: monitors_rest}} end end @@ -38,14 +65,28 @@ defmodule NetRunner.Process.Operations do @doc """ Replies to all pending operations with the given response and clears them. + Demonitors all caller monitors along the way. 
""" - def reply_all(%__MODULE__{pending: pending} = ops, response) do + def reply_all(%__MODULE__{pending: pending, monitors: monitors} = ops, response) do Enum.each(pending, fn {_ref, {_type, from, _ctx}} -> GenServer.reply(from, response) end) - %{ops | pending: %{}} + Enum.each(monitors, fn {mref, _op_ref} -> Process.demonitor(mref, [:flush]) end) + + %{ops | pending: %{}, monitors: %{}} end def empty?(%__MODULE__{pending: pending}), do: map_size(pending) == 0 + + defp demonitor_for_op(monitors, op_ref) do + case Enum.find(monitors, fn {_mref, r} -> r == op_ref end) do + {mref, _} -> + Process.demonitor(mref, [:flush]) + Map.delete(monitors, mref) + + nil -> + monitors + end + end end diff --git a/lib/net_runner/process/state.ex b/lib/net_runner/process/state.ex index 202d3c9..e560588 100644 --- a/lib/net_runner/process/state.ex +++ b/lib/net_runner/process/state.ex @@ -13,6 +13,7 @@ defmodule NetRunner.Process.State do :exit_status, :cmd, :args, + :owner_ref, operations: %Operations{}, awaiting_exit: [], stderr_mode: :consume, @@ -32,6 +33,7 @@ defmodule NetRunner.Process.State do exit_status: non_neg_integer() | nil, cmd: String.t(), args: [String.t()], + owner_ref: reference() | nil, operations: Operations.t(), awaiting_exit: [GenServer.from()], stderr_mode: :consume | :redirect | :disabled, diff --git a/lib/net_runner/signal.ex b/lib/net_runner/signal.ex index 69c972b..1c3df06 100644 --- a/lib/net_runner/signal.ex +++ b/lib/net_runner/signal.ex @@ -3,16 +3,18 @@ defmodule NetRunner.Signal do alias NetRunner.Process.Nif - @signals ~w(sigterm sigkill sigint sighup sigusr1 sigusr2 sigstop sigcont sigquit sigpipe)a - @doc """ Resolves a signal atom to its platform-specific number. + + The canonical list of supported signal atoms lives in the NIF + (`nif_signal_number`); calling it directly keeps Elixir and C from + drifting out of sync. 
""" - def resolve(signal) when signal in @signals do - Nif.nif_signal_number(signal) - end + def resolve(signal) when is_atom(signal), do: Nif.nif_signal_number(signal) + + def resolve(signal) when is_integer(signal) and signal >= 1 and signal <= 31, + do: {:ok, signal} - def resolve(signal) when is_integer(signal), do: {:ok, signal} def resolve(_signal), do: {:error, :unknown_signal} @doc """ diff --git a/lib/net_runner/stream.ex b/lib/net_runner/stream.ex index c6706fe..072cba6 100644 --- a/lib/net_runner/stream.ex +++ b/lib/net_runner/stream.ex @@ -27,7 +27,13 @@ defmodule NetRunner.Stream do """ def stream(cmd, args, opts) do input = Keyword.get(opts, :input, nil) - process_opts = Keyword.drop(opts, [:input]) + # Pass the caller as :owner so the Process GenServer stops (and kills + # the OS process) if the stream consumer crashes before Stream.resource's + # after callback would otherwise run. + process_opts = + opts + |> Keyword.drop([:input]) + |> Keyword.put_new(:owner, self()) case Proc.start(cmd, args, process_opts) do {:ok, pid} -> diff --git a/lib/net_runner/watcher.ex b/lib/net_runner/watcher.ex index 7f9b971..70711fb 100644 --- a/lib/net_runner/watcher.ex +++ b/lib/net_runner/watcher.ex @@ -38,35 +38,35 @@ defmodule NetRunner.Watcher do @impl true def handle_info({:DOWN, ref, :process, _pid, _reason}, %{monitor_ref: ref} = state) do - # GenServer crashed — kill the OS process - kill_os_process(state.os_pid) - {:stop, :normal, state} - end - - def handle_info(_msg, state) do - {:noreply, state} - end - - defp kill_os_process(os_pid) do - case Nif.nif_is_os_pid_alive(os_pid) do + # GenServer crashed — kill the OS process, then schedule a SIGKILL + # escalation check instead of Process.sleep (which would wedge this + # GenServer unresponsive to any other messages for 5 seconds). 
+ case Nif.nif_is_os_pid_alive(state.os_pid) do true -> {:ok, sigterm} = Signal.resolve(:sigterm) - Nif.nif_kill(os_pid, sigterm) + Nif.nif_kill(state.os_pid, sigterm) + Process.send_after(self(), :escalate_to_sigkill, @kill_timeout) + {:noreply, state} - # Wait briefly, then escalate - Process.sleep(@kill_timeout) - - case Nif.nif_is_os_pid_alive(os_pid) do - true -> - {:ok, sigkill} = Signal.resolve(:sigkill) - Nif.nif_kill(os_pid, sigkill) + false -> + {:stop, :normal, state} + end + end - false -> - :ok - end + def handle_info(:escalate_to_sigkill, state) do + case Nif.nif_is_os_pid_alive(state.os_pid) do + true -> + {:ok, sigkill} = Signal.resolve(:sigkill) + Nif.nif_kill(state.os_pid, sigkill) false -> :ok end + + {:stop, :normal, state} + end + + def handle_info(_msg, state) do + {:noreply, state} end end diff --git a/test/cgroup_test.exs b/test/cgroup_test.exs index b05c4c3..1bbe836 100644 --- a/test/cgroup_test.exs +++ b/test/cgroup_test.exs @@ -5,16 +5,28 @@ defmodule NetRunner.CgroupTest do describe "cgroup support" do @tag :linux_only - test "cgroup_path option is passed to shepherd" do - # On macOS this is a no-op — shepherd ignores --cgroup-path on non-Linux. - # On Linux with cgroup v2, the child would be moved into the cgroup. - {:ok, pid} = - Proc.start("echo", ["hello"], cgroup_path: "net_runner/test_#{:rand.uniform(10000)}") + test "cgroup_path option is plumbed through to the shepherd" do + # On Linux, spawning with a cgroup_path requires write access under + # /sys/fs/cgroup/. In a privileged environment the child is moved into + # the cgroup and runs to completion; in CI (no privileges) the shepherd + # rejects the setup and returns an error — which proves the option was + # actually seen and validated by the C side. Either outcome confirms + # the plumbing. 
+ path = "net_runner_test_#{:rand.uniform(1_000_000)}" - {:ok, data} = Proc.read(pid) - assert data =~ "hello" - {:ok, status} = Proc.await_exit(pid) - assert status == 0 + case Proc.start("echo", ["hello"], cgroup_path: path) do + {:ok, pid} -> + # Privileged run — child moved into cgroup successfully. + {:ok, data} = Proc.read(pid) + assert data =~ "hello" + assert {:ok, 0} = Proc.await_exit(pid) + + {:error, _reason} -> + # Unprivileged run — the shepherd refused to proceed without + # the requested isolation. That is the correct behaviour when + # a user explicitly asks for a cgroup they cannot use. + :ok + end end test "cgroup_path nil (default) works normally" do @@ -23,5 +35,13 @@ defmodule NetRunner.CgroupTest do assert data =~ "no cgroup" {:ok, 0} = Proc.await_exit(pid) end + + test "rejects invalid cgroup paths (traversal / absolute)" do + assert {:error, {:invalid_cgroup_path, _}} = + Proc.start("echo", ["x"], cgroup_path: "/absolute/nope") + + assert {:error, {:invalid_cgroup_path, _}} = + Proc.start("echo", ["x"], cgroup_path: "some/../evil") + end end end diff --git a/test/net_runner_test.exs b/test/net_runner_test.exs index a2c5f7c..9bd4665 100644 --- a/test/net_runner_test.exs +++ b/test/net_runner_test.exs @@ -60,4 +60,49 @@ defmodule NetRunnerTest do assert output == "hello\n" end end + + describe "input validation" do + # Regression: run/2 used to pattern-match {:ok, pid} on Proc.start + # which raised MatchError when validation failed. Now it returns the + # error tuple directly. 
+ test "run surfaces NUL-byte validation error cleanly" do + assert {:error, {:invalid_args, _}} = NetRunner.run(["echo", "bad\0arg"]) + end + + test "stream surfaces NUL-byte validation error cleanly" do + assert {:error, {:invalid_args, _}} = NetRunner.stream(["echo", "bad\0arg"]) + end + + test "run rejects empty executable" do + assert {:error, {:invalid_cmd, _}} = NetRunner.run([""]) + end + end + + describe "timeout path cleanup" do + # Regression / sanity: on timeout, the OS process must be killed and + # the GenServer stopped — no zombies left behind. + test "timeout returns :timeout and cleans up" do + start_count = count_sleep_processes() + + for _ <- 1..5 do + assert {:error, :timeout} = + NetRunner.run(["sleep", "30"], timeout: 100) + end + + # Give the shepherd + watcher a moment to reap + Process.sleep(300) + + end_count = count_sleep_processes() + # Allow a tolerance for concurrent tests spawning sleeps + assert end_count <= start_count + 1, + "expected sleeps to be reaped; start=#{start_count} end=#{end_count}" + end + end + + defp count_sleep_processes do + case System.cmd("pgrep", ["-x", "sleep"], stderr_to_stdout: true) do + {out, 0} -> out |> String.split("\n", trim: true) |> length() + _ -> 0 + end + end end diff --git a/test/process_test.exs b/test/process_test.exs index 1a23419..8f23974 100644 --- a/test/process_test.exs +++ b/test/process_test.exs @@ -81,6 +81,17 @@ defmodule NetRunner.ProcessTest do end end + describe "binary data" do + test "round-trips output containing NUL bytes" do + {:ok, pid} = Proc.start("sh", ["-c", ~S|printf 'a\0b\0c'|], []) + + data = read_all(pid) + assert data == "a\0b\0c" + assert byte_size(data) == 5 + assert {:ok, 0} = Proc.await_exit(pid) + end + end + describe "os_pid" do test "returns the OS pid" do {:ok, pid} = Proc.start("sleep", ["100"]) @@ -108,6 +119,85 @@ defmodule NetRunner.ProcessTest do end end + describe "input validation" do + # Regression: NUL bytes in cmd/args used to be passed through to + # 
Port.open's args:, which is undefined behaviour on the C side. + test "rejects NUL byte in cmd" do + assert {:error, {:invalid_cmd, _}} = Proc.start("ec\0ho", ["hi"]) + end + + test "rejects NUL byte in args" do + assert {:error, {:invalid_args, _}} = Proc.start("echo", ["he\0llo"]) + end + + test "rejects empty cmd" do + assert {:error, {:invalid_cmd, _}} = Proc.start("", []) + end + + test "NetRunner.run surfaces validation error instead of crashing" do + assert {:error, {:invalid_args, _}} = + NetRunner.run(["echo", "he\0llo"]) + end + end + + describe "stderr capture for fast-exiting processes" do + # Regression: the initial stderr chunk was sent to self() via + # {:stderr_data, _} but no handle_info matched, so the first (often + # only) chunk was dropped for fast-exiting commands. + test "stderr-only command exits cleanly with default :consume" do + assert {"", 0} = NetRunner.run(["sh", "-c", "echo err >&2"]) + end + + test "stats reflect stderr bytes read for :consume mode" do + {:ok, pid} = Proc.start("sh", ["-c", "echo hello-stderr >&2"], stderr: :consume) + assert {:ok, 0} = Proc.await_exit(pid, 5_000) + stats = Proc.stats(pid) + # "hello-stderr\n" = 13 bytes; tolerate >0 in case the shell adds extras. + assert stats.bytes_err >= 13 + end + end + + describe "owner monitor cleanup" do + # Regression: if the stream consumer (or any :owner process) crashes + # mid-iteration, Stream.resource's after callback is never run. + # Before the :owner-monitor fix, NetRunner.Process and its OS child + # lived on. Now the GenServer SIGKILLs and stops. 
+ test "Process SIGKILLs OS process when :owner dies" do + parent = self() + + consumer = + spawn(fn -> + {:ok, pid} = Proc.start("sleep", ["30"], owner: self()) + os_pid = Proc.os_pid(pid) + send(parent, {:os_pid, os_pid, pid}) + exit(:boom) + end) + + {os_pid, proc_pid} = + receive do + {:os_pid, op, pp} -> {op, pp} + after + 2_000 -> flunk("did not receive os_pid from consumer") + end + + _ = consumer + + # Wait for the Process GenServer to detect the DOWN, SIGKILL the + # child, reap it, and stop. + Process.sleep(500) + + refute Process.alive?(proc_pid), "Process GenServer should have stopped" + refute os_pid_alive?(os_pid), "OS process should be killed" + end + end + + defp os_pid_alive?(os_pid) do + case System.cmd("kill", ["-0", to_string(os_pid)], stderr_to_stdout: true) do + {_, 0} -> true + _ -> false + end + end + defp read_all(pid) do case Proc.read(pid) do {:ok, data} -> data <> read_all(pid) diff --git a/test/signal_test.exs b/test/signal_test.exs new file mode 100644 index 0000000..b9a5d67 --- /dev/null +++ b/test/signal_test.exs @@ -0,0 +1,45 @@ +defmodule NetRunner.SignalTest do + use ExUnit.Case, async: true + + alias NetRunner.Signal + + describe "resolve/1" do + test "resolves known signal atoms via the NIF" do + assert {:ok, num} = Signal.resolve(:sigterm) + assert is_integer(num) and num > 0 + end + + test "passes through valid integers in the POSIX 1..31 range" do + assert {:ok, 9} = Signal.resolve(9) + assert {:ok, 15} = Signal.resolve(15) + end + + test "rejects integers outside 1..31" do + assert {:error, :unknown_signal} = Signal.resolve(0) + assert {:error, :unknown_signal} = Signal.resolve(-1) + assert {:error, :unknown_signal} = Signal.resolve(32) + assert {:error, :unknown_signal} = Signal.resolve(9999) + end + + test "rejects unknown atoms" do + assert {:error, :unknown_signal} = Signal.resolve(:siggibberish) + end + + test "rejects non-atom, non-integer values" do + assert {:error, :unknown_signal} = Signal.resolve("sigterm") + 
assert {:error, :unknown_signal} = Signal.resolve(nil) + assert {:error, :unknown_signal} = Signal.resolve([]) + end + end + + describe "resolve!/1" do + test "returns the number for valid signals" do + assert is_integer(Signal.resolve!(:sigkill)) + end + + test "raises for invalid signals" do + assert_raise ArgumentError, fn -> Signal.resolve!(:bogus) end + assert_raise ArgumentError, fn -> Signal.resolve!(99) end + end + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index d1d11e6..c9011b7 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -5,3 +5,15 @@ exclude = end ExUnit.start(exclude: exclude) + +# Clean up any stale UDS sockets left behind by previous test runs that +# crashed before cleanup_listener could remove them. +defmodule NetRunner.TestCleanup do + def purge_stale_sockets do + pattern = Path.join(System.tmp_dir!(), "net_runner_*.sock") + pattern |> Path.wildcard() |> Enum.each(&File.rm/1) + end +end + +NetRunner.TestCleanup.purge_stale_sockets() +ExUnit.after_suite(fn _ -> NetRunner.TestCleanup.purge_stale_sockets() end)