diff --git a/README.md b/README.md index 7a1651c..d8bd285 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ Spore is a minimal TCP tunnel implemented in Elixir/OTP. It forwards a local TCP - Optional HMAC-SHA256 challenge/response authentication - Server selects/uses a configurable port range for public listeners - Client proxies between your local service and remote connections +- Pending-connection manager backed by OTP (Registry + DynamicSupervisor) for robust cleanup +- Socket tuning flags (`--sndbuf`, `--recbuf`) for high-throughput/latency paths ## Install / Build ```bash @@ -18,12 +20,12 @@ This produces an executable named `spore` in the project directory. ## Quickstart ### Server (choose a public range) ```bash -./spore server --min-port 20000 --max-port 21000 --bind-addr 0.0.0.0 [--control-port 7835] +./spore server --min-port 20000 --max-port 21000 --bind-addr 0.0.0.0 [--control-port 7835] [--sndbuf 1048576] [--recbuf 1048576] ``` ### Client (forward local 3000; let server assign a port) ```bash -./spore local --local-host 127.0.0.1 --local-port 3000 --to --port 0 [--control-port 7835] +./spore local --local-host 127.0.0.1 --local-port 3000 --to --port 0 [--control-port 7835] [--sndbuf 1048576] [--recbuf 1048576] ``` Spore prints the assigned public port (for example, `listening at :20345`). @@ -34,11 +36,11 @@ python3 -m http.server 25565 ``` Terminal B (server): ```bash -./spore server --min-port 20000 --max-port 21000 --bind-addr 127.0.0.1 [--control-port 7835] +./spore server --min-port 20000 --max-port 21000 --bind-addr 127.0.0.1 [--control-port 7835] [--sndbuf 1048576] [--recbuf 1048576] ``` Terminal C (client): ```bash -./spore local --local-host 127.0.0.1 --local-port 25565 --to 127.0.0.1 --port 0 [--control-port 7835] +./spore local --local-host 127.0.0.1 --local-port 25565 --to 127.0.0.1 --port 0 [--control-port 7835] [--sndbuf 1048576] [--recbuf 1048576] ``` Terminal D (access through tunnel): ```bash @@ -48,8 +50,8 @@ curl -v 127.0.0.1:/ ## Authentication (optional) Provide the same secret on both sides to restrict access: ```bash -./spore server --secret SECRET [--control-port 7835] -./spore local --local-port 3000 --to --secret SECRET [--control-port 7835] +./spore server --secret SECRET [--control-port 7835] [--sndbuf N] [--recbuf N] +./spore local --local-port 3000 --to --secret SECRET [--control-port 7835] [--sndbuf N] [--recbuf N] ``` ## Interoperability @@ -57,12 +59,13 @@ Spore is designed to speak the same control protocol as Bore. You can mix Rust B ## Notes / Limitations - TCP only. If you need UDP (for example, Minecraft Bedrock), use a UDP-capable tunnel. -- Pending inbound connections are stored for up to 10 seconds; if the client does not accept within that window, they are dropped. +- Pending inbound connections are stored for up to 10 seconds; if the client does not accept within that window, they are dropped. Spore uses a supervised, per-connection process to manage this lifecycle. ## Troubleshooting - "address in use" starting server: another process is listening on the control port. Use `--control-port` to choose a different one or stop the existing process. - Client exits with `:eof`: ensure server is reachable at `--to`, secrets match (or are omitted on both), and the control port is open. - Repeated "removed stale connection": ensure the client is running and a remote connection arrives soon after the client starts (Spore holds pending connections for 10s). +- Low throughput on high-latency links: increase `--sndbuf`/`--recbuf` on both ends to match your bandwidth-delay product. ## License MIT, following the upstream project. diff --git a/lib/spore/application.ex b/lib/spore/application.ex index 67bffaa..688f424 100644 --- a/lib/spore/application.ex +++ b/lib/spore/application.ex @@ -4,7 +4,12 @@ defmodule Spore.Application do @impl true def start(_type, _args) do - children = [] + children = [ + {Registry, keys: :unique, name: Spore.Pending.Registry}, + {DynamicSupervisor, name: Spore.Pending.Supervisor, strategy: :one_for_one}, + {Spore.Pending, []} + ] + opts = [strategy: :one_for_one, name: Spore.Supervisor] Supervisor.start_link(children, opts) end diff --git a/lib/spore/cli.ex b/lib/spore/cli.ex index 78f4ed5..19fd751 100644 --- a/lib/spore/cli.ex +++ b/lib/spore/cli.ex @@ -21,7 +21,9 @@ defmodule Spore.CLI do to: :string, port: :integer, secret: :string, - control_port: :integer + control_port: :integer, + sndbuf: :integer, + recbuf: :integer ], aliases: [p: :port] ) @@ -34,6 +36,8 @@ defmodule Spore.CLI do control_port = Keyword.get(opts, :control_port, nil) if control_port, do: Application.put_env(:spore, :control_port, control_port) + if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) + if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) case Spore.Client.new(local_host, local_port, to, port, secret) do {:ok, client} -> @@ -56,7 +60,9 @@ defmodule Spore.CLI do secret: :string, bind_addr: :string, bind_tunnels: :string, - control_port: :integer + control_port: :integer, + sndbuf: :integer, + recbuf: :integer ] ) @@ -68,6 +74,8 @@ defmodule Spore.CLI do control_port = Keyword.get(opts, :control_port, nil) if control_port, do: Application.put_env(:spore, :control_port, control_port) + if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) + if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) case Spore.Server.listen( min_port: min_port, @@ -84,8 +92,8 @@ defmodule Spore.CLI do defp usage(io) do IO.puts(io, """ Usage: - spore local --local-port --to [--local-host HOST] [--port PORT] [--secret SECRET] [--control-port N] - spore server [--min-port N] [--max-port N] [--secret SECRET] [--bind-addr IP] [--bind-tunnels IP] [--control-port N] + spore local --local-port --to [--local-host HOST] [--port PORT] [--secret SECRET] [--control-port N] [--sndbuf N] [--recbuf N] + spore server [--min-port N] [--max-port N] [--secret SECRET] [--bind-addr IP] [--bind-tunnels IP] [--control-port N] [--sndbuf N] [--recbuf N] """) end end diff --git a/lib/spore/pending.ex b/lib/spore/pending.ex new file mode 100644 index 0000000..a479619 --- /dev/null +++ b/lib/spore/pending.ex @@ -0,0 +1,33 @@ +defmodule Spore.Pending do + @moduledoc false + use GenServer + + alias Spore.PendingConnection + + def start_link(_args) do + GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + end + + @impl true + def init(:ok) do + {:ok, :ok} + end + + def insert(id, socket, ttl_ms \\ 10_000) do + child_spec = %{ + id: {PendingConnection, id}, + start: {PendingConnection, :start_link, [[id: id, socket: socket, ttl_ms: ttl_ms]]}, + restart: :temporary + } + + DynamicSupervisor.start_child(Spore.Pending.Supervisor, child_spec) + :ok + end + + def take(id) do + case Registry.lookup(Spore.Pending.Registry, id) do + [{pid, :pending}] -> GenServer.call(pid, :take, 2_000) + _ -> {:error, :not_found} + end + end +end diff --git a/lib/spore/pending_connection.ex b/lib/spore/pending_connection.ex new file mode 100644 index 0000000..40f9d5e --- /dev/null +++ b/lib/spore/pending_connection.ex @@ -0,0 +1,43 @@ +defmodule Spore.PendingConnection do + @moduledoc false + use GenServer + require Logger + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @impl true + def init(opts) do + id = Keyword.fetch!(opts, :id) + socket = Keyword.fetch!(opts, :socket) + ttl_ms = Keyword.get(opts, :ttl_ms, 10_000) + + Registry.register(Spore.Pending.Registry, id, :pending) + Process.send_after(self(), :expire, ttl_ms) + {:ok, %{id: id, socket: socket}} + end + + @impl true + def handle_call(:take, _from, %{socket: socket} = state) when is_port(socket) do + {:stop, :normal, {:ok, socket}, %{state | socket: nil}} + end + + @impl true + def handle_call(:take, _from, state) do + {:reply, {:error, :gone}, state} + end + + @impl true + def handle_info(:expire, %{socket: socket, id: id} = state) do + if is_port(socket) do + :gen_tcp.close(socket) + Logger.warning("removed stale connection #{id}") + end + + {:stop, :normal, %{state | socket: nil}} + end + + @impl true + def terminate(_reason, _state), do: :ok +end diff --git a/lib/spore/server.ex b/lib/spore/server.ex index fc9a5f5..00a4024 100644 --- a/lib/spore/server.ex +++ b/lib/spore/server.ex @@ -43,16 +43,17 @@ defmodule Spore.Server do x -> normalize_ip(x) end - ensure_conns_table() - - control_opts = [ - :binary, - {:ip, bind_addr}, - active: false, - packet: 0, - reuseaddr: true, - nodelay: true - ] + # pending connections managed by Spore.Pending + + control_opts = + [ + :binary, + {:ip, bind_addr}, + active: false, + packet: 0, + reuseaddr: true, + nodelay: true + ] ++ Shared.socket_tune_opts() with {:ok, listen_socket} <- :gen_tcp.listen(Shared.control_port(), control_opts) do Logger.info("server listening on #{:inet.ntoa(bind_addr)}:#{Shared.control_port()}") @@ -62,6 +63,7 @@ defmodule Spore.Server do defp accept_loop(listen_socket, port_range, auth, bind_tunnels) do {:ok, socket} = :gen_tcp.accept(listen_socket) + _ = Shared.tune_socket(socket) {:ok, {ip, _}} = :inet.peername(socket) Logger.info("incoming connection from #{:inet.ntoa(ip)}") Task.start(fn -> handle_connection(socket, port_range, auth, bind_tunnels) end) @@ -102,7 +104,7 @@ defmodule Spore.Server do end {%{"Accept" => id}, d2} -> - case take_conn(id) do + case Spore.Pending.take(id) do {:ok, stream2} -> # Forward traffic bidirectionally between control socket and stored tunnel conn # buffer intentionally unused @@ -133,8 +135,9 @@ defmodule Spore.Server do defp hello_loop(d, listener) do case :gen_tcp.accept(listener, 0) do {:ok, stream2} -> + _ = Shared.tune_socket(stream2) id = Auth.generate_uuid_v4() - insert_conn(id, stream2) + Spore.Pending.insert(id, stream2, 10_000) _ = Delimited.send(d, %{"Connection" => id}) hello_loop(send_heartbeat(d), listener) @@ -222,7 +225,7 @@ defmodule Spore.Server do case Delimited.recv_timeout(d) do {%{"Accept" => id}, d2} -> - case take_conn(id) do + case Spore.Pending.take(id) do {:ok, stream2} -> # Any buffered bytes already in d2.buffer are not handled here by design, as most cases buffer is empty {:ok, parts} = :inet.getopts(socket, [:active]) @@ -242,32 +245,7 @@ defmodule Spore.Server do end end - defp ensure_conns_table do - case :ets.whereis(:spore_conns) do - :undefined -> :ets.new(:spore_conns, [:set, :public, :named_table]) - _ -> :ok - end - end - - defp insert_conn(id, socket) do - :ets.insert(:spore_conns, {id, socket}) - # Remove stale entries after 10s - Task.start(fn -> - :timer.sleep(10_000) - - case :ets.take(:spore_conns, id) do - [{^id, _}] -> Logger.warning("removed stale connection #{id}") - _ -> :ok - end - end) - end - - defp take_conn(id) do - case :ets.take(:spore_conns, id) do - [{^id, socket}] -> {:ok, socket} - _ -> :error - end - end + # pending connection management moved to Spore.Pending defp normalize_ip({_, _, _, _} = ip), do: ip defp normalize_ip({_, _, _, _, _, _, _, _} = ip), do: ip diff --git a/lib/spore/shared.ex b/lib/spore/shared.ex index 3aa3c95..24234e3 100644 --- a/lib/spore/shared.ex +++ b/lib/spore/shared.ex @@ -101,12 +101,51 @@ defmodule Spore.Shared do @doc "Connect with timeout, returning a passive, binary-mode socket." @spec connect(String.t(), :inet.port_number(), timeout()) :: {:ok, socket()} | {:error, term()} def connect(host, port, timeout_ms) do - :gen_tcp.connect( - String.to_charlist(host), - port, - [:binary, active: false, packet: 0, nodelay: true, reuseaddr: true], - timeout_ms - ) + result = + :gen_tcp.connect( + String.to_charlist(host), + port, + [:binary, active: false, packet: 0, nodelay: true, reuseaddr: true], + timeout_ms + ) + + case result do + {:ok, socket} -> + _ = tune_socket(socket) + {:ok, socket} + + other -> + other + end + end + + @doc "Return socket tuning options from application env." + def socket_tune_opts do + opts = [] + + opts = + case Application.get_env(:spore, :sndbuf) do + n when is_integer(n) and n > 0 -> [{:sndbuf, n} | opts] + _ -> opts + end + + opts = + case Application.get_env(:spore, :recbuf) do + n when is_integer(n) and n > 0 -> [{:recbuf, n} | opts] + _ -> opts + end + + opts + end + + @doc "Apply tuning options to a socket." + def tune_socket(socket) do + opts = socket_tune_opts() + + case opts do + [] -> :ok + _ -> :inet.setopts(socket, opts) + end end @doc "Bidirectionally pipe data between two sockets until either closes."