Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ end, max_concurrency: 20)
| `:input` | binary \| list | `nil` | Data to write to stdin |
| `:timeout` | integer | `nil` | Wall-clock timeout in ms |
| `:max_output_size` | integer | `nil` | Max bytes to collect |
| `:stderr` | atom | `:consume` | `:consume`, `:redirect`, or `:disabled` |
| `:stderr` | atom | `:consume` | `:consume` (drained internally) or `:disabled` |
| `:pty` | boolean | `false` | Use pseudo-terminal |
| `:kill_timeout` | integer | `5000` | SIGTERM→SIGKILL escalation timeout in ms |
| `:cgroup_path` | string | `nil` | cgroup v2 path (Linux only) |
Expand Down
12 changes: 10 additions & 2 deletions c_src/net_runner_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,14 @@ static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc,
ssize_t n = write(fd, bin.data, bin.size);
int saved_errno = errno;

if (n >= 0) {
if (n > 0) {
enif_mutex_unlock(res->lock);
return enif_make_tuple2(env, atom_ok, enif_make_int64(env, (int64_t)n));
}
if (saved_errno == EAGAIN || saved_errno == EWOULDBLOCK) {
/* write() returning 0 on a non-empty buffer (bin.size > 0 here) is rare
* but legal. Treat it like EAGAIN: register for write readiness and let
* the caller re-arm via :ready_output, rather than spinning. */
if (n == 0 || saved_errno == EAGAIN || saved_errno == EWOULDBLOCK) {
int sel_ret = enif_select(env, (ErlNifEvent)fd,
ERL_NIF_SELECT_WRITE, res, NULL,
atom_undefined);
Expand Down Expand Up @@ -405,6 +408,11 @@ static ERL_NIF_TERM nif_kill(ErlNifEnv *env, int argc,
if (!enif_get_int(env, argv[1], &sig)) {
return enif_make_badarg(env);
}
/* Reject signals outside the POSIX range, mirroring shepherd.c's
* CMD_KILL validation. Bounds the blast radius of a stray nif_kill. */
if (sig < 1 || sig > 31) {
return enif_make_badarg(env);
}

if (kill((pid_t)os_pid, sig) == 0) {
return atom_ok;
Expand Down
6 changes: 3 additions & 3 deletions lib/net_runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ defmodule NetRunner do

## Options

* `:stderr` - `:consume` (default, captured internally), `:redirect` (merged with stdout),
or `:disabled`
* `:stderr` - `:consume` (default, drained internally so the child never
blocks on a full stderr pipe) or `:disabled`
* `:input` - data to write to stdin (binary or enumerable)
* `:timeout` - maximum wall-clock time in milliseconds. Sends SIGTERM then SIGKILL
on timeout. Returns `{:error, :timeout}` instead of `{output, exit_status}`.
Expand Down Expand Up @@ -119,7 +119,7 @@ defmodule NetRunner do
## Options

* `:input` - data to write to stdin (binary, list, or Stream)
* `:stderr` - `:consume` (default), `:redirect`, or `:disabled`
* `:stderr` - `:consume` (default) or `:disabled`

## Examples

Expand Down
3 changes: 2 additions & 1 deletion lib/net_runner/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ defmodule NetRunner.Application do
@impl true
def start(_type, _args) do
children = [
{DynamicSupervisor, name: NetRunner.WatcherSupervisor, strategy: :one_for_one}
{DynamicSupervisor, name: NetRunner.WatcherSupervisor, strategy: :one_for_one},
{Task.Supervisor, name: NetRunner.TaskSupervisor}
]

opts = [strategy: :one_for_one, name: NetRunner.Supervisor]
Expand Down
19 changes: 11 additions & 8 deletions lib/net_runner/command.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,21 @@ defmodule NetRunner.Command do
%NetRunner.Command{executable: "curl", args: ["-s"], opts: [timeout: 10_000]}
"""
@spec new(String.t(), [String.t()], keyword()) :: t()
def new(executable, args \\ [], opts \\ []) do
unless is_binary(executable) do
raise ArgumentError, "executable must be a string, got: #{inspect(executable)}"
end

unless is_list(args) do
raise ArgumentError, "args must be a list, got: #{inspect(args)}"
end
def new(executable, args \\ [], opts \\ [])

def new(executable, args, opts)
when is_binary(executable) and is_list(args) and is_list(opts) do
%__MODULE__{executable: executable, args: args, opts: opts}
end

def new(executable, _args, _opts) when not is_binary(executable) do
raise ArgumentError, "executable must be a string, got: #{inspect(executable)}"
end

def new(_executable, args, _opts) when not is_list(args) do
raise ArgumentError, "args must be a list, got: #{inspect(args)}"
end

@doc """
Decomposes a command struct into `{executable, args, opts}`.

Expand Down
6 changes: 5 additions & 1 deletion lib/net_runner/daemon.ex
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,12 @@ defmodule NetRunner.Daemon do
defp start_drain(proc, pipe, on_output) do
reader = if pipe == :stdout, do: &Proc.read/1, else: &Proc.read_stderr/1

# async_nolink (not async): the drain task is unlinked from the Daemon,
# so a task crash cannot take the Daemon down. Completion arrives as
# {ref, result} and abnormal exit as {:DOWN, ref, ...}, both handled in
# handle_info/2.
task =
Task.async(fn ->
Task.Supervisor.async_nolink(NetRunner.TaskSupervisor, fn ->
drain_loop(reader, proc, on_output)
end)

Expand Down
22 changes: 8 additions & 14 deletions lib/net_runner/process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -337,19 +337,12 @@ defmodule NetRunner.Process do
# (which registers enif_select) or completion. This keeps enif_select
# in charge of readiness notifications; any path that parks the caller
# without going through the NIF's EAGAIN path must not be taken here.
# A zero-byte write on a non-empty buffer is mapped to :eagain inside the
# NIF (which registers select), so it can never reach this loop.
defp write_loop(<<>>, _from, state), do: {:reply, :ok, state}

defp write_loop(data, from, state) do
case Pipe.write(state.stdin, data) do
{:ok, 0} ->
# write(2) can legally return 0 on a non-empty buffer. Avoid
# spinning by forcing another nif_write — if the kernel still
# can't make progress it will return EAGAIN and register select.
# In practice this branch is unreachable on pipes/sockets but
# guards against the dirty scheduler hang regardless.
Process.sleep(1)
write_loop(data, from, state)

{:ok, bytes_written} ->
stats = Stats.record_write(state.stats, bytes_written)
state = %{state | stats: stats}
Expand Down Expand Up @@ -438,11 +431,6 @@ defmodule NetRunner.Process do

defp retry_write_loop(ref, from, data, state) do
case Pipe.write(state.stdin, data) do
{:ok, 0} ->
# Unreachable on pipes/sockets in practice; ops entry still valid
# with full data. A subsequent :ready_output will drive the retry.
state

{:ok, bytes_written} ->
stats = Stats.record_write(state.stats, bytes_written)
state = %{state | stats: stats}
Expand Down Expand Up @@ -552,6 +540,12 @@ defmodule NetRunner.Process do
{:child_exited, status} ->
finish_exit(state, status)

{:shepherd_error, msg} ->
require Logger

Logger.warning("[NetRunner] shepherd reported error: #{inspect(msg)}")
state

_ ->
state
end
Expand Down
71 changes: 57 additions & 14 deletions lib/net_runner/process/exec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,38 @@ defmodule NetRunner.Process.Exec do
uds_path = uds_socket_path()
pty_mode = Keyword.get(opts, :pty, false)

with :ok <- validate_cmd_and_args(cmd, args),
:ok <- validate_cgroup_path(Keyword.get(opts, :cgroup_path, nil)),
{:ok, listen_socket} <- create_uds_listener(uds_path),
shepherd_port <- open_shepherd(uds_path, cmd, args, opts),
{:ok, conn_socket} <- accept_connection(listen_socket),
:ok <- cleanup_listener(listen_socket, uds_path) do
# conn_socket and shepherd_port are now live — clean up on any failure
setup_after_connection(conn_socket, shepherd_port, owner, cmd, args, opts, pty_mode)
else
{:error, reason} -> {:error, reason}
result =
with :ok <- validate_cmd_and_args(cmd, args),
:ok <- validate_stderr_mode(Keyword.get(opts, :stderr, :consume), pty_mode),
:ok <- validate_cgroup_path(Keyword.get(opts, :cgroup_path, nil)),
{:ok, listen_socket} <- create_uds_listener(uds_path),
shepherd_port <- open_shepherd(uds_path, cmd, args, opts),
{:ok, conn_socket} <- accept_connection(listen_socket),
:ok <- cleanup_listener(listen_socket, uds_path) do
# conn_socket and shepherd_port are now live — clean up on any failure
setup_after_connection(conn_socket, shepherd_port, owner, cmd, args, opts, pty_mode)
end

# On the happy path cleanup_listener removed the socket file and its
# per-spawn dir. On any failure (validation, accept timeout, ...) the dir
# may still exist — remove it so spawns don't leak empty 0700 dirs.
case result do
{:ok, _} = ok -> ok
{:error, _} = err -> cleanup_uds_dir_passthrough(uds_path, err)
end
end

defp cleanup_uds_dir(uds_path) do
_ = File.rm(uds_path)
_ = File.rmdir(Path.dirname(uds_path))
:ok
end

defp cleanup_uds_dir_passthrough(uds_path, err) do
cleanup_uds_dir(uds_path)
err
end

# Reject NUL bytes in cmd/args early; passing them through Port.open's
# args: option is undefined and could truncate a cmd string on the C side.
defp validate_cmd_and_args(cmd, args) do
Expand Down Expand Up @@ -101,6 +120,16 @@ defmodule NetRunner.Process.Exec do
_, _ -> :ok
end

# In PTY mode stderr is folded into the bidirectional master FD, so the
# :stderr option is ignored. In pipe mode only :consume (drained internally
# to avoid blocking the child on a full pipe) and :disabled are supported.
defp validate_stderr_mode(_mode, true), do: :ok
defp validate_stderr_mode(mode, false) when mode in [:consume, :disabled], do: :ok

defp validate_stderr_mode(mode, false) do
{:error, {:invalid_stderr, "must be :consume or :disabled, got: #{inspect(mode)}"}}
end

defp validate_cgroup_path(nil), do: :ok

defp validate_cgroup_path(path) do
Expand All @@ -118,9 +147,18 @@ defmodule NetRunner.Process.Exec do
end
end

# Place the socket inside a per-spawn 0700 directory so only the current
# user can traverse to it. Without this the socket lives directly in the
# world-traversable tmp dir, and a same-host attacker who wins the accept
# race against the real shepherd would receive the child's pipe FDs via
# SCM_RIGHTS. The 0700 dir reduces the threat to same-uid processes (which
# are already inside our trust domain).
defp uds_socket_path do
random = Base.encode16(:crypto.strong_rand_bytes(8), case: :lower)
Path.join(System.tmp_dir!(), "net_runner_#{random}.sock")
dir = Path.join(System.tmp_dir!(), "net_runner_#{random}")
File.mkdir_p!(dir)
File.chmod!(dir, 0o700)
Path.join(dir, "shepherd.sock")
end

defp create_uds_listener(path) do
Expand Down Expand Up @@ -178,9 +216,14 @@ defmodule NetRunner.Process.Exec do
:socket.close(listen_socket)

case File.rm(path) do
:ok -> :ok
{:error, :enoent} -> :ok
{:error, reason} -> {:error, {:uds_path_cleanup_failed, reason}}
result when result in [:ok, {:error, :enoent}] ->
# Best-effort removal of the per-spawn 0700 dir created in
# uds_socket_path/0; it is empty once the socket file is gone.
_ = File.rmdir(Path.dirname(path))
:ok

{:error, reason} ->
{:error, {:uds_path_cleanup_failed, reason}}
end
end

Expand Down
53 changes: 36 additions & 17 deletions lib/net_runner/process/operations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,73 @@ defmodule NetRunner.Process.Operations do
@type op_type :: :read | :write | {:read, :stdout | :stderr}
@type pending_op :: {op_type(), GenServer.from(), term()}

defstruct pending: %{}, monitors: %{}
defstruct pending: %{}, monitors: %{}, op_monitors: %{}

@type t :: %__MODULE__{
pending: %{reference() => pending_op()},
monitors: %{reference() => reference()}
monitors: %{reference() => reference()},
op_monitors: %{reference() => reference()}
}

@doc """
Parks a caller that received :eagain. Monitors the caller so the entry
can be reclaimed if the caller crashes or times out before the GenServer
can reply. Returns updated ops and a ref for matching.
"""
def park(%__MODULE__{pending: pending, monitors: monitors} = ops, type, from, context \\ nil) do
def park(%__MODULE__{} = ops, type, from, context \\ nil) do
ref = make_ref()
op = {type, from, context}

{caller_pid, _} = from
mref = Process.monitor(caller_pid)

{%{ops | pending: Map.put(pending, ref, op), monitors: Map.put(monitors, mref, ref)}, ref}
new_ops = %{
ops
| pending: Map.put(ops.pending, ref, op),
monitors: Map.put(ops.monitors, mref, ref),
op_monitors: Map.put(ops.op_monitors, ref, mref)
}

{new_ops, ref}
end

@doc """
Retrieves and removes a pending operation by ref. Demonitors the caller
we established in park/4.
"""
def pop(%__MODULE__{pending: pending, monitors: monitors} = ops, ref) do
def pop(%__MODULE__{pending: pending} = ops, ref) do
case Map.pop(pending, ref) do
{nil, _} ->
{nil, ops}

{op, rest} ->
monitors = demonitor_for_op(monitors, ref)
{op, %{ops | pending: rest, monitors: monitors}}
ops = demonitor_for_op(%{ops | pending: rest}, ref)
{op, ops}
end
end

@doc """
Removes the pending op whose caller-monitor ref matches `mref` (invoked
from the GenServer's :DOWN handler). Returns {op_or_nil, new_ops}.
"""
def pop_by_monitor(%__MODULE__{pending: pending, monitors: monitors} = ops, mref) do
def pop_by_monitor(
%__MODULE__{pending: pending, monitors: monitors, op_monitors: op_monitors} = ops,
mref
) do
case Map.pop(monitors, mref) do
{nil, _} ->
{nil, ops}

{op_ref, monitors_rest} ->
{op, pending_rest} = Map.pop(pending, op_ref)
{op, %{ops | pending: pending_rest, monitors: monitors_rest}}

{op,
%{
ops
| pending: pending_rest,
monitors: monitors_rest,
op_monitors: Map.delete(op_monitors, op_ref)
}}
end
end

Expand All @@ -74,19 +92,20 @@ defmodule NetRunner.Process.Operations do

Enum.each(monitors, fn {mref, _op_ref} -> Process.demonitor(mref, [:flush]) end)

%{ops | pending: %{}, monitors: %{}}
%{ops | pending: %{}, monitors: %{}, op_monitors: %{}}
end

def empty?(%__MODULE__{pending: pending}), do: map_size(pending) == 0

defp demonitor_for_op(monitors, op_ref) do
case Enum.find(monitors, fn {_mref, r} -> r == op_ref end) do
{mref, _} ->
Process.demonitor(mref, [:flush])
Map.delete(monitors, mref)
# O(1) demonitor via the op_ref -> mref reverse index.
defp demonitor_for_op(%__MODULE__{monitors: monitors, op_monitors: op_monitors} = ops, op_ref) do
case Map.pop(op_monitors, op_ref) do
{nil, _} ->
ops

nil ->
monitors
{mref, op_monitors_rest} ->
Process.demonitor(mref, [:flush])
%{ops | monitors: Map.delete(monitors, mref), op_monitors: op_monitors_rest}
end
end
end
2 changes: 1 addition & 1 deletion lib/net_runner/process/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule NetRunner.Process.State do
owner_ref: reference() | nil,
operations: Operations.t(),
awaiting_exit: [GenServer.from()],
stderr_mode: :consume | :redirect | :disabled,
stderr_mode: :consume | :disabled,
stderr_buffer: [binary()],
status: status()
}
Expand Down
4 changes: 2 additions & 2 deletions test/command_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ defmodule NetRunner.CommandTest do
end

test "preserves multiple opts" do
cmd = Command.new("cmd", [], timeout: 5_000, stderr: :redirect, pty: true)
cmd = Command.new("cmd", [], timeout: 5_000, stderr: :disabled, pty: true)

assert cmd.opts == [timeout: 5_000, stderr: :redirect, pty: true]
assert cmd.opts == [timeout: 5_000, stderr: :disabled, pty: true]
end

test "raises on non-binary executable" do
Expand Down
Loading
Loading