From 3d419718a4aa833d35cbba719615f52d3cbc1b8b Mon Sep 17 00:00:00 2001 From: Niko Maroulis Date: Sat, 6 Jun 2026 09:59:30 -0400 Subject: [PATCH] Fix review findings: stderr API, UDS perms, signal range, write loop, Daemon Address findings from a parallel code review of recent changes. Blocker: - Remove documented-but-unimplemented `stderr: :redirect` (output was silently dropped and the child could block on a full stderr pipe). `:stderr` is now validated at spawn time (`:consume`/`:disabled`); docs, typespec, and README updated. Security/correctness: - nif_kill: reject signals outside 1..31 (mirrors shepherd CMD_KILL). - UDS socket now lives in a per-spawn 0700 dir, blocking cross-user SCM_RIGHTS hijack; also cleans up the dir on failure paths (was leaking empty dirs in tmp). - Map a zero-byte write on a non-empty buffer to EAGAIN+enif_select in the NIF and drop the GenServer-blocking Process.sleep(1) retry. - Daemon drains via Task.Supervisor.async_nolink (added NetRunner.TaskSupervisor) so a drain crash can't take the Daemon down. Cleanups: - Log shepherd errors in handle_uds_message instead of dropping them. - Operations: O(1) demonitor via an op_ref -> mref reverse index. - Command.new/3: guard clauses instead of unless/raise. Tests: - process_test: poll instead of sleep(500) for async cleanup. - net_runner_test: unique `pgrep -f` marker (no longer racy under async). - signal_test: assert_raise message patterns. - daemon_test: drain-task isolation test. --- README.md | 2 +- c_src/net_runner_nif.c | 12 ++++- lib/net_runner.ex | 6 +-- lib/net_runner/application.ex | 3 +- lib/net_runner/command.ex | 19 ++++---- lib/net_runner/daemon.ex | 6 ++- lib/net_runner/process.ex | 22 ++++----- lib/net_runner/process/exec.ex | 71 ++++++++++++++++++++++------ lib/net_runner/process/operations.ex | 53 ++++++++++++++------- lib/net_runner/process/state.ex | 2 +- test/command_test.exs | 4 +- test/daemon_test.exs | 17 +++++++ test/net_runner_test.exs | 16 +++---- test/process_test.exs | 31 ++++++++++-- test/signal_test.exs | 4 +- 15 files changed, 189 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index 3487740..fcd83ea 100644 --- a/README.md +++ b/README.md @@ -439,7 +439,7 @@ end, max_concurrency: 20) | `:input` | binary \| list | `nil` | Data to write to stdin | | `:timeout` | integer | `nil` | Wall-clock timeout in ms | | `:max_output_size` | integer | `nil` | Max bytes to collect | -| `:stderr` | atom | `:consume` | `:consume`, `:redirect`, or `:disabled` | +| `:stderr` | atom | `:consume` | `:consume` (drained internally) or `:disabled` | | `:pty` | boolean | `false` | Use pseudo-terminal | | `:kill_timeout` | integer | `5000` | SIGTERM→SIGKILL escalation timeout in ms | | `:cgroup_path` | string | `nil` | cgroup v2 path (Linux only) | diff --git a/c_src/net_runner_nif.c b/c_src/net_runner_nif.c index b7e4983..d7bb5b8 100644 --- a/c_src/net_runner_nif.c +++ b/c_src/net_runner_nif.c @@ -299,11 +299,14 @@ static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc, ssize_t n = write(fd, bin.data, bin.size); int saved_errno = errno; - if (n >= 0) { + if (n > 0) { enif_mutex_unlock(res->lock); return enif_make_tuple2(env, atom_ok, enif_make_int64(env, (int64_t)n)); } - if (saved_errno == EAGAIN || saved_errno == EWOULDBLOCK) { + /* write() returning 0 on a non-empty buffer (bin.size > 0 here) is rare + * but legal. Treat it like EAGAIN: register for write readiness and let + * the caller re-arm via :ready_output, rather than spinning. */ + if (n == 0 || saved_errno == EAGAIN || saved_errno == EWOULDBLOCK) { int sel_ret = enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_WRITE, res, NULL, atom_undefined); @@ -405,6 +408,11 @@ static ERL_NIF_TERM nif_kill(ErlNifEnv *env, int argc, if (!enif_get_int(env, argv[1], &sig)) { return enif_make_badarg(env); } + /* Reject signals outside the POSIX range, mirroring shepherd.c's + * CMD_KILL validation. Bounds the blast radius of a stray nif_kill. */ + if (sig < 1 || sig > 31) { + return enif_make_badarg(env); + } if (kill((pid_t)os_pid, sig) == 0) { return atom_ok; diff --git a/lib/net_runner.ex b/lib/net_runner.ex index c411e75..f88c587 100644 --- a/lib/net_runner.ex +++ b/lib/net_runner.ex @@ -33,8 +33,8 @@ defmodule NetRunner do ## Options - * `:stderr` - `:consume` (default, captured internally), `:redirect` (merged with stdout), - or `:disabled` + * `:stderr` - `:consume` (default, drained internally so the child never + blocks on a full stderr pipe) or `:disabled` * `:input` - data to write to stdin (binary or enumerable) * `:timeout` - maximum wall-clock time in milliseconds. Sends SIGTERM then SIGKILL on timeout. Returns `{:error, :timeout}` instead of `{output, exit_status}`. @@ -119,7 +119,7 @@ defmodule NetRunner do ## Options * `:input` - data to write to stdin (binary, list, or Stream) - * `:stderr` - `:consume` (default), `:redirect`, or `:disabled` + * `:stderr` - `:consume` (default) or `:disabled` ## Examples diff --git a/lib/net_runner/application.ex b/lib/net_runner/application.ex index cc3dc69..7a9aa83 100644 --- a/lib/net_runner/application.ex +++ b/lib/net_runner/application.ex @@ -6,7 +6,8 @@ defmodule NetRunner.Application do @impl true def start(_type, _args) do children = [ - {DynamicSupervisor, name: NetRunner.WatcherSupervisor, strategy: :one_for_one} + {DynamicSupervisor, name: NetRunner.WatcherSupervisor, strategy: :one_for_one}, + {Task.Supervisor, name: NetRunner.TaskSupervisor} ] opts = [strategy: :one_for_one, name: NetRunner.Supervisor] diff --git a/lib/net_runner/command.ex b/lib/net_runner/command.ex index a9ed441..6ca78f0 100644 --- a/lib/net_runner/command.ex +++ b/lib/net_runner/command.ex @@ -55,18 +55,21 @@ defmodule NetRunner.Command do %NetRunner.Command{executable: "curl", args: ["-s"], opts: [timeout: 10_000]} """ @spec new(String.t(), [String.t()], keyword()) :: t() - def new(executable, args \\ [], opts \\ []) do - unless is_binary(executable) do - raise ArgumentError, "executable must be a string, got: #{inspect(executable)}" - end - - unless is_list(args) do - raise ArgumentError, "args must be a list, got: #{inspect(args)}" - end + def new(executable, args \\ [], opts \\ []) + def new(executable, args, opts) + when is_binary(executable) and is_list(args) and is_list(opts) do %__MODULE__{executable: executable, args: args, opts: opts} end + def new(executable, _args, _opts) when not is_binary(executable) do + raise ArgumentError, "executable must be a string, got: #{inspect(executable)}" + end + + def new(_executable, args, _opts) when not is_list(args) do + raise ArgumentError, "args must be a list, got: #{inspect(args)}" + end + @doc """ Decomposes a command struct into `{executable, args, opts}`. diff --git a/lib/net_runner/daemon.ex b/lib/net_runner/daemon.ex index 47b3668..9262e4d 100644 --- a/lib/net_runner/daemon.ex +++ b/lib/net_runner/daemon.ex @@ -118,8 +118,12 @@ defmodule NetRunner.Daemon do defp start_drain(proc, pipe, on_output) do reader = if pipe == :stdout, do: &Proc.read/1, else: &Proc.read_stderr/1 + # async_nolink (not async): the drain task is unlinked from the Daemon, + # so a task crash cannot take the Daemon down. Completion arrives as + # {ref, result} and abnormal exit as {:DOWN, ref, ...}, both handled in + # handle_info/2. task = - Task.async(fn -> + Task.Supervisor.async_nolink(NetRunner.TaskSupervisor, fn -> drain_loop(reader, proc, on_output) end) diff --git a/lib/net_runner/process.ex b/lib/net_runner/process.ex index 738cfc2..31c9345 100644 --- a/lib/net_runner/process.ex +++ b/lib/net_runner/process.ex @@ -337,19 +337,12 @@ defmodule NetRunner.Process do # (which registers enif_select) or completion. This keeps enif_select # in charge of readiness notifications; any path that parks the caller # without going through the NIF's EAGAIN path must not be taken here. + # A zero-byte write on a non-empty buffer is mapped to :eagain inside the + # NIF (which registers select), so it can never reach this loop. defp write_loop(<<>>, _from, state), do: {:reply, :ok, state} defp write_loop(data, from, state) do case Pipe.write(state.stdin, data) do - {:ok, 0} -> - # write(2) can legally return 0 on a non-empty buffer. Avoid - # spinning by forcing another nif_write — if the kernel still - # can't make progress it will return EAGAIN and register select. - # In practice this branch is unreachable on pipes/sockets but - # guards against the dirty scheduler hang regardless. - Process.sleep(1) - write_loop(data, from, state) - {:ok, bytes_written} -> stats = Stats.record_write(state.stats, bytes_written) state = %{state | stats: stats} @@ -438,11 +431,6 @@ defmodule NetRunner.Process do defp retry_write_loop(ref, from, data, state) do case Pipe.write(state.stdin, data) do - {:ok, 0} -> - # Unreachable on pipes/sockets in practice; ops entry still valid - # with full data. A subsequent :ready_output will drive the retry. - state - {:ok, bytes_written} -> stats = Stats.record_write(state.stats, bytes_written) state = %{state | stats: stats} @@ -552,6 +540,12 @@ defmodule NetRunner.Process do {:child_exited, status} -> finish_exit(state, status) + {:shepherd_error, msg} -> + require Logger + + Logger.warning("[NetRunner] shepherd reported error: #{inspect(msg)}") + state + _ -> state end diff --git a/lib/net_runner/process/exec.ex b/lib/net_runner/process/exec.ex index 042e2d7..e767260 100644 --- a/lib/net_runner/process/exec.ex +++ b/lib/net_runner/process/exec.ex @@ -25,19 +25,38 @@ defmodule NetRunner.Process.Exec do uds_path = uds_socket_path() pty_mode = Keyword.get(opts, :pty, false) - with :ok <- validate_cmd_and_args(cmd, args), - :ok <- validate_cgroup_path(Keyword.get(opts, :cgroup_path, nil)), - {:ok, listen_socket} <- create_uds_listener(uds_path), - shepherd_port <- open_shepherd(uds_path, cmd, args, opts), - {:ok, conn_socket} <- accept_connection(listen_socket), - :ok <- cleanup_listener(listen_socket, uds_path) do - # conn_socket and shepherd_port are now live — clean up on any failure - setup_after_connection(conn_socket, shepherd_port, owner, cmd, args, opts, pty_mode) - else - {:error, reason} -> {:error, reason} + result = + with :ok <- validate_cmd_and_args(cmd, args), + :ok <- validate_stderr_mode(Keyword.get(opts, :stderr, :consume), pty_mode), + :ok <- validate_cgroup_path(Keyword.get(opts, :cgroup_path, nil)), + {:ok, listen_socket} <- create_uds_listener(uds_path), + shepherd_port <- open_shepherd(uds_path, cmd, args, opts), + {:ok, conn_socket} <- accept_connection(listen_socket), + :ok <- cleanup_listener(listen_socket, uds_path) do + # conn_socket and shepherd_port are now live — clean up on any failure + setup_after_connection(conn_socket, shepherd_port, owner, cmd, args, opts, pty_mode) + end + + # On the happy path cleanup_listener removed the socket file and its + # per-spawn dir. On any failure (validation, accept timeout, ...) the dir + # may still exist — remove it so spawns don't leak empty 0700 dirs. + case result do + {:ok, _} = ok -> ok + {:error, _} = err -> cleanup_uds_dir_passthrough(uds_path, err) end end + defp cleanup_uds_dir(uds_path) do + _ = File.rm(uds_path) + _ = File.rmdir(Path.dirname(uds_path)) + :ok + end + + defp cleanup_uds_dir_passthrough(uds_path, err) do + cleanup_uds_dir(uds_path) + err + end + # Reject NUL bytes in cmd/args early; passing them through Port.open's # args: option is undefined and could truncate a cmd string on the C side. defp validate_cmd_and_args(cmd, args) do @@ -101,6 +120,16 @@ defmodule NetRunner.Process.Exec do _, _ -> :ok end + # In PTY mode stderr is folded into the bidirectional master FD, so the + # :stderr option is ignored. In pipe mode only :consume (drained internally + # to avoid blocking the child on a full pipe) and :disabled are supported. + defp validate_stderr_mode(_mode, true), do: :ok + defp validate_stderr_mode(mode, false) when mode in [:consume, :disabled], do: :ok + + defp validate_stderr_mode(mode, false) do + {:error, {:invalid_stderr, "must be :consume or :disabled, got: #{inspect(mode)}"}} + end + defp validate_cgroup_path(nil), do: :ok defp validate_cgroup_path(path) do @@ -118,9 +147,18 @@ defmodule NetRunner.Process.Exec do end end + # Place the socket inside a per-spawn 0700 directory so only the current + # user can traverse to it. Without this the socket lives directly in the + # world-traversable tmp dir, and a same-host attacker who wins the accept + # race against the real shepherd would receive the child's pipe FDs via + # SCM_RIGHTS. The 0700 dir reduces the threat to same-uid processes (which + # are already inside our trust domain). defp uds_socket_path do random = Base.encode16(:crypto.strong_rand_bytes(8), case: :lower) - Path.join(System.tmp_dir!(), "net_runner_#{random}.sock") + dir = Path.join(System.tmp_dir!(), "net_runner_#{random}") + File.mkdir_p!(dir) + File.chmod!(dir, 0o700) + Path.join(dir, "shepherd.sock") end defp create_uds_listener(path) do @@ -178,9 +216,14 @@ defmodule NetRunner.Process.Exec do :socket.close(listen_socket) case File.rm(path) do - :ok -> :ok - {:error, :enoent} -> :ok - {:error, reason} -> {:error, {:uds_path_cleanup_failed, reason}} + result when result in [:ok, {:error, :enoent}] -> + # Best-effort removal of the per-spawn 0700 dir created in + # uds_socket_path/0; it is empty once the socket file is gone. + _ = File.rmdir(Path.dirname(path)) + :ok + + {:error, reason} -> + {:error, {:uds_path_cleanup_failed, reason}} end end diff --git a/lib/net_runner/process/operations.ex b/lib/net_runner/process/operations.ex index c40be4a..954fe76 100644 --- a/lib/net_runner/process/operations.ex +++ b/lib/net_runner/process/operations.ex @@ -4,11 +4,12 @@ defmodule NetRunner.Process.Operations do @type op_type :: :read | :write | {:read, :stdout | :stderr} @type pending_op :: {op_type(), GenServer.from(), term()} - defstruct pending: %{}, monitors: %{} + defstruct pending: %{}, monitors: %{}, op_monitors: %{} @type t :: %__MODULE__{ pending: %{reference() => pending_op()}, - monitors: %{reference() => reference()} + monitors: %{reference() => reference()}, + op_monitors: %{reference() => reference()} } @doc """ @@ -16,28 +17,35 @@ defmodule NetRunner.Process.Operations do can be reclaimed if the caller crashes or times out before the GenServer can reply. Returns updated ops and a ref for matching. """ - def park(%__MODULE__{pending: pending, monitors: monitors} = ops, type, from, context \\ nil) do + def park(%__MODULE__{} = ops, type, from, context \\ nil) do ref = make_ref() op = {type, from, context} {caller_pid, _} = from mref = Process.monitor(caller_pid) - {%{ops | pending: Map.put(pending, ref, op), monitors: Map.put(monitors, mref, ref)}, ref} + new_ops = %{ + ops + | pending: Map.put(ops.pending, ref, op), + monitors: Map.put(ops.monitors, mref, ref), + op_monitors: Map.put(ops.op_monitors, ref, mref) + } + + {new_ops, ref} end @doc """ Retrieves and removes a pending operation by ref. Demonitors the caller we established in park/4. """ - def pop(%__MODULE__{pending: pending, monitors: monitors} = ops, ref) do + def pop(%__MODULE__{pending: pending} = ops, ref) do case Map.pop(pending, ref) do {nil, _} -> {nil, ops} {op, rest} -> - monitors = demonitor_for_op(monitors, ref) - {op, %{ops | pending: rest, monitors: monitors}} + ops = demonitor_for_op(%{ops | pending: rest}, ref) + {op, ops} end end @@ -45,14 +53,24 @@ defmodule NetRunner.Process.Operations do Removes the pending op whose caller-monitor ref matches `mref` (invoked from the GenServer's :DOWN handler). Returns {op_or_nil, new_ops}. """ - def pop_by_monitor(%__MODULE__{pending: pending, monitors: monitors} = ops, mref) do + def pop_by_monitor( + %__MODULE__{pending: pending, monitors: monitors, op_monitors: op_monitors} = ops, + mref + ) do case Map.pop(monitors, mref) do {nil, _} -> {nil, ops} {op_ref, monitors_rest} -> {op, pending_rest} = Map.pop(pending, op_ref) - {op, %{ops | pending: pending_rest, monitors: monitors_rest}} + + {op, + %{ + ops + | pending: pending_rest, + monitors: monitors_rest, + op_monitors: Map.delete(op_monitors, op_ref) + }} end end @@ -74,19 +92,20 @@ defmodule NetRunner.Process.Operations do Enum.each(monitors, fn {mref, _op_ref} -> Process.demonitor(mref, [:flush]) end) - %{ops | pending: %{}, monitors: %{}} + %{ops | pending: %{}, monitors: %{}, op_monitors: %{}} end def empty?(%__MODULE__{pending: pending}), do: map_size(pending) == 0 - defp demonitor_for_op(monitors, op_ref) do - case Enum.find(monitors, fn {_mref, r} -> r == op_ref end) do - {mref, _} -> - Process.demonitor(mref, [:flush]) - Map.delete(monitors, mref) + # O(1) demonitor via the op_ref -> mref reverse index. + defp demonitor_for_op(%__MODULE__{monitors: monitors, op_monitors: op_monitors} = ops, op_ref) do + case Map.pop(op_monitors, op_ref) do + {nil, _} -> + ops - nil -> - monitors + {mref, op_monitors_rest} -> + Process.demonitor(mref, [:flush]) + %{ops | monitors: Map.delete(monitors, mref), op_monitors: op_monitors_rest} end end end diff --git a/lib/net_runner/process/state.ex b/lib/net_runner/process/state.ex index e560588..bec7772 100644 --- a/lib/net_runner/process/state.ex +++ b/lib/net_runner/process/state.ex @@ -36,7 +36,7 @@ defmodule NetRunner.Process.State do owner_ref: reference() | nil, operations: Operations.t(), awaiting_exit: [GenServer.from()], - stderr_mode: :consume | :redirect | :disabled, + stderr_mode: :consume | :disabled, stderr_buffer: [binary()], status: status() } diff --git a/test/command_test.exs b/test/command_test.exs index 8678cfc..af35bfc 100644 --- a/test/command_test.exs +++ b/test/command_test.exs @@ -65,9 +65,9 @@ defmodule NetRunner.CommandTest do end test "preserves multiple opts" do - cmd = Command.new("cmd", [], timeout: 5_000, stderr: :redirect, pty: true) + cmd = Command.new("cmd", [], timeout: 5_000, stderr: :disabled, pty: true) - assert cmd.opts == [timeout: 5_000, stderr: :redirect, pty: true] + assert cmd.opts == [timeout: 5_000, stderr: :disabled, pty: true] end test "raises on non-binary executable" do diff --git a/test/daemon_test.exs b/test/daemon_test.exs index c6fa0b4..f14afbd 100644 --- a/test/daemon_test.exs +++ b/test/daemon_test.exs @@ -44,6 +44,23 @@ defmodule NetRunner.DaemonTest do GenServer.stop(daemon) end + test "a crashing on_output callback does not bring the Daemon down" do + # The drain task runs under Task.Supervisor.async_nolink, so an + # uncaught error in the callback must not take the Daemon with it. + {:ok, daemon} = + Daemon.start_link(cmd: "cat", args: [], on_output: fn _ -> raise "boom" end) + + assert :ok = Daemon.write(daemon, "trigger\n") + + # Give the drain task time to read the chunk and raise. + Process.sleep(200) + + assert Process.alive?(daemon) + assert Daemon.alive?(daemon) + + GenServer.stop(daemon) + end + test "daemon cleans up on crash" do Process.flag(:trap_exit, true) {:ok, daemon} = Daemon.start_link(cmd: "sleep", args: ["100"]) diff --git a/test/net_runner_test.exs b/test/net_runner_test.exs index 9bd4665..894582c 100644 --- a/test/net_runner_test.exs +++ b/test/net_runner_test.exs @@ -82,25 +82,25 @@ defmodule NetRunnerTest do # Regression / sanity: on timeout, the OS process must be killed and # the GenServer stopped — no zombies left behind. test "timeout returns :timeout and cleans up" do - start_count = count_sleep_processes() + # Use a unique sleep duration so the count is isolated from other + # async tests that spawn `sleep` — pgrep -f matches the full argv. + marker = "47" for _ <- 1..5 do assert {:error, :timeout} = - NetRunner.run(["sleep", "30"], timeout: 100) + NetRunner.run(["sleep", marker], timeout: 100) end # Give the shepherd + watcher a moment to reap Process.sleep(300) - end_count = count_sleep_processes() - # Allow a tolerance for concurrent tests spawning sleeps - assert end_count <= start_count + 1, - "expected sleeps to be reaped; start=#{start_count} end=#{end_count}" + assert count_sleep_processes(marker) == 0, + "expected all `sleep #{marker}` processes to be reaped" end end - defp count_sleep_processes do - case System.cmd("pgrep", ["-x", "sleep"], stderr_to_stdout: true) do + defp count_sleep_processes(marker) do + case System.cmd("pgrep", ["-f", "sleep #{marker}"], stderr_to_stdout: true) do {out, 0} -> out |> String.split("\n", trim: true) |> length() _ -> 0 end diff --git a/test/process_test.exs b/test/process_test.exs index 8f23974..2a51165 100644 --- a/test/process_test.exs +++ b/test/process_test.exs @@ -182,12 +182,33 @@ defmodule NetRunner.ProcessTest do _ = consumer - # Wait for the Process GenServer to detect the DOWN, SIGKILL the - # child, reap it, and stop. - Process.sleep(500) + # Poll until the Process GenServer detects the DOWN, SIGKILLs the + # child, reaps it, and stops. Avoids a fixed sleep that flakes on + # loaded CI runners. + assert eventually(fn -> + not Process.alive?(proc_pid) and not os_pid_alive?(os_pid) + end), + "Process GenServer should have stopped and OS process should be killed" + end + end + + # Polls `fun` every 50 ms until it returns true or `timeout` elapses. + defp eventually(fun, timeout \\ 3_000) do + deadline = System.monotonic_time(:millisecond) + timeout + do_eventually(fun, deadline) + end + + defp do_eventually(fun, deadline) do + cond do + fun.() -> + true + + System.monotonic_time(:millisecond) >= deadline -> + false - refute Process.alive?(proc_pid), "Process GenServer should have stopped" - refute os_pid_alive?(os_pid), "OS process should be killed" + true -> + Process.sleep(50) + do_eventually(fun, deadline) end end diff --git a/test/signal_test.exs b/test/signal_test.exs index b9a5d67..202d6aa 100644 --- a/test/signal_test.exs +++ b/test/signal_test.exs @@ -38,8 +38,8 @@ defmodule NetRunner.SignalTest do end test "raises for invalid signals" do - assert_raise ArgumentError, fn -> Signal.resolve!(:bogus) end - assert_raise ArgumentError, fn -> Signal.resolve!(99) end + assert_raise ArgumentError, ~r/unknown signal/, fn -> Signal.resolve!(:bogus) end + assert_raise ArgumentError, ~r/unknown signal/, fn -> Signal.resolve!(99) end end end end