From c77998929aeb9170b655b58d5e58ebb1d6e3ce0e Mon Sep 17 00:00:00 2001 From: Rastrian Date: Sun, 21 Sep 2025 16:25:19 -0300 Subject: [PATCH 1/2] feat(server): replace ETS with supervised pending manager; add socket sndbuf/recbuf tuning and apply on accept/connect --- README.md | 17 +++++++------ lib/spore/application.ex | 6 ++++- lib/spore/cli.ex | 16 +++++++++---- lib/spore/pending.ex | 32 +++++++++++++++++++++++++ lib/spore/pending_connection.ex | 42 +++++++++++++++++++++++++++++++++ lib/spore/server.ex | 39 +++++++----------------------- lib/spore/shared.ex | 33 +++++++++++++++++++++++++- 7 files changed, 141 insertions(+), 44 deletions(-) create mode 100644 lib/spore/pending.ex create mode 100644 lib/spore/pending_connection.ex diff --git a/README.md b/README.md index 7a1651c..d8bd285 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ Spore is a minimal TCP tunnel implemented in Elixir/OTP. It forwards a local TCP - Optional HMAC-SHA256 challenge/response authentication - Server selects/uses a configurable port range for public listeners - Client proxies between your local service and remote connections +- Pending-connection manager backed by OTP (Registry + DynamicSupervisor) for robust cleanup +- Socket tuning flags (`--sndbuf`, `--recbuf`) for high-throughput/latency paths ## Install / Build ```bash @@ -18,12 +20,12 @@ This produces an executable named `spore` in the project directory. ## Quickstart ### Server (choose a public range) ```bash -./spore server --min-port 20000 --max-port 21000 --bind-addr 0.0.0.0 [--control-port 7835] +./spore server --min-port 20000 --max-port 21000 --bind-addr 0.0.0.0 [--control-port 7835] [--sndbuf 1048576] [--recbuf 1048576] ``` ### Client (forward local 3000; let server assign a port) ```bash -./spore local --local-host 127.0.0.1 --local-port 3000 --to --port 0 [--control-port 7835] +./spore local --local-host 127.0.0.1 --local-port 3000 --to --port 0 [--control-port 7835] [--sndbuf 1048576] [--recbuf 1048576] ``` Spore prints the assigned public port (for example, `listening at :20345`). @@ -34,11 +36,11 @@ python3 -m http.server 25565 ``` Terminal B (server): ```bash -./spore server --min-port 20000 --max-port 21000 --bind-addr 127.0.0.1 [--control-port 7835] +./spore server --min-port 20000 --max-port 21000 --bind-addr 127.0.0.1 [--control-port 7835] [--sndbuf 1048576] [--recbuf 1048576] ``` Terminal C (client): ```bash -./spore local --local-host 127.0.0.1 --local-port 25565 --to 127.0.0.1 --port 0 [--control-port 7835] +./spore local --local-host 127.0.0.1 --local-port 25565 --to 127.0.0.1 --port 0 [--control-port 7835] [--sndbuf 1048576] [--recbuf 1048576] ``` Terminal D (access through tunnel): ```bash @@ -48,8 +50,8 @@ curl -v 127.0.0.1:/ ## Authentication (optional) Provide the same secret on both sides to restrict access: ```bash -./spore server --secret SECRET [--control-port 7835] -./spore local --local-port 3000 --to --secret SECRET [--control-port 7835] +./spore server --secret SECRET [--control-port 7835] [--sndbuf N] [--recbuf N] +./spore local --local-port 3000 --to --secret SECRET [--control-port 7835] [--sndbuf N] [--recbuf N] ``` ## Interoperability @@ -57,12 +59,13 @@ Spore is designed to speak the same control protocol as Bore. You can mix Rust B ## Notes / Limitations - TCP only. If you need UDP (for example, Minecraft Bedrock), use a UDP-capable tunnel. -- Pending inbound connections are stored for up to 10 seconds; if the client does not accept within that window, they are dropped. +- Pending inbound connections are stored for up to 10 seconds; if the client does not accept within that window, they are dropped. Spore uses a supervised, per-connection process to manage this lifecycle. ## Troubleshooting - "address in use" starting server: another process is listening on the control port. Use `--control-port` to choose a different one or stop the existing process. - Client exits with `:eof`: ensure server is reachable at `--to`, secrets match (or are omitted on both), and the control port is open. - Repeated "removed stale connection": ensure the client is running and a remote connection arrives soon after the client starts (Spore holds pending connections for 10s). +- Low throughput on high-latency links: increase `--sndbuf`/`--recbuf` on both ends to match your bandwidth-delay product. ## License MIT, following the upstream project. diff --git a/lib/spore/application.ex b/lib/spore/application.ex index 67bffaa..706e9b8 100644 --- a/lib/spore/application.ex +++ b/lib/spore/application.ex @@ -4,7 +4,11 @@ defmodule Spore.Application do @impl true def start(_type, _args) do - children = [] + children = [ + {Registry, keys: :unique, name: Spore.Pending.Registry}, + {DynamicSupervisor, name: Spore.Pending.Supervisor, strategy: :one_for_one}, + {Spore.Pending, []} + ] opts = [strategy: :one_for_one, name: Spore.Supervisor] Supervisor.start_link(children, opts) end diff --git a/lib/spore/cli.ex b/lib/spore/cli.ex index 78f4ed5..19fd751 100644 --- a/lib/spore/cli.ex +++ b/lib/spore/cli.ex @@ -21,7 +21,9 @@ defmodule Spore.CLI do to: :string, port: :integer, secret: :string, - control_port: :integer + control_port: :integer, + sndbuf: :integer, + recbuf: :integer ], aliases: [p: :port] ) @@ -34,6 +36,8 @@ defmodule Spore.CLI do control_port = Keyword.get(opts, :control_port, nil) if control_port, do: Application.put_env(:spore, :control_port, control_port) + if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) + if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) case Spore.Client.new(local_host, local_port, to, port, secret) do {:ok, client} -> @@ -56,7 +60,9 @@ defmodule Spore.CLI do secret: :string, bind_addr: :string, bind_tunnels: :string, - control_port: :integer + control_port: :integer, + sndbuf: :integer, + recbuf: :integer ] ) @@ -68,6 +74,8 @@ defmodule Spore.CLI do control_port = Keyword.get(opts, :control_port, nil) if control_port, do: Application.put_env(:spore, :control_port, control_port) + if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) + if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) case Spore.Server.listen( min_port: min_port, @@ -84,8 +92,8 @@ defmodule Spore.CLI do defp usage(io) do IO.puts(io, """ Usage: - spore local --local-port --to [--local-host HOST] [--port PORT] [--secret SECRET] [--control-port N] - spore server [--min-port N] [--max-port N] [--secret SECRET] [--bind-addr IP] [--bind-tunnels IP] [--control-port N] + spore local --local-port --to [--local-host HOST] [--port PORT] [--secret SECRET] [--control-port N] [--sndbuf N] [--recbuf N] + spore server [--min-port N] [--max-port N] [--secret SECRET] [--bind-addr IP] [--bind-tunnels IP] [--control-port N] [--sndbuf N] [--recbuf N] """) end end diff --git a/lib/spore/pending.ex b/lib/spore/pending.ex new file mode 100644 index 0000000..0848251 --- /dev/null +++ b/lib/spore/pending.ex @@ -0,0 +1,32 @@ +defmodule Spore.Pending do + @moduledoc false + use GenServer + + alias Spore.PendingConnection + + def start_link(_args) do + GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + end + + @impl true + def init(:ok) do + {:ok, :ok} + end + + def insert(id, socket, ttl_ms \\ 10_000) do + child_spec = %{ + id: {PendingConnection, id}, + start: {PendingConnection, :start_link, [[id: id, socket: socket, ttl_ms: ttl_ms]]}, + restart: :temporary + } + DynamicSupervisor.start_child(Spore.Pending.Supervisor, child_spec) + :ok + end + + def take(id) do + case Registry.lookup(Spore.Pending.Registry, id) do + [{pid, :pending}] -> GenServer.call(pid, :take, 2_000) + _ -> {:error, :not_found} + end + end +end diff --git a/lib/spore/pending_connection.ex b/lib/spore/pending_connection.ex new file mode 100644 index 0000000..72e0186 --- /dev/null +++ b/lib/spore/pending_connection.ex @@ -0,0 +1,42 @@ +defmodule Spore.PendingConnection do + @moduledoc false + use GenServer + require Logger + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @impl true + def init(opts) do + id = Keyword.fetch!(opts, :id) + socket = Keyword.fetch!(opts, :socket) + ttl_ms = Keyword.get(opts, :ttl_ms, 10_000) + + Registry.register(Spore.Pending.Registry, id, :pending) + Process.send_after(self(), :expire, ttl_ms) + {:ok, %{id: id, socket: socket}} + end + + @impl true + def handle_call(:take, _from, %{socket: socket} = state) when is_port(socket) do + {:stop, :normal, {:ok, socket}, %{state | socket: nil}} + end + + @impl true + def handle_call(:take, _from, state) do + {:reply, {:error, :gone}, state} + end + + @impl true + def handle_info(:expire, %{socket: socket, id: id} = state) do + if is_port(socket) do + :gen_tcp.close(socket) + Logger.warning("removed stale connection #{id}") + end + {:stop, :normal, %{state | socket: nil}} + end + + @impl true + def terminate(_reason, _state), do: :ok +end diff --git a/lib/spore/server.ex b/lib/spore/server.ex index fc9a5f5..6d0a85c 100644 --- a/lib/spore/server.ex +++ b/lib/spore/server.ex @@ -43,7 +43,7 @@ defmodule Spore.Server do x -> normalize_ip(x) end - ensure_conns_table() + # pending connections managed by Spore.Pending control_opts = [ :binary, @@ -52,7 +52,7 @@ defmodule Spore.Server do packet: 0, reuseaddr: true, nodelay: true - ] + ] ++ Shared.socket_tune_opts() with {:ok, listen_socket} <- :gen_tcp.listen(Shared.control_port(), control_opts) do Logger.info("server listening on #{:inet.ntoa(bind_addr)}:#{Shared.control_port()}") @@ -62,6 +62,7 @@ defmodule Spore.Server do defp accept_loop(listen_socket, port_range, auth, bind_tunnels) do {:ok, socket} = :gen_tcp.accept(listen_socket) + _ = Shared.tune_socket(socket) {:ok, {ip, _}} = :inet.peername(socket) Logger.info("incoming connection from #{:inet.ntoa(ip)}") Task.start(fn -> handle_connection(socket, port_range, auth, bind_tunnels) end) @@ -102,7 +103,7 @@ defmodule Spore.Server do end {%{"Accept" => id}, d2} -> - case take_conn(id) do + case Spore.Pending.take(id) do {:ok, stream2} -> # Forward traffic bidirectionally between control socket and stored tunnel conn # buffer intentionally unused @@ -133,8 +134,9 @@ defmodule Spore.Server do defp hello_loop(d, listener) do case :gen_tcp.accept(listener, 0) do {:ok, stream2} -> + _ = Shared.tune_socket(stream2) id = Auth.generate_uuid_v4() - insert_conn(id, stream2) + Spore.Pending.insert(id, stream2, 10_000) _ = Delimited.send(d, %{"Connection" => id}) hello_loop(send_heartbeat(d), listener) @@ -222,7 +224,7 @@ defmodule Spore.Server do case Delimited.recv_timeout(d) do {%{"Accept" => id}, d2} -> - case take_conn(id) do + case Spore.Pending.take(id) do {:ok, stream2} -> # Any buffered bytes already in d2.buffer are not handled here by design, as most cases buffer is empty {:ok, parts} = :inet.getopts(socket, [:active]) @@ -242,32 +244,7 @@ defmodule Spore.Server do end end - defp ensure_conns_table do - case :ets.whereis(:spore_conns) do - :undefined -> :ets.new(:spore_conns, [:set, :public, :named_table]) - _ -> :ok - end - end - - defp insert_conn(id, socket) do - :ets.insert(:spore_conns, {id, socket}) - # Remove stale entries after 10s - Task.start(fn -> - :timer.sleep(10_000) - - case :ets.take(:spore_conns, id) do - [{^id, _}] -> Logger.warning("removed stale connection #{id}") - _ -> :ok - end - end) - end - - defp take_conn(id) do - case :ets.take(:spore_conns, id) do - [{^id, socket}] -> {:ok, socket} - _ -> :error - end - end + # pending connection management moved to Spore.Pending defp normalize_ip({_, _, _, _} = ip), do: ip defp normalize_ip({_, _, _, _, _, _, _, _} = ip), do: ip diff --git a/lib/spore/shared.ex b/lib/spore/shared.ex index 3aa3c95..5afd04d 100644 --- a/lib/spore/shared.ex +++ b/lib/spore/shared.ex @@ -101,12 +101,43 @@ defmodule Spore.Shared do @doc "Connect with timeout, returning a passive, binary-mode socket." @spec connect(String.t(), :inet.port_number(), timeout()) :: {:ok, socket()} | {:error, term()} def connect(host, port, timeout_ms) do - :gen_tcp.connect( + result = :gen_tcp.connect( String.to_charlist(host), port, [:binary, active: false, packet: 0, nodelay: true, reuseaddr: true], timeout_ms ) + case result do + {:ok, socket} -> + _ = tune_socket(socket) + {:ok, socket} + other -> other + end + end + + @doc "Return socket tuning options from application env." + def socket_tune_opts do + opts = [] + opts = + case Application.get_env(:spore, :sndbuf) do + n when is_integer(n) and n > 0 -> [{:sndbuf, n} | opts] + _ -> opts + end + opts = + case Application.get_env(:spore, :recbuf) do + n when is_integer(n) and n > 0 -> [{:recbuf, n} | opts] + _ -> opts + end + opts + end + + @doc "Apply tuning options to a socket." + def tune_socket(socket) do + opts = socket_tune_opts() + case opts do + [] -> :ok + _ -> :inet.setopts(socket, opts) + end end @doc "Bidirectionally pipe data between two sockets until either closes." From 403060824c23bd476a49d07dabf24e1343c3f29f Mon Sep 17 00:00:00 2001 From: Rastrian Date: Sun, 21 Sep 2025 16:26:53 -0300 Subject: [PATCH 2/2] fix code format --- lib/spore/application.ex | 1 + lib/spore/pending.ex | 1 + lib/spore/pending_connection.ex | 1 + lib/spore/server.ex | 17 +++++++++-------- lib/spore/shared.ex | 22 +++++++++++++++------- 5 files changed, 27 insertions(+), 15 deletions(-) diff --git a/lib/spore/application.ex b/lib/spore/application.ex index 706e9b8..688f424 100644 --- a/lib/spore/application.ex +++ b/lib/spore/application.ex @@ -9,6 +9,7 @@ defmodule Spore.Application do {DynamicSupervisor, name: Spore.Pending.Supervisor, strategy: :one_for_one}, {Spore.Pending, []} ] + opts = [strategy: :one_for_one, name: Spore.Supervisor] Supervisor.start_link(children, opts) end diff --git a/lib/spore/pending.ex b/lib/spore/pending.ex index 0848251..a479619 100644 --- a/lib/spore/pending.ex +++ b/lib/spore/pending.ex @@ -19,6 +19,7 @@ defmodule Spore.Pending do start: {PendingConnection, :start_link, [[id: id, socket: socket, ttl_ms: ttl_ms]]}, restart: :temporary } + DynamicSupervisor.start_child(Spore.Pending.Supervisor, child_spec) :ok end diff --git a/lib/spore/pending_connection.ex b/lib/spore/pending_connection.ex index 72e0186..40f9d5e 100644 --- a/lib/spore/pending_connection.ex +++ b/lib/spore/pending_connection.ex @@ -34,6 +34,7 @@ defmodule Spore.PendingConnection do :gen_tcp.close(socket) Logger.warning("removed stale connection #{id}") end + {:stop, :normal, %{state | socket: nil}} end diff --git a/lib/spore/server.ex b/lib/spore/server.ex index 6d0a85c..00a4024 100644 --- a/lib/spore/server.ex +++ b/lib/spore/server.ex @@ -45,14 +45,15 @@ defmodule Spore.Server do # pending connections managed by Spore.Pending - control_opts = [ - :binary, - {:ip, bind_addr}, - active: false, - packet: 0, - reuseaddr: true, - nodelay: true - ] ++ Shared.socket_tune_opts() + control_opts = + [ + :binary, + {:ip, bind_addr}, + active: false, + packet: 0, + reuseaddr: true, + nodelay: true + ] ++ Shared.socket_tune_opts() with {:ok, listen_socket} <- :gen_tcp.listen(Shared.control_port(), control_opts) do Logger.info("server listening on #{:inet.ntoa(bind_addr)}:#{Shared.control_port()}") diff --git a/lib/spore/shared.ex b/lib/spore/shared.ex index 5afd04d..24234e3 100644 --- a/lib/spore/shared.ex +++ b/lib/spore/shared.ex @@ -101,39 +101,47 @@ defmodule Spore.Shared do @doc "Connect with timeout, returning a passive, binary-mode socket." @spec connect(String.t(), :inet.port_number(), timeout()) :: {:ok, socket()} | {:error, term()} def connect(host, port, timeout_ms) do - result = :gen_tcp.connect( - String.to_charlist(host), - port, - [:binary, active: false, packet: 0, nodelay: true, reuseaddr: true], - timeout_ms - ) + result = + :gen_tcp.connect( + String.to_charlist(host), + port, + [:binary, active: false, packet: 0, nodelay: true, reuseaddr: true], + timeout_ms + ) + case result do {:ok, socket} -> _ = tune_socket(socket) {:ok, socket} - other -> other + + other -> + other end end @doc "Return socket tuning options from application env." def socket_tune_opts do opts = [] + opts = case Application.get_env(:spore, :sndbuf) do n when is_integer(n) and n > 0 -> [{:sndbuf, n} | opts] _ -> opts end + opts = case Application.get_env(:spore, :recbuf) do n when is_integer(n) and n > 0 -> [{:recbuf, n} | opts] _ -> opts end + opts end @doc "Apply tuning options to a socket." def tune_socket(socket) do opts = socket_tune_opts() + case opts do [] -> :ok _ -> :inet.setopts(socket, opts)