From a528f48fad0b1b3b93d2f7fd5b81abde5a1a5d59 Mon Sep 17 00:00:00 2001 From: Rastrian Date: Mon, 22 Sep 2025 11:28:02 -0300 Subject: [PATCH 1/5] feat(admin,security): add /reload and /state; live config reload; banlist auth backoff; global max-pending cap; metrics drop counter; CLI --config wiring --- README.md | 4 +++ config_example.json | 12 +++++++ lib/spore/application.ex | 1 + lib/spore/banlist.ex | 39 +++++++++++++++++++++++ lib/spore/cli.ex | 31 +++++++++++++++++-- lib/spore/client.ex | 51 ++++++++++++++++++++++++++++-- lib/spore/config.ex | 41 ++++++++++++++++++++++++ lib/spore/metrics.ex | 67 +++++++++++++++++++++++++++++++++++++--- lib/spore/pending.ex | 12 +++++++ lib/spore/server.ex | 47 +++++++++++++++++++++++----- lib/spore/shared.ex | 22 ++++++++++--- 11 files changed, 308 insertions(+), 19 deletions(-) create mode 100644 config_example.json create mode 100644 lib/spore/banlist.ex create mode 100644 lib/spore/config.ex diff --git a/README.md b/README.md index 7c969f2..25c1cd7 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,10 @@ mix escript.build ``` This produces an executable named `spore` in the project directory. +### Optional config file (JSON) +You can pass a JSON config with `--config FILE.json` on both server and client. Supported keys: +`control_port`, `tls`, `cacertfile`, `client_certfile`, `client_keyfile`, `certfile`, `keyfile`, `allow`, `deny`, `max_conns_per_ip`, `metrics_port`, `sndbuf`, `recbuf`. + ## Quickstart ### Server (choose a public range) ```bash diff --git a/config_example.json b/config_example.json new file mode 100644 index 0000000..c4fbd03 --- /dev/null +++ b/config_example.json @@ -0,0 +1,12 @@ +{ + "control_port": 9000, + "tls": true, + "certfile": "cert.pem", + "keyfile": "key.pem", + "allow": "10.0.0.0/8,192.168.0.0/16", + "deny": "0.0.0.0/0", + "max_conns_per_ip": 50, + "metrics_port": 9568, + "sndbuf": 1048576, + "recbuf": 1048576 +} \ No newline at end of file diff --git a/lib/spore/application.ex b/lib/spore/application.ex index ecb161f..2ca1607 100644 --- a/lib/spore/application.ex +++ b/lib/spore/application.ex @@ -9,6 +9,7 @@ defmodule Spore.Application do {DynamicSupervisor, name: Spore.Pending.Supervisor, strategy: :one_for_one}, {Spore.Pending, []}, {Spore.Limits, []}, + {Spore.Banlist, []}, {Spore.Metrics, []} ] diff --git a/lib/spore/banlist.ex b/lib/spore/banlist.ex new file mode 100644 index 0000000..908da0a --- /dev/null +++ b/lib/spore/banlist.ex @@ -0,0 +1,39 @@ +defmodule Spore.Banlist do + @moduledoc false + use GenServer + + def start_link(_), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + + @impl true + def init(state), do: {:ok, state} + + def allow?(ip), do: GenServer.call(__MODULE__, {:allow?, ip}) + def note_failure(ip), do: GenServer.cast(__MODULE__, {:failure, ip}) + + @impl true + def handle_call({:allow?, ip}, _from, state) do + now = System.monotonic_time(:millisecond) + + case Map.get(state, ip) do + {:banned, until_ms} when now < until_ms -> {:reply, false, state} + {:banned, _} -> {:reply, true, Map.delete(state, ip)} + _ -> {:reply, true, state} + end + end + + @impl true + def handle_cast({:failure, ip}, state) do + {count, state2} = Map.get_and_update(state, {:count, ip}, fn v -> {v || 0, (v || 0) + 1} end) + threshold = Application.get_env(:spore, :auth_fail_threshold, 5) + ban_ms = Application.get_env(:spore, :auth_ban_ms, 60_000) + + state3 = + if count + 1 >= threshold do + Map.put(state2, ip, {:banned, System.monotonic_time(:millisecond) + ban_ms}) + else + state2 + end + + {:noreply, state3} + end +end diff --git a/lib/spore/cli.ex b/lib/spore/cli.ex index f45a468..bdfef0e 100644 --- a/lib/spore/cli.ex +++ b/lib/spore/cli.ex @@ -21,10 +21,13 @@ defmodule Spore.CLI do to: :string, port: :integer, secret: :string, + config: :string, control_port: :integer, tls: :boolean, cacertfile: :string, insecure: :boolean, + certfile: :string, + keyfile: :string, sndbuf: :integer, recbuf: :integer ], @@ -38,6 +41,13 @@ defmodule Spore.CLI do secret = Keyword.get(opts, :secret, nil) control_port = Keyword.get(opts, :control_port, nil) + if cfg = Keyword.get(opts, :config), + do: + ( + Application.put_env(:spore, :config_path, cfg) + load_config(cfg) + ) + if control_port, do: Application.put_env(:spore, :control_port, control_port) if Keyword.get(opts, :tls), do: Application.put_env(:spore, :tls, true) @@ -45,6 +55,11 @@ defmodule Spore.CLI do do: Application.put_env(:spore, :cacertfile, cacert) if Keyword.get(opts, :insecure), do: Application.put_env(:spore, :ssl_verify, false) + + if cert = Keyword.get(opts, :certfile), + do: Application.put_env(:spore, :client_certfile, cert) + + if key = Keyword.get(opts, :keyfile), do: Application.put_env(:spore, :client_keyfile, key) if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) @@ -69,6 +84,7 @@ defmodule Spore.CLI do secret: :string, bind_addr: :string, bind_tunnels: :string, + config: :string, control_port: :integer, tls: :boolean, certfile: :string, @@ -76,6 +92,7 @@ defmodule Spore.CLI do allow: :string, deny: :string, max_conns_per_ip: :integer, + metrics_port: :integer, sndbuf: :integer, recbuf: :integer ] @@ -88,6 +105,13 @@ defmodule Spore.CLI do bind_tunnels = Keyword.get(opts, :bind_tunnels, nil) control_port = Keyword.get(opts, :control_port, nil) + if cfg = Keyword.get(opts, :config), + do: + ( + Application.put_env(:spore, :config_path, cfg) + load_config(cfg) + ) + if control_port, do: Application.put_env(:spore, :control_port, control_port) if Keyword.get(opts, :tls), do: Application.put_env(:spore, :tls, true) if cert = Keyword.get(opts, :certfile), do: Application.put_env(:spore, :certfile, cert) @@ -104,6 +128,7 @@ defmodule Spore.CLI do if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) + if mp = Keyword.get(opts, :metrics_port), do: Application.put_env(:spore, :metrics_port, mp) case Spore.Server.listen( min_port: min_port, @@ -120,8 +145,10 @@ defmodule Spore.CLI do defp usage(io) do IO.puts(io, """ Usage: - spore local --local-port --to [--local-host HOST] [--port PORT] [--secret SECRET] [--control-port N] [--tls] [--cacertfile PATH] [--insecure] [--sndbuf N] [--recbuf N] - spore server [--min-port N] [--max-port N] [--secret SECRET] [--bind-addr IP] [--bind-tunnels IP] [--control-port N] [--tls] [--certfile PATH] [--keyfile PATH] [--allow CIDRs] [--deny CIDRs] [--max-conns-per-ip N] [--sndbuf N] [--recbuf N] + spore local --local-port --to [--local-host HOST] [--port PORT] [--secret SECRET] [--config FILE.json] [--control-port N] [--tls] [--cacertfile PATH] [--certfile PATH] [--keyfile PATH] [--insecure] [--sndbuf N] [--recbuf N] + spore server [--min-port N] [--max-port N] [--secret SECRET] [--bind-addr IP] [--bind-tunnels IP] [--config FILE.json] [--control-port N] [--tls] [--certfile PATH] [--keyfile PATH] [--allow CIDRs] [--deny CIDRs] [--max-conns-per-ip N] [--sndbuf N] [--recbuf N] """) end + + defp load_config(path), do: Spore.Config.load_file(path) end diff --git a/lib/spore/client.ex b/lib/spore/client.ex index 6fb335a..dec2ea0 100644 --- a/lib/spore/client.ex +++ b/lib/spore/client.ex @@ -43,9 +43,36 @@ defmodule Spore.Client do end end - {:ok, d} = Delimited.send(d, %{"Hello" => port}) + # Try extended hello first, fall back to legacy Hello + features = [] + + features = + if Application.get_env(:spore, :tls, false), do: ["tls" | features], else: features + + {:ok, d} = + Delimited.send(d, %{ + "HelloEx" => %{ + "port" => port, + "version" => "spore/1", + "features" => Enum.reverse(features) + } + }) case Delimited.recv_timeout(d) do + {%{"HelloEx" => %{"port" => remote_port}}, d_after} -> + Logger.info("connected to server (HelloEx)") + Logger.info("listening at #{to}:#{remote_port}") + + {:ok, + %__MODULE__{ + to: to, + local_host: local_host, + local_port: local_port, + remote_port: remote_port, + auth: auth, + conn: d_after + }} + {%{"Hello" => remote_port}, d_after} -> Logger.info("connected to server") Logger.info("listening at #{to}:#{remote_port}") @@ -73,7 +100,27 @@ defmodule Spore.Client do {:error, reason} _ -> - {:error, :unexpected_initial_message} + # fallback: send legacy Hello once + {:ok, d2} = Delimited.send(d, %{"Hello" => port}) + + case Delimited.recv_timeout(d2) do + {%{"Hello" => remote_port}, d_after2} -> + Logger.info("connected to server (legacy)") + Logger.info("listening at #{to}:#{remote_port}") + + {:ok, + %__MODULE__{ + to: to, + local_host: local_host, + local_port: local_port, + remote_port: remote_port, + auth: auth, + conn: d_after2 + }} + + other -> + {:error, {:unexpected_initial_message, other}} + end end end catch diff --git a/lib/spore/config.ex b/lib/spore/config.ex new file mode 100644 index 0000000..49c537d --- /dev/null +++ b/lib/spore/config.ex @@ -0,0 +1,41 @@ +defmodule Spore.Config do + @moduledoc false + + def apply_map(map) when is_map(map) do + put(:control_port, map["control_port"]) + put(:tls, truthy(map["tls"])) + put(:cacertfile, map["cacertfile"]) + put(:client_certfile, map["client_certfile"]) + put(:client_keyfile, map["client_keyfile"]) + put(:certfile, map["certfile"]) + put(:keyfile, map["keyfile"]) + put(:allow, map["allow"] && Spore.ACL.parse_list(map["allow"])) + put(:deny, map["deny"] && Spore.ACL.parse_list(map["deny"])) + put(:max_conns_per_ip, map["max_conns_per_ip"]) + put(:max_pending, map["max_pending"]) + put(:metrics_port, map["metrics_port"]) + put(:sndbuf, map["sndbuf"]) + put(:recbuf, map["recbuf"]) + end + + def reload_from_env do + case Application.get_env(:spore, :config_path) do + nil -> {:error, :no_config} + path -> load_file(path) + end + end + + def load_file(path) do + with {:ok, content} <- File.read(path), + {:ok, map} <- Jason.decode(content) do + apply_map(map) + {:ok, :reloaded} + else + err -> err + end + end + + defp truthy(v), do: v in [true, 1, "1", "true", "TRUE"] + defp put(_k, nil), do: :ok + defp put(k, v), do: Application.put_env(:spore, k, v) +end diff --git a/lib/spore/metrics.ex b/lib/spore/metrics.ex index cc90866..6565946 100644 --- a/lib/spore/metrics.ex +++ b/lib/spore/metrics.ex @@ -37,12 +37,23 @@ defmodule Spore.Metrics do inc(:spore_connections_accepted_total, 1) inc(:spore_accept_latency_ms_sum, max(0, now - ts)) inc(:spore_accept_latency_ms_count, 1) + bucket(now - ts) _ -> :ok end end + defp bucket(ms) do + for le <- [5, 10, 25, 50, 100, 250, 500, 1000, 2000, 5000] do + if ms <= le do + inc(String.to_atom("spore_accept_latency_ms_bucket_le_" <> Integer.to_string(le)), 1) + end + end + + inc(:spore_accept_latency_ms_bucket_le_inf, 1) + end + def track_bytes(n) when is_integer(n) and n > 0, do: inc(:spore_bytes_proxied_total, n) def stale(), do: inc(:spore_connections_stale_total, 1) @@ -77,12 +88,49 @@ defmodule Spore.Metrics do end defp serve(sock) do - _ = :gen_tcp.recv(sock, 0, 100) - body = render() + with {:ok, req} <- :gen_tcp.recv(sock, 0, 200) do + first = req |> to_string() |> String.split("\r\n", parts: 2) |> hd() + + route = + cond do + String.starts_with?(first, "GET /metrics") -> + :metrics + + String.starts_with?(first, "POST /reload") or String.starts_with?(first, "GET /reload") -> + :reload + + String.starts_with?(first, "GET /state") -> + :state + + true -> + :metrics + end + + case route do + :metrics -> + reply_text(sock, render(), "text/plain; version=0.0.4") + + :reload -> + case Spore.Config.reload_from_env() do + {:ok, _} -> reply_text(sock, "ok\n", "text/plain") + {:error, _} -> reply_text(sock, "no-config\n", "text/plain") + end + + :state -> + body = state_render() + reply_text(sock, body, "application/json") + end + end + + :gen_tcp.close(sock) + end + defp reply_text(sock, body, ctype) do resp = [ "HTTP/1.1 200 OK\r\n", - "Content-Type: text/plain; version=0.0.4\r\n", + "Content-Type: ", + ctype, + "\r\n", "Content-Length: ", Integer.to_string(byte_size(body)), "\r\n", @@ -91,7 +139,6 @@ defmodule Spore.Metrics do ] :gen_tcp.send(sock, resp) - :gen_tcp.close(sock) end def render do @@ -101,4 +148,16 @@ defmodule Spore.Metrics do ["# TYPE ", Atom.to_string(name), " counter\n", Atom.to_string(name), " ", to_string(value)] end) <> "\n" end + + defp state_render do + Jason.encode!(%{ + control_port: Application.get_env(:spore, :control_port), + tls: Application.get_env(:spore, :tls), + allow: Application.get_env(:spore, :allow), + deny: Application.get_env(:spore, :deny), + max_conns_per_ip: Application.get_env(:spore, :max_conns_per_ip), + max_pending: Application.get_env(:spore, :max_pending), + metrics_port: Application.get_env(:spore, :metrics_port) + }) + end end diff --git a/lib/spore/pending.ex b/lib/spore/pending.ex index a479619..ab02714 100644 --- a/lib/spore/pending.ex +++ b/lib/spore/pending.ex @@ -14,6 +14,18 @@ defmodule Spore.Pending do end def insert(id, socket, ttl_ms \\ 10_000) do + maxp = Application.get_env(:spore, :max_pending, :infinity) + + if maxp != :infinity do + %{active: active} = DynamicSupervisor.count_children(Spore.Pending.Supervisor) + + if active >= maxp do + :gen_tcp.close(socket) + Spore.Metrics.inc(:spore_connections_pending_dropped_total, 1) + throw({:error, :too_many_pending}) + end + end + child_spec = %{ id: {PendingConnection, id}, start: {PendingConnection, :start_link, [[id: id, socket: socket, ttl_ms: ttl_ms]]}, diff --git a/lib/spore/server.ex b/lib/spore/server.ex index 06e6d2e..8633919 100644 --- a/lib/spore/server.ex +++ b/lib/spore/server.ex @@ -72,7 +72,7 @@ defmodule Spore.Server do {:ok, {ip, _}} = :inet.peername(socket) Logger.info("incoming connection from #{:inet.ntoa(ip)}") - if Spore.ACL.allow?(ip) and Spore.Limits.can_open?(ip) do + if Spore.ACL.allow?(ip) and Spore.Banlist.allow?(ip) and Spore.Limits.can_open?(ip) do Task.start(fn -> try do handle_connection(socket, port_range, auth, bind_tunnels) @@ -102,6 +102,7 @@ defmodule Spore.Server do {{:error, reason}, d2} -> _ = Delimited.send(d2, %{"Error" => to_string(reason)}) + if {:ok, {ip, _}} = :inet.peername(socket), do: Spore.Banlist.note_failure(ip) :gen_tcp.close(socket) exit(:normal) end @@ -113,12 +114,30 @@ defmodule Spore.Server do {{:error, reason}, d2} -> _ = Delimited.send(d2, %{"Error" => to_string(reason)}) + if {:ok, {ip, _}} = :inet.peername(socket), do: Spore.Banlist.note_failure(ip) :gen_tcp.close(socket) exit(:normal) end end case Delimited.recv_timeout(d) do + {%{"HelloEx" => %{"port" => req_port}}, d2} -> + case create_listener(req_port, port_range, bind_tunnels) do + {:ok, listener} -> + {:ok, {_ip, actual}} = :inet.sockname(listener) + Logger.info("new client on port #{actual}") + + {:ok, d3} = + Delimited.send(d2, %{ + "HelloEx" => %{"port" => actual, "version" => "spore/1", "features" => []} + }) + + hello_loop(d3, listener) + + {:error, message} -> + _ = Delimited.send(d2, %{"Error" => message}) + end + {%{"Hello" => req_port}, d2} -> case create_listener(req_port, port_range, bind_tunnels) do {:ok, listener} -> @@ -187,12 +206,26 @@ defmodule Spore.Server do defp ssl_server_opts(control_opts) do certfile = Application.get_env(:spore, :certfile) keyfile = Application.get_env(:spore, :keyfile) - - base = [ - {:certfile, String.to_charlist(certfile || "")}, - {:keyfile, String.to_charlist(keyfile || "")}, - active: false - ] + cacertfile = Application.get_env(:spore, :cacertfile) + + base = [active: false] + + base = + if is_binary(certfile) and is_binary(keyfile), + do: [ + {:certfile, String.to_charlist(certfile)}, + {:keyfile, String.to_charlist(keyfile)} | base + ], + else: base + + base = + if is_binary(cacertfile), + do: [ + {:cacertfile, String.to_charlist(cacertfile)}, + {:verify, :verify_peer}, + {:fail_if_no_peer_cert, true} | base + ], + else: base base ++ control_opts end diff --git a/lib/spore/shared.ex b/lib/spore/shared.ex index 3b99b2a..9789274 100644 --- a/lib/spore/shared.ex +++ b/lib/spore/shared.ex @@ -161,14 +161,28 @@ defmodule Spore.Shared do defp ssl_client_opts do verify = - if Application.get_env(:spore, :ssl_verify, false), do: :verify_peer, else: :verify_none + if Application.get_env(:spore, :ssl_verify, true), do: :verify_peer, else: :verify_none base = [active: false, verify: verify] cacertfile = Application.get_env(:spore, :cacertfile) - if is_binary(cacertfile), - do: [{:cacertfile, String.to_charlist(cacertfile)} | base], - else: base + base = + if is_binary(cacertfile), + do: [{:cacertfile, String.to_charlist(cacertfile)} | base], + else: base + + certfile = Application.get_env(:spore, :client_certfile) + keyfile = Application.get_env(:spore, :client_keyfile) + + base = + if is_binary(certfile) and is_binary(keyfile), + do: [ + {:certfile, String.to_charlist(certfile)}, + {:keyfile, String.to_charlist(keyfile)} | base + ], + else: base + + base end @doc "Bidirectionally pipe data between two sockets until either closes." From ef0f8b42f9a5e736fcc2271becdd7b907722d0c5 Mon Sep 17 00:00:00 2001 From: Rastrian Date: Mon, 22 Sep 2025 11:43:52 -0300 Subject: [PATCH 2/5] feat(otel): add optional OTLP exporter, tracing hooks, and CLI flags (--otel-enable, --otel-endpoint); start exporter on boot --- lib/spore/application.ex | 2 + lib/spore/auth.ex | 12 +++-- lib/spore/cli.ex | 16 ++++-- lib/spore/config.ex | 2 + lib/spore/limits.ex | 9 ++++ lib/spore/metrics.ex | 15 ++++-- lib/spore/secret_quota.ex | 46 +++++++++++++++++ lib/spore/server.ex | 15 +++++- lib/spore/shared.ex | 105 +++++++++++++++++++++++++++----------- lib/spore/tracing.ex | 53 +++++++++++++++++++ mix.exs | 5 +- mix.lock | 11 ++++ 12 files changed, 247 insertions(+), 44 deletions(-) create mode 100644 lib/spore/secret_quota.ex create mode 100644 lib/spore/tracing.ex diff --git a/lib/spore/application.ex b/lib/spore/application.ex index 2ca1607..6296cb6 100644 --- a/lib/spore/application.ex +++ b/lib/spore/application.ex @@ -4,12 +4,14 @@ defmodule Spore.Application do @impl true def start(_type, _args) do + _ = Spore.Tracing.start() children = [ {Registry, keys: :unique, name: Spore.Pending.Registry}, {DynamicSupervisor, name: Spore.Pending.Supervisor, strategy: :one_for_one}, {Spore.Pending, []}, {Spore.Limits, []}, {Spore.Banlist, []}, + {Spore.SecretQuota, []}, {Spore.Metrics, []} ] diff --git a/lib/spore/auth.ex b/lib/spore/auth.ex index 2dd849b..6eb054b 100644 --- a/lib/spore/auth.ex +++ b/lib/spore/auth.ex @@ -8,11 +8,13 @@ defmodule Spore.Auth do import Bitwise - @type t :: %{key: binary()} + @type t :: %{key: binary(), id: String.t()} @spec new(String.t()) :: t def new(secret) do - %{key: :crypto.hash(:sha256, secret)} + hash = :crypto.hash(:sha256, secret) + id = Base.encode16(hash, case: :lower) + %{key: hash, id: id} end @doc "Create multiple authenticators from a comma-separated list." @@ -74,8 +76,10 @@ defmodule Spore.Auth do case Spore.Shared.Delimited.recv_timeout(d) do {%{"Authenticate" => tag}, d2} -> - ok = Enum.any?(auths, fn a -> validate(a, challenge, tag) end) - if ok, do: {:ok, d2}, else: {{:error, :invalid_secret}, d2} + case Enum.find(auths, fn a -> validate(a, challenge, tag) end) do + %{id: id} -> {:ok, d2, id} + _ -> {{:error, :invalid_secret}, d2} + end {_, d2} -> {{:error, :missing_authentication}, d2} diff --git a/lib/spore/cli.ex b/lib/spore/cli.ex index bdfef0e..b286724 100644 --- a/lib/spore/cli.ex +++ b/lib/spore/cli.ex @@ -29,7 +29,9 @@ defmodule Spore.CLI do certfile: :string, keyfile: :string, sndbuf: :integer, - recbuf: :integer + recbuf: :integer, + otel_enable: :boolean, + otel_endpoint: :string ], aliases: [p: :port] ) @@ -62,6 +64,8 @@ defmodule Spore.CLI do if key = Keyword.get(opts, :keyfile), do: Application.put_env(:spore, :client_keyfile, key) if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) + if Keyword.get(opts, :otel_enable), do: Application.put_env(:spore, :otel_enable, true) + if ep = Keyword.get(opts, :otel_endpoint), do: Application.put_env(:spore, :otel_endpoint, ep) case Spore.Client.new(local_host, local_port, to, port, secret) do {:ok, client} -> @@ -94,7 +98,9 @@ defmodule Spore.CLI do max_conns_per_ip: :integer, metrics_port: :integer, sndbuf: :integer, - recbuf: :integer + recbuf: :integer, + otel_enable: :boolean, + otel_endpoint: :string ] ) @@ -129,6 +135,8 @@ defmodule Spore.CLI do if sndbuf = Keyword.get(opts, :sndbuf), do: Application.put_env(:spore, :sndbuf, sndbuf) if recbuf = Keyword.get(opts, :recbuf), do: Application.put_env(:spore, :recbuf, recbuf) if mp = Keyword.get(opts, :metrics_port), do: Application.put_env(:spore, :metrics_port, mp) + if Keyword.get(opts, :otel_enable), do: Application.put_env(:spore, :otel_enable, true) + if ep = Keyword.get(opts, :otel_endpoint), do: Application.put_env(:spore, :otel_endpoint, ep) case Spore.Server.listen( min_port: min_port, @@ -145,8 +153,8 @@ defmodule Spore.CLI do defp usage(io) do IO.puts(io, """ Usage: - spore local --local-port --to [--local-host HOST] [--port PORT] [--secret SECRET] [--config FILE.json] [--control-port N] [--tls] [--cacertfile PATH] [--certfile PATH] [--keyfile PATH] [--insecure] [--sndbuf N] [--recbuf N] - spore server [--min-port N] [--max-port N] [--secret SECRET] [--bind-addr IP] [--bind-tunnels IP] [--config FILE.json] [--control-port N] [--tls] [--certfile PATH] [--keyfile PATH] [--allow CIDRs] [--deny CIDRs] [--max-conns-per-ip N] [--sndbuf N] [--recbuf N] + spore local --local-port --to [--local-host HOST] [--port PORT] [--secret SECRET] [--config FILE.json] [--control-port N] [--tls] [--cacertfile PATH] [--certfile PATH] [--keyfile PATH] [--insecure] [--sndbuf N] [--recbuf N] [--otel-enable] [--otel-endpoint URL] + spore server [--min-port N] [--max-port N] [--secret SECRET] [--bind-addr IP] [--bind-tunnels IP] [--config FILE.json] [--control-port N] [--tls] [--certfile PATH] [--keyfile PATH] [--allow CIDRs] [--deny CIDRs] [--max-conns-per-ip N] [--sndbuf N] [--recbuf N] [--metrics-port N] [--otel-enable] [--otel-endpoint URL] """) end diff --git a/lib/spore/config.ex b/lib/spore/config.ex index 49c537d..ea877f0 100644 --- a/lib/spore/config.ex +++ b/lib/spore/config.ex @@ -14,8 +14,10 @@ defmodule Spore.Config do put(:max_conns_per_ip, map["max_conns_per_ip"]) put(:max_pending, map["max_pending"]) put(:metrics_port, map["metrics_port"]) + put(:secret_quotas, map["secret_quotas"]) # map of secret_id -> max active put(:sndbuf, map["sndbuf"]) put(:recbuf, map["recbuf"]) + Spore.SecretQuota.reload_limits() end def reload_from_env do diff --git a/lib/spore/limits.ex b/lib/spore/limits.ex index eceff93..4732cd4 100644 --- a/lib/spore/limits.ex +++ b/lib/spore/limits.ex @@ -15,6 +15,10 @@ defmodule Spore.Limits do GenServer.cast(__MODULE__, {:close, ip}) end + def snapshot do + GenServer.call(__MODULE__, :snapshot) + end + @impl true def handle_call({:can_open, ip}, _from, state) do max = Application.get_env(:spore, :max_conns_per_ip, :infinity) @@ -31,6 +35,11 @@ defmodule Spore.Limits do {:reply, allow, state3} end + @impl true + def handle_call(:snapshot, _from, state) do + {:reply, state, state} + end + @impl true def handle_cast({:close, ip}, state) do state2 = diff --git a/lib/spore/metrics.ex b/lib/spore/metrics.ex index 6565946..e5bac63 100644 --- a/lib/spore/metrics.ex +++ b/lib/spore/metrics.ex @@ -143,10 +143,17 @@ defmodule Spore.Metrics do def render do rows = :ets.tab2list(@metrics_table) - - Enum.map_join(rows, "\n", fn {name, value} -> - ["# TYPE ", Atom.to_string(name), " counter\n", Atom.to_string(name), " ", to_string(value)] - end) <> "\n" + per_ip = Spore.Limits.snapshot() + pending = DynamicSupervisor.count_children(Spore.Pending.Supervisor).active + + base = Enum.map(rows, fn {name, value} -> + ["# TYPE ", Atom.to_string(name), " counter\n", Atom.to_string(name), " ", to_string(value), "\n"] + end) + ip_lines = Enum.map(per_ip, fn {ip, count} -> + ["spore_conns_by_ip{ip=\"", :inet.ntoa(ip) |> to_string(), "\"} ", Integer.to_string(count), "\n"] + end) + pending_line = ["spore_pending_active ", Integer.to_string(pending), "\n"] + IO.iodata_to_binary(base ++ ip_lines ++ [pending_line]) end defp state_render do diff --git a/lib/spore/secret_quota.ex b/lib/spore/secret_quota.ex new file mode 100644 index 0000000..78e3030 --- /dev/null +++ b/lib/spore/secret_quota.ex @@ -0,0 +1,46 @@ +defmodule Spore.SecretQuota do + @moduledoc false + use GenServer + + def start_link(_), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + + @impl true + def init(_) do + {:ok, %{counts: %{}, limits: load_limits()}} + end + + def allow?(id), do: GenServer.call(__MODULE__, {:allow?, id}) + def dec(id), do: GenServer.cast(__MODULE__, {:dec, id}) + def reload_limits(), do: GenServer.cast(__MODULE__, :reload) + + defp load_limits do + case Application.get_env(:spore, :secret_quotas) do + m when is_map(m) -> m + _ -> %{} + end + end + + @impl true + def handle_call({:allow?, id}, _from, %{counts: c, limits: l} = state) do + curr = Map.get(c, id, 0) + max = Map.get(l, id, :infinity) + allow = max == :infinity or curr < max + c2 = if allow, do: Map.put(c, id, curr + 1), else: c + {:reply, allow, %{state | counts: c2}} + end + + @impl true + def handle_cast({:dec, id}, %{counts: c} = state) do + c2 = case Map.get(c, id) do + nil -> c + 1 -> Map.delete(c, id) + n when is_integer(n) and n>1 -> Map.put(c, id, n-1) + end + {:noreply, %{state | counts: c2}} + end + + @impl true + def handle_cast(:reload, state) do + {:noreply, %{state | limits: load_limits()}} + end +end diff --git a/lib/spore/server.ex b/lib/spore/server.ex index 8633919..55096d1 100644 --- a/lib/spore/server.ex +++ b/lib/spore/server.ex @@ -88,6 +88,7 @@ defmodule Spore.Server do end defp handle_connection(socket, port_range, auth, bind_tunnels) do + {:ok, {ip, _}} = :inet.peername(socket) d = Delimited.new(socket, Shared.transport_mod()) d = @@ -109,7 +110,13 @@ defmodule Spore.Server do {:many, list} -> case Auth.server_handshake_many(list, d) do - {:ok, d2} -> + {:ok, d2, auth_id} -> + if not Spore.SecretQuota.allow?(auth_id) do + _ = Delimited.send(d2, %{"Error" => "quota exceeded"}) + :gen_tcp.close(socket) + exit(:normal) + end + Process.put({:spore_auth_id}, auth_id) d2 {{:error, reason}, d2} -> @@ -120,6 +127,7 @@ defmodule Spore.Server do end end + Spore.Tracing.with_span("spore.control_connection", %{"peer.ip" => :inet.ntoa(ip) |> to_string()}, fn -> case Delimited.recv_timeout(d) do {%{"HelloEx" => %{"port" => req_port}}, d2} -> case create_listener(req_port, port_range, bind_tunnels) do @@ -176,8 +184,11 @@ defmodule Spore.Server do {_, _d2} -> :ok end + end) rescue - e -> Logger.warning("connection exited with error: #{inspect(e)}") + e -> + if auth_id = Process.get({:spore_auth_id}), do: Spore.SecretQuota.dec(auth_id) + Logger.warning("connection exited with error: #{inspect(e)}") end defp listen_socket(control_opts) do diff --git a/lib/spore/shared.ex b/lib/spore/shared.ex index 9789274..003f935 100644 --- a/lib/spore/shared.ex +++ b/lib/spore/shared.ex @@ -188,50 +188,83 @@ defmodule Spore.Shared do @doc "Bidirectionally pipe data between two sockets until either closes." @spec pipe_bidirectional(socket(), socket()) :: :ok def pipe_bidirectional(a, b) do - left = Task.async(fn -> pipe(a, :gen_tcp, b, :gen_tcp) end) - right = Task.async(fn -> pipe(b, :gen_tcp, a, :gen_tcp) end) - ref_left = Process.monitor(left.pid) - ref_right = Process.monitor(right.pid) - - receive do - {:DOWN, ^ref_left, :process, _pid, _} -> :ok - {:DOWN, ^ref_right, :process, _pid, _} -> :ok + if Application.get_env(:spore, :active_once, false) do + left = Task.async(fn -> pipe_active_once(a, :gen_tcp, b, :gen_tcp) end) + right = Task.async(fn -> pipe_active_once(b, :gen_tcp, a, :gen_tcp) end) + ref_left = Process.monitor(left.pid) + ref_right = Process.monitor(right.pid) + receive do + {:DOWN, ^ref_left, :process, _pid, _} -> :ok + {:DOWN, ^ref_right, :process, _pid, _} -> :ok + end + Task.shutdown(left, :brutal_kill) + Task.shutdown(right, :brutal_kill) + :gen_tcp.close(a) + :gen_tcp.close(b) + :ok + else + left = Task.async(fn -> pipe(a, :gen_tcp, b, :gen_tcp) end) + right = Task.async(fn -> pipe(b, :gen_tcp, a, :gen_tcp) end) + ref_left = Process.monitor(left.pid) + ref_right = Process.monitor(right.pid) + receive do + {:DOWN, ^ref_left, :process, _pid, _} -> :ok + {:DOWN, ^ref_right, :process, _pid, _} -> :ok + end + Task.shutdown(left, :brutal_kill) + Task.shutdown(right, :brutal_kill) + :gen_tcp.close(a) + :gen_tcp.close(b) + :ok end - - Task.shutdown(left, :brutal_kill) - Task.shutdown(right, :brutal_kill) - :gen_tcp.close(a) - :gen_tcp.close(b) - :ok end @doc "Bidirectionally pipe with explicit transport modules." @spec pipe_bidirectional(socket(), module(), socket(), module()) :: :ok def pipe_bidirectional(a, amod, b, bmod) do - left = Task.async(fn -> pipe(a, amod, b, bmod) end) - right = Task.async(fn -> pipe(b, bmod, a, amod) end) - ref_left = Process.monitor(left.pid) - ref_right = Process.monitor(right.pid) - - receive do - {:DOWN, ^ref_left, :process, _pid, _} -> :ok - {:DOWN, ^ref_right, :process, _pid, _} -> :ok + if Application.get_env(:spore, :active_once, false) do + left = Task.async(fn -> pipe_active_once(a, amod, b, bmod) end) + right = Task.async(fn -> pipe_active_once(b, bmod, a, amod) end) + ref_left = Process.monitor(left.pid) + ref_right = Process.monitor(right.pid) + receive do + {:DOWN, ^ref_left, :process, _pid, _} -> :ok + {:DOWN, ^ref_right, :process, _pid, _} -> :ok + end + Task.shutdown(left, :brutal_kill) + Task.shutdown(right, :brutal_kill) + close(a, amod) + close(b, bmod) + :ok + else + left = Task.async(fn -> pipe(a, amod, b, bmod) end) + right = Task.async(fn -> pipe(b, bmod, a, amod) end) + ref_left = Process.monitor(left.pid) + ref_right = Process.monitor(right.pid) + receive do + {:DOWN, ^ref_left, :process, _pid, _} -> :ok + {:DOWN, ^ref_right, :process, _pid, _} -> :ok + end + Task.shutdown(left, :brutal_kill) + Task.shutdown(right, :brutal_kill) + close(a, amod) + close(b, bmod) + :ok end - - Task.shutdown(left, :brutal_kill) - Task.shutdown(right, :brutal_kill) - close(a, amod) - close(b, bmod) - :ok end defp pipe(src, src_mod, dst, dst_mod) do - case src_mod.recv(src, 0) do + idle = Application.get_env(:spore, :idle_timeout_ms, 300_000) + case src_mod.recv(src, 0, idle) do {:ok, data} -> _ = dst_mod.send(dst, data) Spore.Metrics.track_bytes(byte_size(data)) + Spore.Tracing.add_event("spore.bytes", %{bytes: byte_size(data)}) pipe(src, src_mod, dst, dst_mod) + {:error, :timeout} -> + :ok + {:error, _} -> :ok end @@ -239,4 +272,18 @@ defmodule Spore.Shared do defp close(socket, :gen_tcp), do: :gen_tcp.close(socket) defp close(socket, :ssl), do: apply(:ssl, :close, [socket]) + + defp pipe_active_once(src, src_mod, dst, dst_mod) do + _ = apply(src_mod, :setopts, [src, [active: :once]]) + receive do + {tag, ^src, data} when tag in [:tcp, :ssl] -> + _ = dst_mod.send(dst, data) + Spore.Metrics.track_bytes(byte_size(data)) + pipe_active_once(src, src_mod, dst, dst_mod) + {closed, ^src} when closed in [:tcp_closed, :ssl_closed] -> :ok + {err, ^src, _reason} when err in [:tcp_error, :ssl_error] -> :ok + after + Application.get_env(:spore, :idle_timeout_ms, 300_000) -> :ok + end + end end diff --git a/lib/spore/tracing.ex b/lib/spore/tracing.ex new file mode 100644 index 0000000..ca6ccd9 --- /dev/null +++ b/lib/spore/tracing.ex @@ -0,0 +1,53 @@ +defmodule Spore.Tracing do + @moduledoc false + + def start do + if Application.get_env(:spore, :otel_enable, false) and loaded_exporter?() do + endpoint = Application.get_env(:spore, :otel_endpoint, "http://localhost:4318") + headers = Application.get_env(:spore, :otel_headers, %{}) + exporter_opts = %{protocol: :http_protobuf, endpoint: endpoint, headers: headers} + _ = :application.ensure_all_started(:opentelemetry) + _ = :application.ensure_all_started(:opentelemetry_exporter) + _ = apply(:opentelemetry_exporter, :setup, [[exporter: {:otlp, exporter_opts}]]) + :ok + else + :ok + end + end + + def with_span(name, attrs \\ %{}, fun) when is_function(fun, 0) do + if loaded?() do + OpenTelemetry.Tracer.with_span name, fn -> + set_attrs(attrs) + fun.() + end + else + fun.() + end + end + + def add_event(name, attrs \\ %{}) do + if loaded?() do + OpenTelemetry.Tracer.add_event(name, attrs) + else + :ok + end + end + + def set_attrs(attrs) when is_map(attrs) do + if loaded?() do + OpenTelemetry.Tracer.set_attributes(attrs) + else + :ok + end + end + + defp loaded? do + Code.ensure_loaded?(OpenTelemetry.Tracer) and function_exported?(OpenTelemetry.Tracer, :with_span, 2) + end + + defp loaded_exporter? do + Code.ensure_loaded?(:opentelemetry_exporter) and + function_exported?(:opentelemetry_exporter, :setup, 1) + end +end diff --git a/mix.exs b/mix.exs index 5689734..708e2ca 100644 --- a/mix.exs +++ b/mix.exs @@ -22,7 +22,10 @@ defmodule Spore.MixProject do defp deps do [ {:jason, "~> 1.4"}, - {:uuid, "~> 1.1"} + {:uuid, "~> 1.1"}, + {:opentelemetry_api, "~> 1.3"}, + {:opentelemetry, "~> 1.4"}, + {:opentelemetry_exporter, "~> 1.7"} ] end end diff --git a/mix.lock b/mix.lock index 4e8e926..cb7efd3 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,15 @@ %{ + "acceptor_pool": {:hex, :acceptor_pool, "1.0.0", "43c20d2acae35f0c2bcd64f9d2bde267e459f0f3fd23dab26485bf518c281b21", [:rebar3], [], "hexpm", "0cbcd83fdc8b9ad2eee2067ef8b91a14858a5883cb7cd800e6fcd5803e158788"}, + "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, + "ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"}, + "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, + "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, + "hpack": {:hex, :hpack_erl, "0.3.0", "2461899cc4ab6a0ef8e970c1661c5fc6a52d3c25580bc6dd204f84ce94669926", [:rebar3], [], "hexpm", "d6137d7079169d8c485c6962dfe261af5b9ef60fbc557344511c1e65e3d95fb0"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "opentelemetry": {:hex, :opentelemetry, "1.6.0", "0954dbe12f490ee7b126c9e924cf60141b1238a02dfc700907eadde4dcc20460", [:rebar3], [{:opentelemetry_api, "~> 1.4.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "5fd0123d65d2649f10e478e7444927cd9fbdffcaeb8c1c2fcae3d486d18c5e62"}, + "opentelemetry_api": {:hex, :opentelemetry_api, "1.4.1", "e071429a37441a0fe9097eeea0ff921ebadce8eba8e1ce297b05a43c7a0d121f", [:mix, :rebar3], [], "hexpm", "39bdb6ad740bc13b16215cb9f233d66796bbae897f3bf6eb77abb712e87c3c26"}, + "opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.9.0", "e344bf5e3dab2815fe381b0cac172c06cfc29ecf792c5d74cbbd2b3184af359c", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.6.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.4.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.18", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "2030a59e33afff6aaeba847d865c8db5dc3873db87a9257df2ca03cafd9e0478"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, + "tls_certificate_check": {:hex, :tls_certificate_check, "1.29.0", "4473005eb0bbdad215d7083a230e2e076f538d9ea472c8009fd22006a4cfc5f6", [:rebar3], [{:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "5b0d0e5cb0f928bc4f210df667304ed91c5bff2a391ce6bdedfbfe70a8f096c5"}, "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"}, } From 779eb2f22ca63a8a91c6fe52bf8bea2daafc796e Mon Sep 17 00:00:00 2001 From: Rastrian Date: Mon, 22 Sep 2025 11:44:40 -0300 Subject: [PATCH 3/5] fix(ci): fix format --- lib/spore/application.ex | 1 + lib/spore/config.ex | 3 +- lib/spore/metrics.ex | 30 ++++++++-- lib/spore/secret_quota.ex | 12 ++-- lib/spore/server.ex | 113 ++++++++++++++++++++------------------ lib/spore/shared.ex | 18 +++++- lib/spore/tracing.ex | 7 ++- 7 files changed, 113 insertions(+), 71 deletions(-) diff --git a/lib/spore/application.ex b/lib/spore/application.ex index 6296cb6..88e4ae1 100644 --- a/lib/spore/application.ex +++ b/lib/spore/application.ex @@ -5,6 +5,7 @@ defmodule Spore.Application do @impl true def start(_type, _args) do _ = Spore.Tracing.start() + children = [ {Registry, keys: :unique, name: Spore.Pending.Registry}, {DynamicSupervisor, name: Spore.Pending.Supervisor, strategy: :one_for_one}, diff --git a/lib/spore/config.ex b/lib/spore/config.ex index ea877f0..e9e1c83 100644 --- a/lib/spore/config.ex +++ b/lib/spore/config.ex @@ -14,7 +14,8 @@ defmodule Spore.Config do put(:max_conns_per_ip, map["max_conns_per_ip"]) put(:max_pending, map["max_pending"]) put(:metrics_port, map["metrics_port"]) - put(:secret_quotas, map["secret_quotas"]) # map of secret_id -> max active + # map of secret_id -> max active + put(:secret_quotas, map["secret_quotas"]) put(:sndbuf, map["sndbuf"]) put(:recbuf, map["recbuf"]) Spore.SecretQuota.reload_limits() diff --git a/lib/spore/metrics.ex b/lib/spore/metrics.ex index e5bac63..23d3bcf 100644 --- a/lib/spore/metrics.ex +++ b/lib/spore/metrics.ex @@ -146,12 +146,30 @@ defmodule Spore.Metrics do per_ip = Spore.Limits.snapshot() pending = DynamicSupervisor.count_children(Spore.Pending.Supervisor).active - base = Enum.map(rows, fn {name, value} -> - ["# TYPE ", Atom.to_string(name), " counter\n", Atom.to_string(name), " ", to_string(value), "\n"] - end) - ip_lines = Enum.map(per_ip, fn {ip, count} -> - ["spore_conns_by_ip{ip=\"", :inet.ntoa(ip) |> to_string(), "\"} ", Integer.to_string(count), "\n"] - end) + base = + Enum.map(rows, fn {name, value} -> + [ + "# TYPE ", + Atom.to_string(name), + " counter\n", + Atom.to_string(name), + " ", + to_string(value), + "\n" + ] + end) + + ip_lines = + Enum.map(per_ip, fn {ip, count} -> + [ + "spore_conns_by_ip{ip=\"", + :inet.ntoa(ip) |> to_string(), + "\"} ", + Integer.to_string(count), + "\n" + ] + end) + pending_line = ["spore_pending_active ", Integer.to_string(pending), "\n"] IO.iodata_to_binary(base ++ ip_lines ++ [pending_line]) end diff --git a/lib/spore/secret_quota.ex b/lib/spore/secret_quota.ex index 78e3030..89e3c4e 100644 --- a/lib/spore/secret_quota.ex +++ b/lib/spore/secret_quota.ex @@ -31,11 +31,13 @@ defmodule Spore.SecretQuota do @impl true def handle_cast({:dec, id}, %{counts: c} = state) do - c2 = case Map.get(c, id) do - nil -> c - 1 -> Map.delete(c, id) - n when is_integer(n) and n>1 -> Map.put(c, id, n-1) - end + c2 = + case Map.get(c, id) do + nil -> c + 1 -> Map.delete(c, id) + n when is_integer(n) and n > 1 -> Map.put(c, id, n - 1) + end + {:noreply, %{state | counts: c2}} end diff --git a/lib/spore/server.ex b/lib/spore/server.ex index 55096d1..8eb7288 100644 --- a/lib/spore/server.ex +++ b/lib/spore/server.ex @@ -116,6 +116,7 @@ defmodule Spore.Server do :gen_tcp.close(socket) exit(:normal) end + Process.put({:spore_auth_id}, auth_id) d2 @@ -127,64 +128,68 @@ defmodule Spore.Server do end end - Spore.Tracing.with_span("spore.control_connection", %{"peer.ip" => :inet.ntoa(ip) |> to_string()}, fn -> - case Delimited.recv_timeout(d) do - {%{"HelloEx" => %{"port" => req_port}}, d2} -> - case create_listener(req_port, port_range, bind_tunnels) do - {:ok, listener} -> - {:ok, {_ip, actual}} = :inet.sockname(listener) - Logger.info("new client on port #{actual}") - - {:ok, d3} = - Delimited.send(d2, %{ - "HelloEx" => %{"port" => actual, "version" => "spore/1", "features" => []} - }) - - hello_loop(d3, listener) - - {:error, message} -> - _ = Delimited.send(d2, %{"Error" => message}) - end - - {%{"Hello" => req_port}, d2} -> - case create_listener(req_port, port_range, bind_tunnels) do - {:ok, listener} -> - {:ok, {_ip, actual}} = :inet.sockname(listener) - Logger.info("new client on port #{actual}") - {:ok, d3} = Delimited.send(d2, %{"Hello" => actual}) - hello_loop(d3, listener) + Spore.Tracing.with_span( + "spore.control_connection", + %{"peer.ip" => :inet.ntoa(ip) |> to_string()}, + fn -> + case Delimited.recv_timeout(d) do + {%{"HelloEx" => %{"port" => req_port}}, d2} -> + case create_listener(req_port, port_range, bind_tunnels) do + {:ok, listener} -> + {:ok, {_ip, actual}} = :inet.sockname(listener) + Logger.info("new client on port #{actual}") + + {:ok, d3} = + Delimited.send(d2, %{ + "HelloEx" => %{"port" => actual, "version" => "spore/1", "features" => []} + }) + + hello_loop(d3, listener) + + {:error, message} -> + _ = Delimited.send(d2, %{"Error" => message}) + end + + {%{"Hello" => req_port}, d2} -> + case create_listener(req_port, port_range, bind_tunnels) do + {:ok, listener} -> + {:ok, {_ip, actual}} = :inet.sockname(listener) + Logger.info("new client on port #{actual}") + {:ok, d3} = Delimited.send(d2, %{"Hello" => actual}) + hello_loop(d3, listener) + + {:error, message} -> + _ = Delimited.send(d2, %{"Error" => message}) + end + + {%{"Accept" => id}, d2} -> + case Spore.Pending.take(id) do + {:ok, stream2} -> + Spore.Metrics.note_accept(id) + # Forward traffic bidirectionally between control socket and stored tunnel conn + # buffer intentionally unused + _ = d2 + Shared.pipe_bidirectional(socket, Shared.transport_mod(), stream2, :gen_tcp) + + :error -> + Logger.warning("missing connection #{id}") + end + + {%{"Authenticate" => _}, _d2} -> + Logger.warning("unexpected authenticate") + :ok - {:error, message} -> - _ = Delimited.send(d2, %{"Error" => message}) - end + {:eof, _} -> + :ok - {%{"Accept" => id}, d2} -> - case Spore.Pending.take(id) do - {:ok, stream2} -> - Spore.Metrics.note_accept(id) - # Forward traffic bidirectionally between control socket and stored tunnel conn - # buffer intentionally unused - _ = d2 - Shared.pipe_bidirectional(socket, Shared.transport_mod(), stream2, :gen_tcp) + {{:error, _}, _} -> + :ok - :error -> - Logger.warning("missing connection #{id}") + {_, _d2} -> + :ok end - - {%{"Authenticate" => _}, _d2} -> - Logger.warning("unexpected authenticate") - :ok - - {:eof, _} -> - :ok - - {{:error, _}, _} -> - :ok - - {_, _d2} -> - :ok - end - end) + end + ) rescue e -> if auth_id = Process.get({:spore_auth_id}), do: Spore.SecretQuota.dec(auth_id) diff --git a/lib/spore/shared.ex b/lib/spore/shared.ex index 003f935..4f67919 100644 --- a/lib/spore/shared.ex +++ b/lib/spore/shared.ex @@ -193,10 +193,12 @@ defmodule Spore.Shared do right = Task.async(fn -> pipe_active_once(b, :gen_tcp, a, :gen_tcp) end) ref_left = Process.monitor(left.pid) ref_right = Process.monitor(right.pid) + receive do {:DOWN, ^ref_left, :process, _pid, _} -> :ok {:DOWN, ^ref_right, :process, _pid, _} -> :ok end + Task.shutdown(left, :brutal_kill) Task.shutdown(right, :brutal_kill) :gen_tcp.close(a) @@ -207,10 +209,12 @@ defmodule Spore.Shared do right = Task.async(fn -> pipe(b, :gen_tcp, a, :gen_tcp) end) ref_left = Process.monitor(left.pid) ref_right = Process.monitor(right.pid) + receive do {:DOWN, ^ref_left, :process, _pid, _} -> :ok {:DOWN, ^ref_right, :process, _pid, _} -> :ok end + Task.shutdown(left, :brutal_kill) Task.shutdown(right, :brutal_kill) :gen_tcp.close(a) @@ -227,10 +231,12 @@ defmodule Spore.Shared do right = Task.async(fn -> pipe_active_once(b, bmod, a, amod) end) ref_left = Process.monitor(left.pid) ref_right = Process.monitor(right.pid) + receive do {:DOWN, ^ref_left, :process, _pid, _} -> :ok {:DOWN, ^ref_right, :process, _pid, _} -> :ok end + Task.shutdown(left, :brutal_kill) Task.shutdown(right, :brutal_kill) close(a, amod) @@ -241,10 +247,12 @@ defmodule Spore.Shared do right = Task.async(fn -> pipe(b, bmod, a, amod) end) ref_left = Process.monitor(left.pid) ref_right = Process.monitor(right.pid) + receive do {:DOWN, ^ref_left, :process, _pid, _} -> :ok {:DOWN, ^ref_right, :process, _pid, _} -> :ok end + Task.shutdown(left, :brutal_kill) Task.shutdown(right, :brutal_kill) close(a, amod) @@ -255,6 +263,7 @@ defmodule Spore.Shared do defp pipe(src, src_mod, dst, dst_mod) do idle = Application.get_env(:spore, :idle_timeout_ms, 300_000) + case src_mod.recv(src, 0, idle) do {:ok, data} -> _ = dst_mod.send(dst, data) @@ -275,13 +284,18 @@ defmodule Spore.Shared do defp pipe_active_once(src, src_mod, dst, dst_mod) do _ = apply(src_mod, :setopts, [src, [active: :once]]) + receive do {tag, ^src, data} when tag in [:tcp, :ssl] -> _ = dst_mod.send(dst, data) Spore.Metrics.track_bytes(byte_size(data)) pipe_active_once(src, src_mod, dst, dst_mod) - {closed, ^src} when closed in [:tcp_closed, :ssl_closed] -> :ok - {err, ^src, _reason} when err in [:tcp_error, :ssl_error] -> :ok + + {closed, ^src} when closed in [:tcp_closed, :ssl_closed] -> + :ok + + {err, ^src, _reason} when err in [:tcp_error, :ssl_error] -> + :ok after Application.get_env(:spore, :idle_timeout_ms, 300_000) -> :ok end diff --git a/lib/spore/tracing.ex b/lib/spore/tracing.ex index ca6ccd9..04da943 100644 --- a/lib/spore/tracing.ex +++ b/lib/spore/tracing.ex @@ -17,10 +17,10 @@ defmodule Spore.Tracing do def with_span(name, attrs \\ %{}, fun) when is_function(fun, 0) do if loaded?() do - OpenTelemetry.Tracer.with_span name, fn -> + OpenTelemetry.Tracer.with_span(name, fn -> set_attrs(attrs) fun.() - end + end) else fun.() end @@ -43,7 +43,8 @@ defmodule Spore.Tracing do end defp loaded? do - Code.ensure_loaded?(OpenTelemetry.Tracer) and function_exported?(OpenTelemetry.Tracer, :with_span, 2) + Code.ensure_loaded?(OpenTelemetry.Tracer) and + function_exported?(OpenTelemetry.Tracer, :with_span, 2) end defp loaded_exporter? do From 33f1a4fd0d06b873479c1b1722fb66a4044962f8 Mon Sep 17 00:00:00 2001 From: Rastrian Date: Mon, 22 Sep 2025 11:50:44 -0300 Subject: [PATCH 4/5] fix(ci): fix tests and added release matrix workflow --- .github/workflows/release-matrix.yml | 80 ++++++++++++++++++++++++++++ lib/spore/tracing.ex | 6 +-- 2 files changed, 83 insertions(+), 3 deletions(-) create mode 100644 .github/workflows/release-matrix.yml diff --git a/.github/workflows/release-matrix.yml b/.github/workflows/release-matrix.yml new file mode 100644 index 0000000..e470650 --- /dev/null +++ b/.github/workflows/release-matrix.yml @@ -0,0 +1,80 @@ +name: Spore Release Matrix + +on: + workflow_dispatch: + inputs: + name: + description: 'Release name (optional)' + required: false + type: string + +jobs: + build-release: + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + runs-on: ${{ matrix.os }} + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Determine working directory + id: workdir + run: | + if [ -d bore-erlang ]; then + echo "WORKDIR=bore-erlang" >> $GITHUB_ENV + else + echo "WORKDIR=." >> $GITHUB_ENV + fi + echo "Using WORKDIR=$WORKDIR" + shell: bash + + - name: Setup Elixir/OTP + uses: erlef/setup-beam@v1 + with: + otp-version: '26' + elixir-version: '1.16' + + - name: Install deps + working-directory: ${{ env.WORKDIR }} + run: | + mix local.hex --force + mix deps.get + + - name: Compile (prod) + working-directory: ${{ env.WORKDIR }} + run: MIX_ENV=prod mix compile + + - name: Build release + working-directory: ${{ env.WORKDIR }} + run: MIX_ENV=prod mix release + + - name: Archive release (Unix) + if: runner.os != 'Windows' + working-directory: ${{ env.WORKDIR }} + run: | + cd _build/prod/rel + tar czf spore-${{ runner.os }}.tar.gz spore + + - name: Archive release (Windows) + if: runner.os == 'Windows' + working-directory: ${{ env.WORKDIR }} + run: | + cd _build/prod/rel + powershell -Command "Compress-Archive -Path spore -DestinationPath spore-Windows.zip" + + - name: Upload artifact (Unix) + if: runner.os != 'Windows' + uses: actions/upload-artifact@v4 + with: + name: spore-${{ runner.os }}-release + path: ${{ env.WORKDIR }}/_build/prod/rel/spore-${{ runner.os }}.tar.gz + if-no-files-found: error + + - name: Upload artifact (Windows) + if: runner.os == 'Windows' + uses: actions/upload-artifact@v4 + with: + name: spore-${{ runner.os }}-release + path: ${{ env.WORKDIR }}/_build/prod/rel/spore-Windows.zip + if-no-files-found: error diff --git a/lib/spore/tracing.ex b/lib/spore/tracing.ex index 04da943..60b1da4 100644 --- a/lib/spore/tracing.ex +++ b/lib/spore/tracing.ex @@ -1,5 +1,6 @@ defmodule Spore.Tracing do @moduledoc false + require OpenTelemetry.Tracer def start do if Application.get_env(:spore, :otel_enable, false) and loaded_exporter?() do @@ -17,10 +18,9 @@ defmodule Spore.Tracing do def with_span(name, attrs \\ %{}, fun) when is_function(fun, 0) do if loaded?() do - OpenTelemetry.Tracer.with_span(name, fn -> - set_attrs(attrs) + OpenTelemetry.Tracer.with_span name, attrs do fun.() - end) + end else fun.() end From 565f733885b0209ef2680ecc2f3275b2eb08ddb2 Mon Sep 17 00:00:00 2001 From: Rastrian Date: Mon, 22 Sep 2025 11:54:30 -0300 Subject: [PATCH 5/5] feat(ci): added more workflows for testing --- .github/workflows/e2e-interop.yml | 80 +++++++++++++++++++++++++++++++ .github/workflows/e2e.yml | 73 ++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100644 .github/workflows/e2e-interop.yml create mode 100644 .github/workflows/e2e.yml diff --git a/.github/workflows/e2e-interop.yml b/.github/workflows/e2e-interop.yml new file mode 100644 index 0000000..705b77e --- /dev/null +++ b/.github/workflows/e2e-interop.yml @@ -0,0 +1,80 @@ +name: Spore E2E Interop + +on: + workflow_dispatch: {} + +jobs: + interop-linux: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Determine working directory + id: workdir + run: | + if [ -d bore-erlang ]; then + echo "WORKDIR=bore-erlang" >> $GITHUB_ENV + else + echo "WORKDIR=." >> $GITHUB_ENV + fi + echo "Using WORKDIR=$WORKDIR" + + - name: Setup Elixir/OTP + uses: erlef/setup-beam@v1 + with: + otp-version: '26' + elixir-version: '1.16' + + - name: Build Spore escript + working-directory: ${{ env.WORKDIR }} + run: | + mix deps.get + mix escript.build + + - name: Setup Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Install bore-cli + run: | + cargo install bore-cli + echo "$HOME/.cargo/bin" >> $GITHUB_PATH + + - name: Start local HTTP server (backend) + working-directory: ${{ env.WORKDIR }} + run: | + nohup python3 -m http.server 25565 >/tmp/http.log 2>&1 & + sleep 1 + + - name: Start Spore server + working-directory: ${{ env.WORKDIR }} + run: | + nohup ./spore server --min-port 20000 --max-port 21000 --bind-addr 127.0.0.1 >/tmp/server.log 2>&1 & + sleep 1 + + - name: Start bore (Rust) client + run: | + nohup bore local 25565 --to 127.0.0.1 --port 0 >/tmp/bore-client.log 2>&1 & + sleep 2 + echo "Bore client log:" && tail -n +1 /tmp/bore-client.log + + - name: Discover assigned remote port + id: discover + run: | + PORT=$(grep -oE 'listening at 127.0.0.1:[0-9]+' /tmp/bore-client.log | tail -n1 | sed 's/.*://') + if [ -z "$PORT" ]; then echo "Could not find assigned port"; exit 1; fi + echo "REMOTE_PORT=$PORT" >> $GITHUB_ENV + + - name: Curl through the tunnel + run: | + echo "Curling 127.0.0.1:${REMOTE_PORT}/" + curl -sSf -v 127.0.0.1:${REMOTE_PORT}/ >/tmp/curl.out + test -s /tmp/curl.out + + - name: Show logs on failure + if: failure() + run: | + echo "=== server.log ==="; cat /tmp/server.log || true + echo "=== bore-client.log ==="; cat /tmp/bore-client.log || true + echo "=== http.log ==="; cat /tmp/http.log || true + echo "=== curl.out ==="; cat /tmp/curl.out || true diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml new file mode 100644 index 0000000..d7d2b82 --- /dev/null +++ b/.github/workflows/e2e.yml @@ -0,0 +1,73 @@ +name: Spore E2E + +on: + workflow_dispatch: {} + +jobs: + e2e-linux: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Determine working directory + id: workdir + run: | + if [ -d bore-erlang ]; then + echo "WORKDIR=bore-erlang" >> $GITHUB_ENV + else + echo "WORKDIR=." >> $GITHUB_ENV + fi + echo "Using WORKDIR=$WORKDIR" + + - name: Setup Elixir/OTP + uses: erlef/setup-beam@v1 + with: + otp-version: '26' + elixir-version: '1.16' + + - name: Install deps and build escript + working-directory: ${{ env.WORKDIR }} + run: | + mix deps.get + mix escript.build + + - name: Start local HTTP server (backend) + working-directory: ${{ env.WORKDIR }} + run: | + nohup python3 -m http.server 25565 >/tmp/http.log 2>&1 & + sleep 1 + + - name: Start Spore server + working-directory: ${{ env.WORKDIR }} + run: | + nohup ./spore server --min-port 20000 --max-port 21000 --bind-addr 127.0.0.1 --metrics-port 9568 >/tmp/server.log 2>&1 & + sleep 1 + + - name: Start Spore client + working-directory: ${{ env.WORKDIR }} + run: | + nohup ./spore local --local-host 127.0.0.1 --local-port 25565 --to 127.0.0.1 --port 0 >/tmp/client.log 2>&1 & + sleep 2 + echo "Client log:" && tail -n +1 /tmp/client.log + + - name: Discover assigned remote port + id: discover + run: | + PORT=$(grep -oE 'listening at 127.0.0.1:[0-9]+' /tmp/client.log | tail -n1 | sed 's/.*://') + if [ -z "$PORT" ]; then echo "Could not find assigned port"; exit 1; fi + echo "REMOTE_PORT=$PORT" >> $GITHUB_ENV + + - name: Curl through the tunnel + run: | + echo "Curling 127.0.0.1:${REMOTE_PORT}/" + curl -sSf -v 127.0.0.1:${REMOTE_PORT}/ >/tmp/curl.out + test -s /tmp/curl.out + + - name: Show logs on failure + if: failure() + run: | + echo "=== server.log ==="; cat /tmp/server.log || true + echo "=== client.log ==="; cat /tmp/client.log || true + echo "=== http.log ==="; cat /tmp/http.log || true + echo "=== curl.out ==="; cat /tmp/curl.out || true