From e277e9c131cfeb9da2e4c1a13e8cd01ad35b8af5 Mon Sep 17 00:00:00 2001 From: Rastrian Date: Sun, 21 Sep 2025 16:49:43 -0300 Subject: [PATCH 1/2] feat(control-plane): TLS; IP allow/deny and per-IP limits; multi-secret auth; expose Prometheus metrics --- .github/workflows/spore.yml | 8 +++ README.md | 22 ++++++- lib/spore/acl.ex | 53 +++++++++++++++++ lib/spore/application.ex | 4 +- lib/spore/auth.ex | 24 ++++++++ lib/spore/cli.ex | 22 ++++++- lib/spore/client.ex | 6 +- lib/spore/limits.ex | 40 +++++++++++++ lib/spore/metrics.ex | 92 +++++++++++++++++++++++++++++ lib/spore/pending_connection.ex | 1 + lib/spore/server.ex | 70 +++++++++++++++++++--- lib/spore/shared.ex | 100 ++++++++++++++++++++++---------- mix.exs | 2 +- 13 files changed, 397 insertions(+), 47 deletions(-) create mode 100644 lib/spore/acl.ex create mode 100644 lib/spore/limits.ex create mode 100644 lib/spore/metrics.ex diff --git a/.github/workflows/spore.yml b/.github/workflows/spore.yml index 8a30521..113be8d 100644 --- a/.github/workflows/spore.yml +++ b/.github/workflows/spore.yml @@ -58,3 +58,11 @@ jobs: - name: Verify escript working-directory: ${{ env.WORKDIR }} run: test -x ./spore && ls -l ./spore + + - name: Upload artifact (spore) + uses: actions/upload-artifact@v4 + with: + name: spore-${{ runner.os }}-${{ github.sha }} + path: ${{ env.WORKDIR }}/spore + if-no-files-found: error + retention-days: 14 diff --git a/README.md b/README.md index d8bd285..7c969f2 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,9 @@ Spore is a minimal TCP tunnel implemented in Elixir/OTP. It forwards a local TCP - Client proxies between your local service and remote connections - Pending-connection manager backed by OTP (Registry + DynamicSupervisor) for robust cleanup - Socket tuning flags (`--sndbuf`, `--recbuf`) for high-throughput/latency paths + - Optional TLS on control plane (`--tls`); multi-secret auth (comma-separated) + - Access control lists and limits: `--allow`, `--deny`, `--max-conns-per-ip` + - Prometheus metrics endpoint (text exposition) ## Install / Build ```bash @@ -20,7 +23,10 @@ This produces an executable named `spore` in the project directory. ## Quickstart ### Server (choose a public range) ```bash -./spore server --min-port 20000 --max-port 21000 --bind-addr 0.0.0.0 [--control-port 7835] [--sndbuf 1048576] [--recbuf 1048576] +./spore server --min-port 20000 --max-port 21000 --bind-addr 0.0.0.0 \ + [--control-port 7835] [--tls --certfile cert.pem --keyfile key.pem] \ + [--allow "10.0.0.0/8,192.168.0.0/16"] [--deny "0.0.0.0/0"] [--max-conns-per-ip 50] \ + [--sndbuf 1048576] [--recbuf 1048576] ``` ### Client (forward local 3000; let server assign a port) @@ -50,10 +56,21 @@ curl -v 127.0.0.1:/ ## Authentication (optional) Provide the same secret on both sides to restrict access: ```bash -./spore server --secret SECRET [--control-port 7835] [--sndbuf N] [--recbuf N] +./spore server --secret "secret1,secret2" [--control-port 7835] [--sndbuf N] [--recbuf N] ./spore local --local-port 3000 --to --secret SECRET [--control-port 7835] [--sndbuf N] [--recbuf N] ``` +## Metrics +Spore exposes basic counters and latency sums in Prometheus text format if `SPORE_METRICS_PORT` or `--metrics-port` is set (server-only). Example: +``` +spore_connections_incoming_total 42 +spore_connections_accepted_total 40 +spore_connections_stale_total 2 +spore_bytes_proxied_total 1234567 +spore_accept_latency_ms_sum 350 +spore_accept_latency_ms_count 40 +``` + ## Interoperability Spore is designed to speak the same control protocol as Bore. You can mix Rust Bore on one side and Spore on the other as long as secrets and addresses match. See Bore docs for protocol details: [bore (Rust)](https://github.com/ekzhang/bore). @@ -66,6 +83,7 @@ Spore is designed to speak the same control protocol as Bore. You can mix Rust B - Client exits with `:eof`: ensure server is reachable at `--to`, secrets match (or are omitted on both), and the control port is open. - Repeated "removed stale connection": ensure the client is running and a remote connection arrives soon after the client starts (Spore holds pending connections for 10s). - Low throughput on high-latency links: increase `--sndbuf`/`--recbuf` on both ends to match your bandwidth-delay product. + - TLS client trust: provide `--cacertfile` on the client or use `--insecure` in test environments. ## License MIT, following the upstream project. diff --git a/lib/spore/acl.ex b/lib/spore/acl.ex new file mode 100644 index 0000000..bd45490 --- /dev/null +++ b/lib/spore/acl.ex @@ -0,0 +1,53 @@ +defmodule Spore.ACL do + @moduledoc false + import Bitwise + + @spec allow?(tuple()) :: boolean() + def allow?(ip) do + allow = Application.get_env(:spore, :allow, []) + deny = Application.get_env(:spore, :deny, []) + + allowed = case allow do + [] -> true + _ -> Enum.any?(allow, &match_ip?(ip, &1)) + end + + denied = Enum.any?(deny, &match_ip?(ip, &1)) + allowed and not denied + end + + @spec parse_list(String.t()) :: list() + def parse_list(s) when is_binary(s) do + s + |> String.split([",", " ", "\n"], trim: true) + |> Enum.map(&parse_entry/1) + |> Enum.filter(& &1) + end + + defp parse_entry(entry) do + case String.split(entry, "/", parts: 2) do + [ip] -> + case :inet.parse_address(String.to_charlist(ip)) do + {:ok, addr} -> {:ip, addr} + _ -> nil + end + [ip, masklen] -> + with {:ok, addr} <- :inet.parse_address(String.to_charlist(ip)), + {len, ""} <- Integer.parse(masklen) do + {:cidr, addr, len} + else + _ -> nil + end + end + end + + defp match_ip?(ip, {:ip, addr}), do: ip == addr + defp match_ip?({a,b,c,d}, {:cidr, {a2,b2,c2,d2}, len}) when is_integer(len) and len>=0 and len<=32 do + mask = bnot((1 <<< (32 - len)) - 1) &&& 0xFFFFFFFF + ipi = (a<<<24) + (b<<<16) + (c<<<8) + d + base = (a2<<<24) + (b2<<<16) + (c2<<<8) + d2 + (ipi &&& mask) == (base &&& mask) + end + defp match_ip?({_,_,_,_,_,_,_,_}, {:cidr, _, _}), do: false + defp match_ip?(_, _), do: false +end diff --git a/lib/spore/application.ex b/lib/spore/application.ex index 688f424..ecb161f 100644 --- a/lib/spore/application.ex +++ b/lib/spore/application.ex @@ -7,7 +7,9 @@ defmodule Spore.Application do children = [ {Registry, keys: :unique, name: Spore.Pending.Registry}, {DynamicSupervisor, name: Spore.Pending.Supervisor, strategy: :one_for_one}, - {Spore.Pending, []} + {Spore.Pending, []}, + {Spore.Limits, []}, + {Spore.Metrics, []} ] opts = [strategy: :one_for_one, name: Spore.Supervisor] diff --git a/lib/spore/auth.ex b/lib/spore/auth.ex index 38a1e0f..79dcc01 100644 --- a/lib/spore/auth.ex +++ b/lib/spore/auth.ex @@ -15,6 +15,18 @@ defmodule Spore.Auth do %{key: :crypto.hash(:sha256, secret)} end + @doc "Create multiple authenticators from a comma-separated list." + def new_many(secret_or_list) do + cond do + is_list(secret_or_list) -> Enum.map(secret_or_list, &new/1) + is_binary(secret_or_list) -> + secret_or_list + |> String.split([",", " ", "\n"], trim: true) + |> Enum.map(&new/1) + true -> [] + end + end + @doc "Generate a reply tag for a challenge UUID string." @spec answer(t, String.t()) :: String.t() def answer(%{key: key}, challenge_uuid_string) do @@ -51,6 +63,18 @@ defmodule Spore.Auth do end end + @doc "Server handshake accepting any of a list of authenticators." + def server_handshake_many(auths, d) when is_list(auths) and auths != [] do + challenge = generate_uuid_v4() + {:ok, _} = Spore.Shared.Delimited.send(d, %{"Challenge" => challenge}) + case Spore.Shared.Delimited.recv_timeout(d) do + {%{"Authenticate" => tag}, d2} -> + ok = Enum.any?(auths, fn a -> validate(a, challenge, tag) end) + if ok, do: {:ok, d2}, else: {{:error, :invalid_secret}, d2} + {_, d2} -> {{:error, :missing_authentication}, d2} + end + end + @doc "Client-side handshake: expect Challenge and respond with Authenticate." def client_handshake(%{key: _} = auth, d) do case Spore.Shared.Delimited.recv_timeout(d) do diff --git a/lib/spore/cli.ex b/lib/spore/cli.ex index 19fd751..2c2944d 100644 --- a/lib/spore/cli.ex +++ b/lib/spore/cli.ex @@ -22,6 +22,9 @@ defmodule Spore.CLI do port: :integer, secret: :string, control_port: :integer, + tls: :boolean, + cacertfile: :string, + insecure: :boolean, sndbuf: :integer, recbuf: :integer ], @@ -36,6 +39,9 @@ defmodule Spore.CLI do control_port = Keyword.get(opts, :control_port, nil) if control_port, do: Application.put_env(:spore, :control_port, control_port) + if Keyword.get(opts, :tls), do: Application.put_env(:spore, :tls, true) + if cacert = Keyword.get(opts, :cacertfile), do: Application.put_env(:spore, :cacertfile, cacert) + if Keyword.get(opts, :insecure), do: Application.put_env(:spore, :ssl_verify, false) if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) @@ -61,6 +67,12 @@ defmodule Spore.CLI do bind_addr: :string, bind_tunnels: :string, control_port: :integer, + tls: :boolean, + certfile: :string, + keyfile: :string, + allow: :string, + deny: :string, + max_conns_per_ip: :integer, sndbuf: :integer, recbuf: :integer ] @@ -74,6 +86,12 @@ defmodule Spore.CLI do control_port = Keyword.get(opts, :control_port, nil) if control_port, do: Application.put_env(:spore, :control_port, control_port) + if Keyword.get(opts, :tls), do: Application.put_env(:spore, :tls, true) + if cert = Keyword.get(opts, :certfile), do: Application.put_env(:spore, :certfile, cert) + if key = Keyword.get(opts, :keyfile), do: Application.put_env(:spore, :keyfile, key) + if allow = Keyword.get(opts, :allow), do: Application.put_env(:spore, :allow, Spore.ACL.parse_list(allow)) + if deny = Keyword.get(opts, :deny), do: Application.put_env(:spore, :deny, Spore.ACL.parse_list(deny)) + if m = Keyword.get(opts, :max_conns_per_ip), do: Application.put_env(:spore, :max_conns_per_ip, m) if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) @@ -92,8 +110,8 @@ defmodule Spore.CLI do defp usage(io) do IO.puts(io, """ Usage: - spore local --local-port --to [--local-host HOST] [--port PORT] [--secret SECRET] [--control-port N] [--sndbuf N] [--recbuf N] - spore server [--min-port N] [--max-port N] [--secret SECRET] [--bind-addr IP] [--bind-tunnels IP] [--control-port N] [--sndbuf N] [--recbuf N] + spore local --local-port --to [--local-host HOST] [--port PORT] [--secret SECRET] [--control-port N] [--tls] [--cacertfile PATH] [--insecure] [--sndbuf N] [--recbuf N] + spore server [--min-port N] [--max-port N] [--secret SECRET] [--bind-addr IP] [--bind-tunnels IP] [--control-port N] [--tls] [--certfile PATH] [--keyfile PATH] [--allow CIDRs] [--deny CIDRs] [--max-conns-per-ip N] [--sndbuf N] [--recbuf N] """) end end diff --git a/lib/spore/client.ex b/lib/spore/client.ex index 7a77bdb..6fb335a 100644 --- a/lib/spore/client.ex +++ b/lib/spore/client.ex @@ -24,7 +24,7 @@ defmodule Spore.Client do {:ok, t} | {:error, term()} def new(local_host, local_port, to, port, secret) do with {:ok, socket} <- Shared.connect(to, Shared.control_port(), Shared.network_timeout_ms()) do - d = Delimited.new(socket) + d = Delimited.new(socket, Shared.transport_mod()) auth = if secret, do: Auth.new(secret), else: nil d = @@ -121,7 +121,7 @@ defmodule Spore.Client do defp handle_connection(id, %__MODULE__{} = state) do with {:ok, remote_conn} <- Shared.connect(state.to, Shared.control_port(), Shared.network_timeout_ms()) do - d = Delimited.new(remote_conn) + d = Delimited.new(remote_conn, Shared.transport_mod()) d = case state.auth do @@ -140,7 +140,7 @@ defmodule Spore.Client do case Shared.connect(state.local_host, state.local_port, Shared.network_timeout_ms()) do {:ok, local_conn} -> # Any data already buffered in `d` is intentionally not forwarded; see Rust note - Shared.pipe_bidirectional(remote_conn, local_conn) + Shared.pipe_bidirectional(remote_conn, Shared.transport_mod(), local_conn, :gen_tcp) {:error, reason} -> :gen_tcp.close(remote_conn) diff --git a/lib/spore/limits.ex b/lib/spore/limits.ex new file mode 100644 index 0000000..e263208 --- /dev/null +++ b/lib/spore/limits.ex @@ -0,0 +1,40 @@ +defmodule Spore.Limits do + @moduledoc false + use GenServer + + def start_link(_), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + + @impl true + def init(state), do: {:ok, state} + + def can_open?(ip) do + GenServer.call(__MODULE__, {:can_open, ip}) + end + + def close(ip) do + GenServer.cast(__MODULE__, {:close, ip}) + end + + @impl true + def handle_call({:can_open, ip}, _from, state) do + max = Application.get_env(:spore, :max_conns_per_ip, :infinity) + {count, state2} = Map.get_and_update(state, ip, fn v -> {v || 0, (v || 0) + 1} end) + allow = case max do + :infinity -> true + n when is_integer(n) and n > 0 -> count < n + _ -> true + end + state3 = if allow, do: state2, else: state + {:reply, allow, state3} + end + + @impl true + def handle_cast({:close, ip}, state) do + state2 = update_in(state[ip], fn + nil -> nil + 1 -> nil + n when is_integer(n) and n>1 -> n-1 + end) + {:noreply, state2} + end +end diff --git a/lib/spore/metrics.ex b/lib/spore/metrics.ex new file mode 100644 index 0000000..046420f --- /dev/null +++ b/lib/spore/metrics.ex @@ -0,0 +1,92 @@ +defmodule Spore.Metrics do + @moduledoc false + use GenServer + require Logger + + @metrics_table :spore_metrics + @accept_ts_table :spore_accept_ts + + def start_link(_), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + + @impl true + def init(_) do + _ = :ets.new(@metrics_table, [:set, :named_table, :public]) + _ = :ets.new(@accept_ts_table, [:set, :named_table, :public]) + maybe_start_http() + {:ok, %{server: nil}} + end + + def inc(name, delta \\ 1) when is_atom(name) do + try do + :ets.update_counter(@metrics_table, name, {2, delta}, {name, 0}) + rescue + _ -> :ok + end + end + + def note_pending(id) do + :ets.insert(@accept_ts_table, {id, System.monotonic_time(:millisecond)}) + inc(:spore_connections_incoming_total, 1) + end + + def note_accept(id) do + now = System.monotonic_time(:millisecond) + case :ets.take(@accept_ts_table, id) do + [{^id, ts}] -> + inc(:spore_connections_accepted_total, 1) + inc(:spore_accept_latency_ms_sum, max(0, now - ts)) + inc(:spore_accept_latency_ms_count, 1) + _ -> :ok + end + end + + def track_bytes(n) when is_integer(n) and n > 0, do: inc(:spore_bytes_proxied_total, n) + + def stale(), do: inc(:spore_connections_stale_total, 1) + + defp maybe_start_http do + case Application.get_env(:spore, :metrics_port) do + nil -> :ok + port when is_integer(port) -> Task.start(fn -> run_http(port) end) + end + end + + defp run_http(port) do + case :gen_tcp.listen(port, [:binary, active: false, packet: :raw, reuseaddr: true]) do + {:ok, ls} -> + Logger.info("metrics listening on :#{port}") + accept_loop(ls) + {:error, err} -> Logger.error("metrics listen failed: #{inspect(err)}") + end + end + + defp accept_loop(ls) do + case :gen_tcp.accept(ls) do + {:ok, sock} -> + Task.start(fn -> serve(sock) end) + accept_loop(ls) + {:error, _} -> :ok + end + end + + defp serve(sock) do + _ = :gen_tcp.recv(sock, 0, 100) + body = render() + resp = [ + "HTTP/1.1 200 OK\r\n", + "Content-Type: text/plain; version=0.0.4\r\n", + "Content-Length: ", Integer.to_string(byte_size(body)), "\r\n", + "Connection: close\r\n\r\n", + body + ] + :gen_tcp.send(sock, resp) + :gen_tcp.close(sock) + end + + def render do + rows = :ets.tab2list(@metrics_table) + Enum.map_join(rows, "\n", fn {name, value} -> + ["# TYPE ", Atom.to_string(name), " counter\n", Atom.to_string(name), " ", to_string(value)] + end) <> "\n" + end +end diff --git a/lib/spore/pending_connection.ex b/lib/spore/pending_connection.ex index 40f9d5e..1bcd907 100644 --- a/lib/spore/pending_connection.ex +++ b/lib/spore/pending_connection.ex @@ -33,6 +33,7 @@ defmodule Spore.PendingConnection do if is_port(socket) do :gen_tcp.close(socket) Logger.warning("removed stale connection #{id}") + Spore.Metrics.stale() end {:stop, :normal, %{state | socket: nil}} diff --git a/lib/spore/server.ex b/lib/spore/server.ex index 00a4024..73520cb 100644 --- a/lib/spore/server.ex +++ b/lib/spore/server.ex @@ -31,7 +31,11 @@ defmodule Spore.Server do auth = case Keyword.get(opts, :secret) do nil -> nil - secret -> Auth.new(secret) + secret -> + case Spore.Auth.new_many(secret) do + [one] -> one + many when is_list(many) -> {:many, many} + end end bind_addr = Keyword.get(opts, :bind_addr, {0, 0, 0, 0}) |> normalize_ip() @@ -55,23 +59,32 @@ defmodule Spore.Server do nodelay: true ] ++ Shared.socket_tune_opts() - with {:ok, listen_socket} <- :gen_tcp.listen(Shared.control_port(), control_opts) do + with {:ok, listen_socket} <- listen_socket(control_opts) do Logger.info("server listening on #{:inet.ntoa(bind_addr)}:#{Shared.control_port()}") accept_loop(listen_socket, min_port..max_port, auth, bind_tunnels) end end defp accept_loop(listen_socket, port_range, auth, bind_tunnels) do - {:ok, socket} = :gen_tcp.accept(listen_socket) - _ = Shared.tune_socket(socket) + {:ok, socket} = accept_conn(listen_socket) {:ok, {ip, _}} = :inet.peername(socket) Logger.info("incoming connection from #{:inet.ntoa(ip)}") - Task.start(fn -> handle_connection(socket, port_range, auth, bind_tunnels) end) + if Spore.ACL.allow?(ip) and Spore.Limits.can_open?(ip) do + Task.start(fn -> + try do + handle_connection(socket, port_range, auth, bind_tunnels) + after + Spore.Limits.close(ip) + end + end) + else + :gen_tcp.close(socket) + end accept_loop(listen_socket, port_range, auth, bind_tunnels) end defp handle_connection(socket, port_range, auth, bind_tunnels) do - d = Delimited.new(socket) + d = Delimited.new(socket, Shared.transport_mod()) d = case auth do @@ -88,6 +101,15 @@ defmodule Spore.Server do :gen_tcp.close(socket) exit(:normal) end + + {:many, list} -> + case Auth.server_handshake_many(list, d) do + {:ok, d2} -> d2 + {{:error, reason}, d2} -> + _ = Delimited.send(d2, %{"Error" => to_string(reason)}) + :gen_tcp.close(socket) + exit(:normal) + end end case Delimited.recv_timeout(d) do @@ -106,10 +128,11 @@ defmodule Spore.Server do {%{"Accept" => id}, d2} -> case Spore.Pending.take(id) do {:ok, stream2} -> + Spore.Metrics.note_accept(id) # Forward traffic bidirectionally between control socket and stored tunnel conn # buffer intentionally unused _ = d2 - Shared.pipe_bidirectional(socket, stream2) + Shared.pipe_bidirectional(socket, Shared.transport_mod(), stream2, :gen_tcp) :error -> Logger.warning("missing connection #{id}") @@ -132,12 +155,43 @@ defmodule Spore.Server do e -> Logger.warning("connection exited with error: #{inspect(e)}") end + defp listen_socket(control_opts) do + case Shared.transport_mod() do + :gen_tcp -> :gen_tcp.listen(Shared.control_port(), control_opts) + :ssl -> + ssl_opts = ssl_server_opts(control_opts) + apply(:ssl, :listen, [Shared.control_port(), ssl_opts]) + end + end + + defp accept_conn(listen_socket) do + case Shared.transport_mod() do + :gen_tcp -> :gen_tcp.accept(listen_socket) + :ssl -> + with {:ok, sock} <- apply(:ssl, :transport_accept, [listen_socket]) do + apply(:ssl, :ssl_accept, [sock]) + end + end + end + + defp ssl_server_opts(control_opts) do + certfile = Application.get_env(:spore, :certfile) + keyfile = Application.get_env(:spore, :keyfile) + base = [ + {:certfile, String.to_charlist(certfile || "")}, + {:keyfile, String.to_charlist(keyfile || "")}, + active: false + ] + base ++ control_opts + end + defp hello_loop(d, listener) do case :gen_tcp.accept(listener, 0) do {:ok, stream2} -> _ = Shared.tune_socket(stream2) id = Auth.generate_uuid_v4() Spore.Pending.insert(id, stream2, 10_000) + Spore.Metrics.note_pending(id) _ = Delimited.send(d, %{"Connection" => id}) hello_loop(send_heartbeat(d), listener) @@ -209,7 +263,7 @@ defmodule Spore.Server do # Accept connection branch: a new control connection will send {"Accept": id} def handle_accept_connection(socket, auth) do - d = Delimited.new(socket) + d = Delimited.new(socket, Shared.transport_mod()) d = case auth do diff --git a/lib/spore/shared.ex b/lib/spore/shared.ex index 24234e3..9eaf792 100644 --- a/lib/spore/shared.ex +++ b/lib/spore/shared.ex @@ -25,13 +25,13 @@ defmodule Spore.Shared do defmodule Delimited do @moduledoc "Delimited JSON transport wrapping a passive TCP socket." - defstruct [:socket, buffer: <<>>] + defstruct [:socket, :io_mod, buffer: <<>>] - @type t :: %__MODULE__{socket: Spore.Shared.socket(), buffer: binary()} + @type t :: %__MODULE__{socket: Spore.Shared.socket(), io_mod: module(), buffer: binary()} - @doc "Wrap a passive, binary-mode TCP socket." - @spec new(Spore.Shared.socket()) :: t - def new(socket), do: %__MODULE__{socket: socket, buffer: <<>>} + @doc "Wrap a passive socket with IO module (:gen_tcp or :ssl)." + @spec new(Spore.Shared.socket(), module()) :: t + def new(socket, io_mod), do: %__MODULE__{socket: socket, io_mod: io_mod, buffer: <<>>} @doc "Receive next null-delimited JSON value. Returns {value, updated_transport}." @spec recv(t, timeout()) :: {any() | :eof | {:error, term()}, t} @@ -59,23 +59,23 @@ defmodule Spore.Shared do @doc "Send a JSON value followed by a null terminator. Returns updated transport." @spec send(t, any()) :: {:ok, t} | {:error, term()} - def send(%__MODULE__{socket: socket} = d, value) do + def send(%__MODULE__{socket: socket, io_mod: io_mod} = d, value) do with {:ok, json} <- Jason.encode(value), - :ok <- :gen_tcp.send(socket, [json, <<0>>]) do + :ok <- io_mod.send(socket, [json, <<0>>]) do {:ok, d} else {:error, _} = err -> err end end - defp read_frame(%__MODULE__{socket: socket, buffer: buf} = d, timeout) do + defp read_frame(%__MODULE__{socket: socket, io_mod: io_mod, buffer: buf} = d, timeout) do case :binary.match(buf, <<0>>) do {idx, 1} -> <> = buf {:ok, frame, %{d | buffer: rest}} :nomatch -> - case :gen_tcp.recv(socket, 0, timeout) do + case io_mod.recv(socket, 0, timeout) do {:ok, more} -> new = buf <> more @@ -101,24 +101,35 @@ defmodule Spore.Shared do @doc "Connect with timeout, returning a passive, binary-mode socket." @spec connect(String.t(), :inet.port_number(), timeout()) :: {:ok, socket()} | {:error, term()} def connect(host, port, timeout_ms) do - result = - :gen_tcp.connect( - String.to_charlist(host), - port, - [:binary, active: false, packet: 0, nodelay: true, reuseaddr: true], - timeout_ms - ) - - case result do - {:ok, socket} -> - _ = tune_socket(socket) - {:ok, socket} - - other -> - other + case transport_mod() do + :gen_tcp -> + result = + :gen_tcp.connect( + String.to_charlist(host), + port, + [:binary, active: false, packet: 0, nodelay: true, reuseaddr: true], + timeout_ms + ) + + case result do + {:ok, socket} -> + _ = tune_socket(socket) + {:ok, socket} + + other -> + other + end + + :ssl -> + ssl_opts = ssl_client_opts() + apply(:ssl, :connect, [String.to_charlist(host), port, ssl_opts, timeout_ms]) end end + def transport_mod do + if Application.get_env(:spore, :tls, false), do: :ssl, else: :gen_tcp + end + @doc "Return socket tuning options from application env." def socket_tune_opts do opts = [] @@ -148,11 +159,18 @@ defmodule Spore.Shared do end end + defp ssl_client_opts do + verify = if Application.get_env(:spore, :ssl_verify, false), do: :verify_peer, else: :verify_none + base = [active: false, verify: verify] + cacertfile = Application.get_env(:spore, :cacertfile) + if is_binary(cacertfile), do: [{:cacertfile, String.to_charlist(cacertfile)} | base], else: base + end + @doc "Bidirectionally pipe data between two sockets until either closes." @spec pipe_bidirectional(socket(), socket()) :: :ok def pipe_bidirectional(a, b) do - left = Task.async(fn -> pipe(a, b) end) - right = Task.async(fn -> pipe(b, a) end) + left = Task.async(fn -> pipe(a, :gen_tcp, b, :gen_tcp) end) + right = Task.async(fn -> pipe(b, :gen_tcp, a, :gen_tcp) end) ref_left = Process.monitor(left.pid) ref_right = Process.monitor(right.pid) @@ -168,14 +186,36 @@ defmodule Spore.Shared do :ok end - defp pipe(src, dst) do - case :gen_tcp.recv(src, 0) do + @doc "Bidirectionally pipe with explicit transport modules." + @spec pipe_bidirectional(socket(), module(), socket(), module()) :: :ok + def pipe_bidirectional(a, amod, b, bmod) do + left = Task.async(fn -> pipe(a, amod, b, bmod) end) + right = Task.async(fn -> pipe(b, bmod, a, amod) end) + ref_left = Process.monitor(left.pid) + ref_right = Process.monitor(right.pid) + receive do + {:DOWN, ^ref_left, :process, _pid, _} -> :ok + {:DOWN, ^ref_right, :process, _pid, _} -> :ok + end + Task.shutdown(left, :brutal_kill) + Task.shutdown(right, :brutal_kill) + close(a, amod) + close(b, bmod) + :ok + end + + defp pipe(src, src_mod, dst, dst_mod) do + case src_mod.recv(src, 0) do {:ok, data} -> - _ = :gen_tcp.send(dst, data) - pipe(src, dst) + _ = dst_mod.send(dst, data) + Spore.Metrics.track_bytes(byte_size(data)) + pipe(src, src_mod, dst, dst_mod) {:error, _} -> :ok end end + + defp close(socket, :gen_tcp), do: :gen_tcp.close(socket) + defp close(socket, :ssl), do: apply(:ssl, :close, [socket]) end diff --git a/mix.exs b/mix.exs index 0facc7c..5689734 100644 --- a/mix.exs +++ b/mix.exs @@ -14,7 +14,7 @@ defmodule Spore.MixProject do def application do [ - extra_applications: [:logger, :crypto], + extra_applications: [:logger, :crypto, :ssl], mod: {Spore.Application, []} ] end From fe381c060a499c4dba9e3bcfdb8a2a3b61d3897f Mon Sep 17 00:00:00 2001 From: Rastrian Date: Sun, 21 Sep 2025 16:50:49 -0300 Subject: [PATCH 2/2] fix format --- lib/spore/acl.ex | 21 +++++++++++++-------- lib/spore/auth.ex | 13 ++++++++++--- lib/spore/cli.ex | 18 ++++++++++++++---- lib/spore/limits.ex | 25 +++++++++++++++---------- lib/spore/metrics.ex | 20 ++++++++++++++++---- lib/spore/server.ex | 20 ++++++++++++++++---- lib/spore/shared.ex | 11 +++++++++-- 7 files changed, 93 insertions(+), 35 deletions(-) diff --git a/lib/spore/acl.ex b/lib/spore/acl.ex index bd45490..5d231fd 100644 --- a/lib/spore/acl.ex +++ b/lib/spore/acl.ex @@ -7,10 +7,11 @@ defmodule Spore.ACL do allow = Application.get_env(:spore, :allow, []) deny = Application.get_env(:spore, :deny, []) - allowed = case allow do - [] -> true - _ -> Enum.any?(allow, &match_ip?(ip, &1)) - end + allowed = + case allow do + [] -> true + _ -> Enum.any?(allow, &match_ip?(ip, &1)) + end denied = Enum.any?(deny, &match_ip?(ip, &1)) allowed and not denied @@ -31,6 +32,7 @@ defmodule Spore.ACL do {:ok, addr} -> {:ip, addr} _ -> nil end + [ip, masklen] -> with {:ok, addr} <- :inet.parse_address(String.to_charlist(ip)), {len, ""} <- Integer.parse(masklen) do @@ -42,12 +44,15 @@ defmodule Spore.ACL do end defp match_ip?(ip, {:ip, addr}), do: ip == addr - defp match_ip?({a,b,c,d}, {:cidr, {a2,b2,c2,d2}, len}) when is_integer(len) and len>=0 and len<=32 do + + defp match_ip?({a, b, c, d}, {:cidr, {a2, b2, c2, d2}, len}) + when is_integer(len) and len >= 0 and len <= 32 do mask = bnot((1 <<< (32 - len)) - 1) &&& 0xFFFFFFFF - ipi = (a<<<24) + (b<<<16) + (c<<<8) + d - base = (a2<<<24) + (b2<<<16) + (c2<<<8) + d2 + ipi = (a <<< 24) + (b <<< 16) + (c <<< 8) + d + base = (a2 <<< 24) + (b2 <<< 16) + (c2 <<< 8) + d2 (ipi &&& mask) == (base &&& mask) end - defp match_ip?({_,_,_,_,_,_,_,_}, {:cidr, _, _}), do: false + + defp match_ip?({_, _, _, _, _, _, _, _}, {:cidr, _, _}), do: false defp match_ip?(_, _), do: false end diff --git a/lib/spore/auth.ex b/lib/spore/auth.ex index 79dcc01..2dd849b 100644 --- a/lib/spore/auth.ex +++ b/lib/spore/auth.ex @@ -18,12 +18,16 @@ defmodule Spore.Auth do @doc "Create multiple authenticators from a comma-separated list." def new_many(secret_or_list) do cond do - is_list(secret_or_list) -> Enum.map(secret_or_list, &new/1) + is_list(secret_or_list) -> + Enum.map(secret_or_list, &new/1) + is_binary(secret_or_list) -> secret_or_list |> String.split([",", " ", "\n"], trim: true) |> Enum.map(&new/1) - true -> [] + + true -> + [] end end @@ -67,11 +71,14 @@ defmodule Spore.Auth do def server_handshake_many(auths, d) when is_list(auths) and auths != [] do challenge = generate_uuid_v4() {:ok, _} = Spore.Shared.Delimited.send(d, %{"Challenge" => challenge}) + case Spore.Shared.Delimited.recv_timeout(d) do {%{"Authenticate" => tag}, d2} -> ok = Enum.any?(auths, fn a -> validate(a, challenge, tag) end) if ok, do: {:ok, d2}, else: {{:error, :invalid_secret}, d2} - {_, d2} -> {{:error, :missing_authentication}, d2} + + {_, d2} -> + {{:error, :missing_authentication}, d2} end end diff --git a/lib/spore/cli.ex b/lib/spore/cli.ex index 2c2944d..f45a468 100644 --- a/lib/spore/cli.ex +++ b/lib/spore/cli.ex @@ -40,7 +40,10 @@ defmodule Spore.CLI do if control_port, do: Application.put_env(:spore, :control_port, control_port) if Keyword.get(opts, :tls), do: Application.put_env(:spore, :tls, true) - if cacert = Keyword.get(opts, :cacertfile), do: Application.put_env(:spore, :cacertfile, cacert) + + if cacert = Keyword.get(opts, :cacertfile), + do: Application.put_env(:spore, :cacertfile, cacert) + if Keyword.get(opts, :insecure), do: Application.put_env(:spore, :ssl_verify, false) if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) @@ -89,9 +92,16 @@ defmodule Spore.CLI do if Keyword.get(opts, :tls), do: Application.put_env(:spore, :tls, true) if cert = Keyword.get(opts, :certfile), do: Application.put_env(:spore, :certfile, cert) if key = Keyword.get(opts, :keyfile), do: Application.put_env(:spore, :keyfile, key) - if allow = Keyword.get(opts, :allow), do: Application.put_env(:spore, :allow, Spore.ACL.parse_list(allow)) - if deny = Keyword.get(opts, :deny), do: Application.put_env(:spore, :deny, Spore.ACL.parse_list(deny)) - if m = Keyword.get(opts, :max_conns_per_ip), do: Application.put_env(:spore, :max_conns_per_ip, m) + + if allow = Keyword.get(opts, :allow), + do: Application.put_env(:spore, :allow, Spore.ACL.parse_list(allow)) + + if deny = Keyword.get(opts, :deny), + do: Application.put_env(:spore, :deny, Spore.ACL.parse_list(deny)) + + if m = Keyword.get(opts, :max_conns_per_ip), + do: Application.put_env(:spore, :max_conns_per_ip, m) + if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) diff --git a/lib/spore/limits.ex b/lib/spore/limits.ex index e263208..eceff93 100644 --- a/lib/spore/limits.ex +++ b/lib/spore/limits.ex @@ -19,22 +19,27 @@ defmodule Spore.Limits do def handle_call({:can_open, ip}, _from, state) do max = Application.get_env(:spore, :max_conns_per_ip, :infinity) {count, state2} = Map.get_and_update(state, ip, fn v -> {v || 0, (v || 0) + 1} end) - allow = case max do - :infinity -> true - n when is_integer(n) and n > 0 -> count < n - _ -> true - end + + allow = + case max do + :infinity -> true + n when is_integer(n) and n > 0 -> count < n + _ -> true + end + state3 = if allow, do: state2, else: state {:reply, allow, state3} end @impl true def handle_cast({:close, ip}, state) do - state2 = update_in(state[ip], fn - nil -> nil - 1 -> nil - n when is_integer(n) and n>1 -> n-1 - end) + state2 = + update_in(state[ip], fn + nil -> nil + 1 -> nil + n when is_integer(n) and n > 1 -> n - 1 + end) + {:noreply, state2} end end diff --git a/lib/spore/metrics.ex b/lib/spore/metrics.ex index 046420f..cc90866 100644 --- a/lib/spore/metrics.ex +++ b/lib/spore/metrics.ex @@ -31,12 +31,15 @@ defmodule Spore.Metrics do def note_accept(id) do now = System.monotonic_time(:millisecond) + case :ets.take(@accept_ts_table, id) do [{^id, ts}] -> inc(:spore_connections_accepted_total, 1) inc(:spore_accept_latency_ms_sum, max(0, now - ts)) inc(:spore_accept_latency_ms_count, 1) - _ -> :ok + + _ -> + :ok end end @@ -56,7 +59,9 @@ defmodule Spore.Metrics do {:ok, ls} -> Logger.info("metrics listening on :#{port}") accept_loop(ls) - {:error, err} -> Logger.error("metrics listen failed: #{inspect(err)}") + + {:error, err} -> + Logger.error("metrics listen failed: #{inspect(err)}") end end @@ -65,26 +70,33 @@ defmodule Spore.Metrics do {:ok, sock} -> Task.start(fn -> serve(sock) end) accept_loop(ls) - {:error, _} -> :ok + + {:error, _} -> + :ok end end defp serve(sock) do _ = :gen_tcp.recv(sock, 0, 100) body = render() + resp = [ "HTTP/1.1 200 OK\r\n", "Content-Type: text/plain; version=0.0.4\r\n", - "Content-Length: ", Integer.to_string(byte_size(body)), "\r\n", + "Content-Length: ", + Integer.to_string(byte_size(body)), + "\r\n", "Connection: close\r\n\r\n", body ] + :gen_tcp.send(sock, resp) :gen_tcp.close(sock) end def render do rows = :ets.tab2list(@metrics_table) + Enum.map_join(rows, "\n", fn {name, value} -> ["# TYPE ", Atom.to_string(name), " counter\n", Atom.to_string(name), " ", to_string(value)] end) <> "\n" diff --git a/lib/spore/server.ex b/lib/spore/server.ex index 73520cb..06e6d2e 100644 --- a/lib/spore/server.ex +++ b/lib/spore/server.ex @@ -30,7 +30,9 @@ defmodule Spore.Server do auth = case Keyword.get(opts, :secret) do - nil -> nil + nil -> + nil + secret -> case Spore.Auth.new_many(secret) do [one] -> one @@ -69,6 +71,7 @@ defmodule Spore.Server do {:ok, socket} = accept_conn(listen_socket) {:ok, {ip, _}} = :inet.peername(socket) Logger.info("incoming connection from #{:inet.ntoa(ip)}") + if Spore.ACL.allow?(ip) and Spore.Limits.can_open?(ip) do Task.start(fn -> try do @@ -80,6 +83,7 @@ defmodule Spore.Server do else :gen_tcp.close(socket) end + accept_loop(listen_socket, port_range, auth, bind_tunnels) end @@ -104,7 +108,9 @@ defmodule Spore.Server do {:many, list} -> case Auth.server_handshake_many(list, d) do - {:ok, d2} -> d2 + {:ok, d2} -> + d2 + {{:error, reason}, d2} -> _ = Delimited.send(d2, %{"Error" => to_string(reason)}) :gen_tcp.close(socket) @@ -157,7 +163,9 @@ defmodule Spore.Server do defp listen_socket(control_opts) do case Shared.transport_mod() do - :gen_tcp -> :gen_tcp.listen(Shared.control_port(), control_opts) + :gen_tcp -> + :gen_tcp.listen(Shared.control_port(), control_opts) + :ssl -> ssl_opts = ssl_server_opts(control_opts) apply(:ssl, :listen, [Shared.control_port(), ssl_opts]) @@ -166,7 +174,9 @@ defmodule Spore.Server do defp accept_conn(listen_socket) do case Shared.transport_mod() do - :gen_tcp -> :gen_tcp.accept(listen_socket) + :gen_tcp -> + :gen_tcp.accept(listen_socket) + :ssl -> with {:ok, sock} <- apply(:ssl, :transport_accept, [listen_socket]) do apply(:ssl, :ssl_accept, [sock]) @@ -177,11 +187,13 @@ defmodule Spore.Server do defp ssl_server_opts(control_opts) do certfile = Application.get_env(:spore, :certfile) keyfile = Application.get_env(:spore, :keyfile) + base = [ {:certfile, String.to_charlist(certfile || "")}, {:keyfile, String.to_charlist(keyfile || "")}, active: false ] + base ++ control_opts end diff --git a/lib/spore/shared.ex b/lib/spore/shared.ex index 9eaf792..3b99b2a 100644 --- a/lib/spore/shared.ex +++ b/lib/spore/shared.ex @@ -160,10 +160,15 @@ defmodule Spore.Shared do end defp ssl_client_opts do - verify = if Application.get_env(:spore, :ssl_verify, false), do: :verify_peer, else: :verify_none + verify = + if Application.get_env(:spore, :ssl_verify, false), do: :verify_peer, else: :verify_none + base = [active: false, verify: verify] cacertfile = Application.get_env(:spore, :cacertfile) - if is_binary(cacertfile), do: [{:cacertfile, String.to_charlist(cacertfile)} | base], else: base + + if is_binary(cacertfile), + do: [{:cacertfile, String.to_charlist(cacertfile)} | base], + else: base end @doc "Bidirectionally pipe data between two sockets until either closes." @@ -193,10 +198,12 @@ defmodule Spore.Shared do right = Task.async(fn -> pipe(b, bmod, a, amod) end) ref_left = Process.monitor(left.pid) ref_right = Process.monitor(right.pid) + receive do {:DOWN, ^ref_left, :process, _pid, _} -> :ok {:DOWN, ^ref_right, :process, _pid, _} -> :ok end + Task.shutdown(left, :brutal_kill) Task.shutdown(right, :brutal_kill) close(a, amod)