From c5b4cafa5648bcdb0bb3ea2aeeb71570bda68f5d Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:10:02 -0400 Subject: [PATCH 01/15] gate: add sourceos_gate package marker --- src/sourceos_gate/__init__.py | 1 + 1 file changed, 1 insertion(+) create mode 100644 src/sourceos_gate/__init__.py diff --git a/src/sourceos_gate/__init__.py b/src/sourceos_gate/__init__.py new file mode 100644 index 0000000..41de35d --- /dev/null +++ b/src/sourceos_gate/__init__.py @@ -0,0 +1 @@ +"""SourceOS Gate package.""" From e584ca888098280905e99ed945d46a155d79dd6d Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:11:41 -0400 Subject: [PATCH 02/15] gate: add error types --- src/sourceos_gate/errors.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 src/sourceos_gate/errors.py diff --git a/src/sourceos_gate/errors.py b/src/sourceos_gate/errors.py new file mode 100644 index 0000000..c0c0633 --- /dev/null +++ b/src/sourceos_gate/errors.py @@ -0,0 +1,31 @@ +"""Error types for SourceOS gate components.""" + +from __future__ import annotations + + +class GateError(Exception): + """Base error for gate operations.""" + + +class ReplayError(GateError): + """Raised when a token+nonce pair has already been seen.""" + + +class ExpiredGrantError(GateError): + """Raised when a grant has expired.""" + + +class BaselineMissingError(GateError): + """Raised when nft baseline objects are missing.""" + + +class PermissionError(GateError): + """Raised when operation requires elevated privileges.""" + + +class NftError(GateError): + """Raised when nft operations fail.""" + + +class ValidationError(GateError): + """Raised when request payload is invalid.""" From 0ee9e158ddf69780a7482167c58a9636dcf00740 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:12:11 -0400 Subject: [PATCH 03/15] gate: add time utilities --- src/sourceos_gate/timeutil.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 src/sourceos_gate/timeutil.py diff --git a/src/sourceos_gate/timeutil.py b/src/sourceos_gate/timeutil.py new file mode 100644 index 0000000..1a1ff0e --- /dev/null +++ b/src/sourceos_gate/timeutil.py @@ -0,0 +1,13 @@ +"""Time helpers.""" + +from __future__ import annotations + +import time + + +def now_epoch() -> int: + return int(time.time()) + + +def now_iso_utc() -> str: + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) From 5d09846e8e0df27c791a8e43201671bdd3716ec7 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:12:45 -0400 Subject: [PATCH 04/15] gate: add audit log writer --- src/sourceos_gate/audit.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 src/sourceos_gate/audit.py diff --git a/src/sourceos_gate/audit.py b/src/sourceos_gate/audit.py new file mode 100644 index 0000000..e2f8dcb --- /dev/null +++ b/src/sourceos_gate/audit.py @@ -0,0 +1,28 @@ +"""Append-only NDJSON audit log writer.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path + +from .timeutil import now_iso_utc + + +@dataclass(frozen=True) +class AuditLog: + root: Path + + def path_for(self, stream: str) -> Path: + # Store by day for easy rotation. + day = now_iso_utc()[:10] + return self.root / "audit" / "events" / day / f"{stream}.ndjson" + + def append(self, stream: str, record: dict) -> None: + p = self.path_for(stream) + p.parent.mkdir(parents=True, exist_ok=True) + rec = dict(record) + rec.setdefault("ts", now_iso_utc()) + line = json.dumps(rec, sort_keys=True) + with p.open("a", encoding="utf-8") as f: + f.write(line + "\n") From e99eef11093c6eb750a493c4f08880907e725475 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:13:42 -0400 Subject: [PATCH 05/15] gate: add sqlite state store (replay + grants) --- src/sourceos_gate/store.py | 150 +++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 src/sourceos_gate/store.py diff --git a/src/sourceos_gate/store.py b/src/sourceos_gate/store.py new file mode 100644 index 0000000..7bf62e6 --- /dev/null +++ b/src/sourceos_gate/store.py @@ -0,0 +1,150 @@ +"""SQLite-backed store for egress gate state. + +We use one sqlite database under the store root for: +- replay protection (token_id + nonce) +- active grants (targets, ports, proto, expiry) + +This is intentionally local-first and requires no external services. +""" + +from __future__ import annotations + +import json +import sqlite3 +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable + +from .errors import ExpiredGrantError, ReplayError +from .timeutil import now_epoch + + +@dataclass(frozen=True) +class Grant: + token_id: str + nonce: str + exp: int + targets: list[str] + ports: list[int] + proto: str + installed_at: int + + +@dataclass(frozen=True) +class GateStore: + root: Path + + @property + def db_path(self) -> Path: + return self.root / "gate" / "egress" / "state.sqlite" + + def connect(self) -> sqlite3.Connection: + self.db_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(self.db_path.as_posix()) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA foreign_keys=ON") + return conn + + def init(self) -> None: + with self.connect() as conn: + conn.execute( + "CREATE TABLE IF NOT EXISTS replay (" + " token_id TEXT NOT NULL," + " nonce TEXT NOT NULL," + " exp INTEGER NOT NULL," + " seen_at INTEGER NOT NULL," + " PRIMARY KEY(token_id, nonce)" + ")" + ) + conn.execute( + "CREATE TABLE IF NOT EXISTS grants (" + " token_id TEXT NOT NULL," + " nonce TEXT NOT NULL," + " exp INTEGER NOT NULL," + " proto TEXT NOT NULL," + " targets_json TEXT NOT NULL," + " ports_json TEXT NOT NULL," + " installed_at INTEGER NOT NULL," + " PRIMARY KEY(token_id, nonce)" + ")" + ) + conn.execute("CREATE INDEX IF NOT EXISTS grants_exp_idx ON grants(exp)") + + def _record_replay(self, conn: sqlite3.Connection, token_id: str, nonce: str, exp: int) -> None: + try: + conn.execute( + "INSERT INTO replay (token_id, nonce, exp, seen_at) VALUES (?, ?, ?, ?)", + (token_id, nonce, int(exp), now_epoch()), + ) + except sqlite3.IntegrityError as e: + raise ReplayError("replay detected") from e + + def install_grant(self, token_id: str, nonce: str, exp: int, targets: Iterable[str], ports: Iterable[int], proto: str) -> Grant: + if exp <= now_epoch(): + raise ExpiredGrantError("grant expired") + + tlist = [str(t) for t in targets] + plist = [int(p) for p in ports] + p = (proto or "tcp").lower() + if p not in ("tcp", "udp"): + p = "tcp" + + self.init() + with self.connect() as conn: + self._record_replay(conn, token_id, nonce, exp) + conn.execute( + "INSERT INTO grants (token_id, nonce, exp, proto, targets_json, ports_json, installed_at)" + " VALUES (?, ?, ?, ?, ?, ?, ?)", + (token_id, nonce, int(exp), p, json.dumps(tlist), json.dumps(plist), now_epoch()), + ) + + return Grant(token_id=token_id, nonce=nonce, exp=int(exp), targets=tlist, ports=plist, proto=p, installed_at=now_epoch()) + + def prune_expired(self) -> int: + self.init() + cutoff = now_epoch() + with self.connect() as conn: + cur = conn.execute("SELECT COUNT(1) FROM grants WHERE exp <= ?", (cutoff,)) + to_delete = int(cur.fetchone()[0]) + conn.execute("DELETE FROM grants WHERE exp <= ?", (cutoff,)) + return to_delete + + def list_active(self) -> list[Grant]: + self.init() + cutoff = now_epoch() + out: list[Grant] = [] + with self.connect() as conn: + cur = conn.execute( + "SELECT token_id, nonce, exp, proto, targets_json, ports_json, installed_at FROM grants WHERE exp > ? ORDER BY installed_at ASC", + (cutoff,), + ) + for row in cur.fetchall(): + token_id, nonce, exp, proto, targets_json, ports_json, installed_at = row + out.append( + Grant( + token_id=str(token_id), + nonce=str(nonce), + exp=int(exp), + proto=str(proto), + targets=json.loads(targets_json), + ports=[int(p) for p in json.loads(ports_json)], + installed_at=int(installed_at), + ) + ) + return out + + def compute_active_sets(self) -> tuple[set[str], set[str], set[str]]: + addrs: set[str] = set() + tcp_ports: set[str] = set() + udp_ports: set[str] = set() + for g in self.list_active(): + for t in g.targets: + addrs.add(t.replace("/32", "")) + if g.proto == "udp": + for p in g.ports: + udp_ports.add(str(int(p))) + else: + for p in g.ports: + tcp_ports.add(str(int(p))) + return addrs, tcp_ports, udp_ports From dbed01d73200c841ff7ec56e9c11f16a9cfe0b45 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:14:42 -0400 Subject: [PATCH 06/15] gate: add nft integration module (apply + verify) --- src/sourceos_gate/nft.py | 166 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 src/sourceos_gate/nft.py diff --git a/src/sourceos_gate/nft.py b/src/sourceos_gate/nft.py new file mode 100644 index 0000000..3a077a0 --- /dev/null +++ b/src/sourceos_gate/nft.py @@ -0,0 +1,166 @@ +"""nftables integration for the egress gate. + +We mutate *only* allowlist sets defined by the baseline ruleset. +We never flush rulesets. + +Baseline expectation: +- table inet sourceos +- sets: frontier_allow_v4, frontier_allow_tcp_ports, frontier_allow_udp_ports +- chain output policy drop with rules referencing those sets + +This module provides: +- baseline presence checks +- apply allowlist set contents via nft -f - scripts +- read set contents via nft -j with fallback +""" + +from __future__ import annotations + +import json +import os +import re +import subprocess +from dataclasses import dataclass +from typing import Iterable + +from .errors import BaselineMissingError, NftError, PermissionError + + +@dataclass(frozen=True) +class NftConfig: + table_family: str = "inet" + table_name: str = "sourceos" + set_allow_v4: str = "frontier_allow_v4" + set_tcp_ports: str = "frontier_allow_tcp_ports" + set_udp_ports: str = "frontier_allow_udp_ports" + chain_output: str = "output" + + +def _run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess: + return subprocess.run(cmd, check=check, text=True, capture_output=True) + + +def _run_script(script: str) -> None: + try: + subprocess.run(["nft", "-f", "-"], input=script, text=True, capture_output=True, check=True) + except subprocess.CalledProcessError as e: + raise NftError(e.stderr or e.stdout or "nft failed") from e + + +def _exists(kind: str, *parts: str) -> bool: + res = _run(["nft", "list", kind] + list(parts), check=False) + return res.returncode == 0 + + +def require_baseline(cfg: NftConfig = NftConfig()) -> None: + if not _exists("table", cfg.table_family, cfg.table_name): + raise BaselineMissingError("missing nft baseline table") + for s in (cfg.set_allow_v4, cfg.set_tcp_ports, cfg.set_udp_ports): + if not _exists("set", cfg.table_family, cfg.table_name, s): + raise BaselineMissingError(f"missing nft baseline set: {s}") + if not _exists("chain", cfg.table_family, cfg.table_name, cfg.chain_output): + raise BaselineMissingError("missing nft baseline output chain") + + +def apply_sets(addrs: Iterable[str], tcp_ports: Iterable[str], udp_ports: Iterable[str], cfg: NftConfig = NftConfig()) -> None: + if os.geteuid() != 0: + raise PermissionError("root required") + require_baseline(cfg) + + a = sorted({str(x).replace("/32", "") for x in addrs if str(x).strip()}) + t = sorted({str(int(p)) for p in tcp_ports if str(p).strip()}) + u = sorted({str(int(p)) for p in udp_ports if str(p).strip()}) + + lines = [ + f"flush set {cfg.table_family} {cfg.table_name} {cfg.set_allow_v4}", + f"flush set {cfg.table_family} {cfg.table_name} {cfg.set_tcp_ports}", + f"flush set {cfg.table_family} {cfg.table_name} {cfg.set_udp_ports}", + ] + if a: + lines.append(f"add element {cfg.table_family} {cfg.table_name} {cfg.set_allow_v4} {{ " + ", ".join(a) + " }}") + if t: + lines.append(f"add element {cfg.table_family} {cfg.table_name} {cfg.set_tcp_ports} {{ " + ", ".join(t) + " }}") + if u: + lines.append(f"add element {cfg.table_family} {cfg.table_name} {cfg.set_udp_ports} {{ " + ", ".join(u) + " }}") + + _run_script("\n".join(lines) + "\n") + + +def parse_nft_set_elements_json(obj: dict) -> set[str] | None: + nftables = obj.get("nftables") + if not isinstance(nftables, list): + return None + + elems: list[str] = [] + for entry in nftables: + if not isinstance(entry, dict): + continue + + if "set" in entry and isinstance(entry["set"], dict): + s = entry["set"] + raw = s.get("elem") + if isinstance(raw, list): + for it in raw: + if isinstance(it, dict) and "elem" in it: + v = it.get("elem") + if isinstance(v, dict) and "val" in v: + v = v.get("val") + if v is not None: + elems.append(str(v)) + elif it is not None: + elems.append(str(it)) + + if "elem" in entry and isinstance(entry["elem"], dict): + e = entry["elem"] + v = e.get("elem") + if isinstance(v, dict) and "val" in v: + v = v.get("val") + if v is not None: + elems.append(str(v)) + + return {e.strip() for e in elems if str(e).strip()} + + +def _list_set_json(setname: str, cfg: NftConfig) -> set[str] | None: + res = _run(["nft", "-j", "list", "set", cfg.table_family, cfg.table_name, setname], check=False) + if res.returncode != 0: + return None + try: + obj = json.loads(res.stdout) + except Exception: + return None + return parse_nft_set_elements_json(obj) + + +def _list_set_text(setname: str, cfg: NftConfig) -> set[str]: + res = _run(["nft", "list", "set", cfg.table_family, cfg.table_name, setname], check=False) + if res.returncode != 0: + raise NftError(f"cannot list set {setname}") + m = re.search(r"elements\s*=\s*\{(.*?)\}\s*", res.stdout, flags=re.S) + if not m: + return set() + inside = m.group(1).strip() + if not inside: + return set() + return {p.strip() for p in inside.split(",") if p.strip()} + + +def list_set(setname: str, cfg: NftConfig = NftConfig()) -> set[str]: + j = _list_set_json(setname, cfg) + if j is not None: + return j + return _list_set_text(setname, cfg) + + +def verify_sets(expected_addrs: set[str], expected_tcp: set[str], expected_udp: set[str], cfg: NftConfig = NftConfig()) -> None: + require_baseline(cfg) + act_addrs = {a.replace("/32", "") for a in list_set(cfg.set_allow_v4, cfg)} + act_tcp = {str(int(p)) for p in list_set(cfg.set_tcp_ports, cfg) if str(p).isdigit()} + act_udp = {str(int(p)) for p in list_set(cfg.set_udp_ports, cfg) if str(p).isdigit()} + + if act_addrs != expected_addrs: + raise NftError(f"allow_v4 mismatch: expected={sorted(expected_addrs)} actual={sorted(act_addrs)}") + if act_tcp != expected_tcp: + raise NftError(f"tcp ports mismatch: expected={sorted(expected_tcp)} actual={sorted(act_tcp)}") + if act_udp != expected_udp: + raise NftError(f"udp ports mismatch: expected={sorted(expected_udp)} actual={sorted(act_udp)}") From a0b56ceb8ecfb2131a6fb9933ecd74d0c19d851d Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:16:09 -0400 Subject: [PATCH 07/15] gate: add EgressGate high-level API (store + nft + audit) --- src/sourceos_gate/egress.py | 114 ++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 src/sourceos_gate/egress.py diff --git a/src/sourceos_gate/egress.py b/src/sourceos_gate/egress.py new file mode 100644 index 0000000..20ac0d9 --- /dev/null +++ b/src/sourceos_gate/egress.py @@ -0,0 +1,114 @@ +"""High-level egress gate operations. + +This module ties together: +- GateStore (sqlite state) +- nft integration (apply/verify) +- audit log + +It provides a composable API used by both CLI and daemon. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + +from .audit import AuditLog +from .errors import GateError +from .nft import NftConfig, apply_sets, require_baseline, verify_sets +from .store import GateStore +from .timeutil import now_iso_utc + + +@dataclass(frozen=True) +class EgressGate: + store: GateStore + audit: AuditLog + nft_cfg: NftConfig = NftConfig() + + @classmethod + def for_root(cls, root: Path) -> "EgressGate": + return cls(store=GateStore(root), audit=AuditLog(root)) + + def init(self) -> None: + self.store.init() + + def install_grant( + self, + token_id: str, + nonce: str, + exp: int, + targets: list[str], + ports: list[int], + proto: str, + apply: bool = False, + ) -> None: + g = self.store.install_grant(token_id, nonce, exp, targets, ports, proto) + self.audit.append( + "gate.egress", + { + "action": "grant.install", + "token_id": g.token_id, + "nonce": g.nonce, + "exp": g.exp, + "proto": g.proto, + "targets": g.targets, + "ports": g.ports, + }, + ) + if apply: + self.apply() + + def prune(self, apply: bool = False) -> int: + removed = self.store.prune_expired() + self.audit.append( + "gate.egress", + { + "action": "grant.prune", + "removed": removed, + }, + ) + if apply: + self.apply() + return removed + + def apply(self) -> None: + require_baseline(self.nft_cfg) + addrs, tcp_ports, udp_ports = self.store.compute_active_sets() + apply_sets(addrs, tcp_ports, udp_ports, self.nft_cfg) + self.audit.append( + "gate.egress", + { + "action": "apply", + "allow_v4": sorted(addrs), + "tcp_ports": sorted(tcp_ports), + "udp_ports": sorted(udp_ports), + }, + ) + + def verify(self) -> None: + require_baseline(self.nft_cfg) + addrs, tcp_ports, udp_ports = self.store.compute_active_sets() + verify_sets(addrs, tcp_ports, udp_ports, self.nft_cfg) + self.audit.append( + "gate.egress", + { + "action": "verify", + "status": "ok", + "allow_v4": sorted(addrs), + "tcp_ports": sorted(tcp_ports), + "udp_ports": sorted(udp_ports), + }, + ) + + def snapshot(self) -> dict: + # A small structured status object for UI/ops. + addrs, tcp_ports, udp_ports = self.store.compute_active_sets() + return { + "ts": now_iso_utc(), + "active": { + "allow_v4": sorted(addrs), + "tcp_ports": sorted(tcp_ports), + "udp_ports": sorted(udp_ports), + }, + } From 51ad159df3903547208f59a5c60c342c2d4127a7 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:16:54 -0400 Subject: [PATCH 08/15] gate: add daemon JSON protocol helpers --- src/sourceos_gate/protocol.py | 76 +++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 src/sourceos_gate/protocol.py diff --git a/src/sourceos_gate/protocol.py b/src/sourceos_gate/protocol.py new file mode 100644 index 0000000..32b2c84 --- /dev/null +++ b/src/sourceos_gate/protocol.py @@ -0,0 +1,76 @@ +"""Unix-socket JSON protocol for the egress gate daemon. + +Protocol: one JSON object per line (NDJSON framing). + +Request: +{ + "id": "", + "method": "health|snapshot|grant.install|prune|apply|verify", + "params": { ... } +} + +Response: +{ + "id": "", + "ok": true|false, + "result": {...} | null, + "error": {"code": "...", "message": "..."} | null +} + +Notes: +- Authentication is by socket filesystem permissions (deployment responsibility). +- We do not attempt network auth or remote access. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from .errors import ( + BaselineMissingError, + ExpiredGrantError, + GateError, + NftError, + ReplayError, + ValidationError, +) + + +@dataclass(frozen=True) +class RpcError: + code: str + message: str + + def to_dict(self) -> dict: + return {"code": self.code, "message": self.message} + + +def ok_response(req_id: str | None, result: Any) -> dict: + return {"id": req_id, "ok": True, "result": result, "error": None} + + +def err_response(req_id: str | None, err: RpcError) -> dict: + return {"id": req_id, "ok": False, "result": None, "error": err.to_dict()} + + +def map_error(e: Exception) -> RpcError: + if isinstance(e, ReplayError): + return RpcError("replay", str(e) or "replay detected") + if isinstance(e, ExpiredGrantError): + return RpcError("expired", str(e) or "grant expired") + if isinstance(e, BaselineMissingError): + return RpcError("baseline_missing", str(e) or "nft baseline missing") + if isinstance(e, NftError): + return RpcError("nft", str(e) or "nft error") + if isinstance(e, ValidationError): + return RpcError("invalid", str(e) or "invalid request") + if isinstance(e, GateError): + return RpcError("gate", str(e) or "gate error") + return RpcError("internal", f"{type(e).__name__}: {e}") + + +def require_fields(obj: dict, fields: list[str]) -> None: + for f in fields: + if f not in obj: + raise ValidationError(f"missing field: {f}") From c9e417e92c7fe1ae2636d08f3c1704aad3ee18b7 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:20:46 -0400 Subject: [PATCH 09/15] gate: add asyncio unix socket daemon server --- src/sourceos_gate/daemon.py | 103 ++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 src/sourceos_gate/daemon.py diff --git a/src/sourceos_gate/daemon.py b/src/sourceos_gate/daemon.py new file mode 100644 index 0000000..9c87db0 --- /dev/null +++ b/src/sourceos_gate/daemon.py @@ -0,0 +1,103 @@ +"""Egress gate daemon. + +Runs a local Unix socket server and serves one-JSON-per-line requests. + +This is intended for host-local orchestration (systemd socket activation or +static socket path), and is not exposed to the network. +""" + +from __future__ import annotations + +import asyncio +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from .egress import EgressGate +from .protocol import err_response, map_error, ok_response, require_fields +from .timeutil import now_iso_utc + + +@dataclass(frozen=True) +class DaemonConfig: + socket_path: Path + store_root: Path + + +async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, gate: EgressGate) -> None: + peer = writer.get_extra_info("peername") + try: + while not reader.at_eof(): + line = await reader.readline() + if not line: + break + raw = line.decode("utf-8", errors="replace").strip() + if not raw: + continue + + req_id: str | None = None + try: + req = json.loads(raw) + if not isinstance(req, dict): + raise ValueError("request is not an object") + req_id = req.get("id") + method = req.get("method") + params = req.get("params") or {} + if not isinstance(method, str): + raise ValueError("missing method") + if not isinstance(params, dict): + raise ValueError("params must be object") + + if method == "health": + res = ok_response(req_id, {"ts": now_iso_utc(), "status": "ok"}) + elif method == "snapshot": + res = ok_response(req_id, gate.snapshot()) + elif method == "grant.install": + require_fields(params, ["token_id", "nonce", "exp", "targets", "ports", "proto"]) + gate.install_grant( + token_id=str(params["token_id"]), + nonce=str(params["nonce"]), + exp=int(params["exp"]), + targets=[str(x) for x in params["targets"]], + ports=[int(x) for x in params["ports"]], + proto=str(params["proto"]), + apply=bool(params.get("apply", False)), + ) + res = ok_response(req_id, {"status": "installed"}) + elif method == "prune": + removed = gate.prune(apply=bool(params.get("apply", False))) + res = ok_response(req_id, {"removed": removed}) + elif method == "apply": + gate.apply() + res = ok_response(req_id, {"status": "applied"}) + elif method == "verify": + gate.verify() + res = ok_response(req_id, {"status": "ok"}) + else: + raise ValueError(f"unknown method: {method}") + except Exception as e: + res = err_response(req_id, map_error(e)) + + writer.write((json.dumps(res, sort_keys=True) + "\n").encode("utf-8")) + await writer.drain() + finally: + writer.close() + try: + await writer.wait_closed() + except Exception: + pass + + +async def serve(cfg: DaemonConfig) -> None: + cfg.socket_path.parent.mkdir(parents=True, exist_ok=True) + if cfg.socket_path.exists(): + cfg.socket_path.unlink() + + gate = EgressGate.for_root(cfg.store_root) + gate.init() + + server = await asyncio.start_unix_server(lambda r, w: handle_client(r, w, gate), path=str(cfg.socket_path)) + + async with server: + await server.serve_forever() From 27f5962fb217ae3a06a22f85a54847009ad6f6f4 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:21:33 -0400 Subject: [PATCH 10/15] gate(tools): add daemon runner entrypoint --- tools/sourceos_gate_egressd.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 tools/sourceos_gate_egressd.py diff --git a/tools/sourceos_gate_egressd.py b/tools/sourceos_gate_egressd.py new file mode 100644 index 0000000..6dbfb18 --- /dev/null +++ b/tools/sourceos_gate_egressd.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 +"""Run the SourceOS egress gate daemon. + +This is a host-local Unix socket service. + +Example: + python tools/sourceos_gate_egressd.py --store-root /var/lib/sourceos --socket /run/sourceos/gate-egress.sock +""" + +from __future__ import annotations + +import argparse +import asyncio +from pathlib import Path + +from sourceos_gate.daemon import DaemonConfig, serve + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--store-root", default="/var/lib/sourceos") + ap.add_argument("--socket", default="/run/sourceos/gate-egress.sock") + args = ap.parse_args() + + cfg = DaemonConfig(socket_path=Path(args.socket), store_root=Path(args.store_root)) + asyncio.run(serve(cfg)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) From 9ce0d5a5d64960c4b4a97134b0940496d307e7ca Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:25:05 -0400 Subject: [PATCH 11/15] gate(cli): migrate egress gate CLI to sourceos_gate package --- tools/sourceos_gate_egress.py | 481 ++++++---------------------------- 1 file changed, 73 insertions(+), 408 deletions(-) diff --git a/tools/sourceos_gate_egress.py b/tools/sourceos_gate_egress.py index 2f3ab9a..27c8934 100644 --- a/tools/sourceos_gate_egress.py +++ b/tools/sourceos_gate_egress.py @@ -1,402 +1,43 @@ #!/usr/bin/env python3 -"""SourceOS v0 egress gate. +"""SourceOS egress gate (CLI). -Implements the enforcement posture described in: -- docs/TRUTH_PLANE_IMPLEMENTATION.md +This is the operator-facing CLI for the Truth Plane egress gate. -v0 scope: -- Maintain a replay cache (sqlite) for grant nonces. -- Maintain an allowlist state file for granted targets/ports + expiry. -- Optional **explicit apply** mode that mutates nftables allowlist sets only. -- Verification mode to ensure kernel nft set state matches allowlist state. +Key properties: +- Local-first +- Deny-by-default for frontier egress +- Explicit, auditable nft allowlist set mutation only (baseline must exist) +- Replay protection (token_id + nonce) -Security posture: -- local-first -- deny-by-default -- no covert behavior -- apply mode is opt-in and records an audit event per mutation +For a long-lived host-local service, use: +- tools/sourceos_gate_egressd.py (Unix socket daemon) -Important: -- The baseline ruleset (table/chain/set definitions + output policy) is expected - to be applied by an operator or image build lane. -- This tool must NOT flush the nft ruleset. It only flushes/updates allow sets. - -Usage: - # create state + replay db - python tools/sourceos_gate_egress.py init --store-root /var/lib/sourceos - - # record a grant (dry-run; state only) - python tools/sourceos_gate_egress.py grant --store-root /var/lib/sourceos \ - --token-id tok_123 --nonce n_001 --exp 1760000000 \ - --target 1.2.3.4/32 --port 443 - - # record + apply to nft allowlist sets (requires nft + root; baseline must already exist) - sudo python tools/sourceos_gate_egress.py grant --apply --store-root /var/lib/sourceos \ - --token-id tok_123 --nonce n_001 --exp 1760000000 \ - --target 1.2.3.4/32 --port 443 - - # record + apply UDP allowlist (e.g., DNS) - sudo python tools/sourceos_gate_egress.py grant --apply --proto udp --store-root /var/lib/sourceos \ - --token-id tok_dns --nonce n_dns --exp 1760000000 \ - --target 1.1.1.1/32 --port 53 - - # prune expired grants from state (and optionally apply) - sudo python tools/sourceos_gate_egress.py prune --apply --store-root /var/lib/sourceos - - # verify nft allow sets match state (requires nft; often requires root) - sudo python tools/sourceos_gate_egress.py verify --store-root /var/lib/sourceos +Docs: +- docs/TRUTH_PLANE_RUNBOOK.md +- docs/DEV_VALIDATE.md """ from __future__ import annotations import argparse -import json -import os -import re -import sqlite3 -import subprocess -import time +import sys from pathlib import Path +# Ensure repo-local src/ is importable without packaging. +REPO_ROOT = Path(__file__).resolve().parents[1] +SRC_DIR = REPO_ROOT / "src" +if SRC_DIR.is_dir() and str(SRC_DIR) not in sys.path: + sys.path.insert(0, str(SRC_DIR)) -def _ensure_dir(p: Path) -> None: - p.mkdir(parents=True, exist_ok=True) - - -def _db_path(root: Path) -> Path: - return root / "gate" / "egress" / "replay-cache.sqlite" - - -def _state_path(root: Path) -> Path: - return root / "gate" / "egress" / "allowlist.state.json" - - -def _audit_path(root: Path) -> Path: - day = time.strftime("%Y-%m-%d", time.gmtime()) - return root / "audit" / "events" / day / "gate.egress.ndjson" - - -def _connect(db: Path) -> sqlite3.Connection: - _ensure_dir(db.parent) - conn = sqlite3.connect(db.as_posix()) - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA synchronous=NORMAL") - return conn - - -def init_store(root: Path) -> None: - db = _db_path(root) - conn = _connect(db) - conn.execute( - "CREATE TABLE IF NOT EXISTS replay (token_id TEXT NOT NULL, nonce TEXT NOT NULL, exp INTEGER NOT NULL, seen_at INTEGER NOT NULL, PRIMARY KEY(token_id, nonce))" - ) - conn.commit() - conn.close() - - st = _state_path(root) - if not st.exists(): - _ensure_dir(st.parent) - st.write_text(json.dumps({"version": 0, "allow": []}, indent=2) + "\n", encoding="utf-8") - - -def _now_epoch() -> int: - return int(time.time()) - - -def _load_state(root: Path) -> dict: - st_path = _state_path(root) - if not st_path.exists(): - init_store(root) - return json.loads(_state_path(root).read_text(encoding="utf-8")) - - -def _save_state(root: Path, state: dict) -> None: - _state_path(root).write_text(json.dumps(state, indent=2, sort_keys=True) + "\n", encoding="utf-8") - - -def _append_audit(root: Path, obj: dict) -> None: - p = _audit_path(root) - _ensure_dir(p.parent) - line = json.dumps(obj, sort_keys=True) - with p.open("a", encoding="utf-8") as f: - f.write(line + "\n") - - -def record_nonce(root: Path, token_id: str, nonce: str, exp: int) -> None: - conn = _connect(_db_path(root)) - try: - conn.execute( - "INSERT INTO replay (token_id, nonce, exp, seen_at) VALUES (?, ?, ?, ?)", - (token_id, nonce, int(exp), _now_epoch()), - ) - conn.commit() - except sqlite3.IntegrityError: - raise SystemExit("ERR: replay detected (token_id+nonce already seen)") - finally: - conn.close() - - -def prune_expired(root: Path) -> int: - state = _load_state(root) - allow = state.get("allow") or [] - now = _now_epoch() - kept = [a for a in allow if int(a.get("exp", 0)) > now] - removed = len(allow) - len(kept) - state["allow"] = kept - _save_state(root, state) - return removed - - -def _run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess: - return subprocess.run(cmd, check=check, text=True, capture_output=True) - - -def _run_nft_script(script: str) -> None: - # Use stdin script mode for maximum compatibility across nft versions. - subprocess.run(["nft", "-f", "-"], input=script, text=True, capture_output=True, check=True) - - -def _nft_object_exists(kind: str, *parts: str) -> bool: - res = _run(["nft", "list", kind] + list(parts), check=False) - return res.returncode == 0 - - -def require_nft_baseline() -> None: - if not _nft_object_exists("table", "inet", "sourceos"): - raise SystemExit("ERR: nft baseline not found (missing table inet sourceos). Apply: sudo nft -f nft/sourceos-egress.nft") - - for setname in ("frontier_allow_v4", "frontier_allow_tcp_ports", "frontier_allow_udp_ports"): - if not _nft_object_exists("set", "inet", "sourceos", setname): - raise SystemExit( - f"ERR: nft baseline not found (missing set inet sourceos {setname}). Apply: sudo nft -f nft/sourceos-egress.nft" - ) - - if not _nft_object_exists("chain", "inet", "sourceos", "output"): - raise SystemExit("ERR: nft baseline not found (missing chain inet sourceos output). Apply: sudo nft -f nft/sourceos-egress.nft") - - -def _compute_expected_sets(root: Path) -> tuple[set[str], set[str], set[str]]: - state = _load_state(root) - allow = state.get("allow") or [] - now = _now_epoch() - - addrs: set[str] = set() - tcp_ports: set[str] = set() - udp_ports: set[str] = set() - - for a in allow: - if int(a.get("exp", 0)) <= now: - continue - for t in a.get("targets", []) or []: - addrs.add(str(t)) - proto = str(a.get("proto", "tcp")).lower() - for p in a.get("ports", []) or []: - if proto == "udp": - udp_ports.add(str(int(p))) - else: - tcp_ports.add(str(int(p))) - - # Normalize /32 to bare IP for comparison (nft may display either form). - addrs = {a.replace("/32", "") for a in addrs} - return addrs, tcp_ports, udp_ports - - -def parse_nft_set_elements_json(obj: dict) -> set[str] | None: - """Extract a flat set of element values from `nft -j` JSON output. - - This is used by: - - `verify` path (when nft JSON mode is available) - - fixtures-based parser tests under `tools/test_nft_json_parse.py` - - The parser is intentionally best-effort across nft versions. - """ - - nftables = obj.get("nftables") - if not isinstance(nftables, list): - return None - - elems: list[str] = [] - - for entry in nftables: - if not isinstance(entry, dict): - continue - - # Common structure: entry {"set": {"elem": [ {"elem": }, ... ]}} - if "set" in entry and isinstance(entry["set"], dict): - s = entry["set"] - raw = s.get("elem") - if isinstance(raw, list): - for it in raw: - if isinstance(it, dict) and "elem" in it: - v = it.get("elem") - if isinstance(v, dict) and "val" in v: - v = v.get("val") - if v is not None: - elems.append(str(v)) - elif it is not None: - elems.append(str(it)) - - # Alternate structure: entry {"elem": {"set": "", "elem": }} - if "elem" in entry and isinstance(entry["elem"], dict): - e = entry["elem"] - v = e.get("elem") - if isinstance(v, dict) and "val" in v: - v = v.get("val") - if v is not None: - elems.append(str(v)) - - return {e.strip() for e in elems if str(e).strip()} - - -# Back-compat: retained for one release; use parse_nft_set_elements_json going forward. -_nft_set_elements_json_from_obj = parse_nft_set_elements_json # type: ignore - - -def _nft_set_elements_json(setname: str) -> set[str] | None: - res = _run(["nft", "-j", "list", "set", "inet", "sourceos", setname], check=False) - if res.returncode != 0: - return None - - try: - obj = json.loads(res.stdout) - except Exception: - return None - - return parse_nft_set_elements_json(obj) - - -def _nft_set_elements_text(setname: str) -> set[str]: - res = _run(["nft", "list", "set", "inet", "sourceos", setname], check=False) - if res.returncode != 0: - raise SystemExit(f"ERR: unable to list nft set inet sourceos {setname}. Try running as root.") - - m = re.search(r"elements\s*=\s*\{(.*?)\}\s*", res.stdout, flags=re.S) - if not m: - return set() - - inside = m.group(1).strip() - if not inside: - return set() - - parts = [p.strip() for p in inside.split(",")] - out = {p for p in parts if p} - return out - - -def _nft_set_elements(setname: str) -> set[str]: - j = _nft_set_elements_json(setname) - if j is not None: - return j - return _nft_set_elements_text(setname) - - -def verify_allowlists(root: Path) -> None: - require_nft_baseline() - - exp_addrs, exp_tcp, exp_udp = _compute_expected_sets(root) - - act_addrs = {a.replace("/32", "") for a in _nft_set_elements("frontier_allow_v4")} - act_tcp = {str(int(p)) for p in _nft_set_elements("frontier_allow_tcp_ports") if p.isdigit()} - act_udp = {str(int(p)) for p in _nft_set_elements("frontier_allow_udp_ports") if p.isdigit()} - - problems: list[str] = [] - - if act_addrs != exp_addrs: - problems.append(f"frontier_allow_v4 mismatch: expected={sorted(exp_addrs)} actual={sorted(act_addrs)}") - - if act_tcp != exp_tcp: - problems.append(f"frontier_allow_tcp_ports mismatch: expected={sorted(exp_tcp)} actual={sorted(act_tcp)}") - - if act_udp != exp_udp: - problems.append(f"frontier_allow_udp_ports mismatch: expected={sorted(exp_udp)} actual={sorted(act_udp)}") - - if problems: - for p in problems: - print("ERR:", p) - raise SystemExit(2) - - print("OK: nft allow sets match allowlist.state.json") - - -def apply_allowlists(root: Path) -> None: - if os.geteuid() != 0: - raise SystemExit("ERR: --apply requires root") - - require_nft_baseline() - - exp_addrs, exp_tcp, exp_udp = _compute_expected_sets(root) - - script_lines: list[str] = [ - "flush set inet sourceos frontier_allow_v4", - "flush set inet sourceos frontier_allow_tcp_ports", - "flush set inet sourceos frontier_allow_udp_ports", - ] - - if exp_addrs: - script_lines.append("add element inet sourceos frontier_allow_v4 { " + ", ".join(sorted(exp_addrs)) + " }") - - if exp_tcp: - script_lines.append( - "add element inet sourceos frontier_allow_tcp_ports { " + ", ".join(sorted(exp_tcp)) + " }" - ) - - if exp_udp: - script_lines.append( - "add element inet sourceos frontier_allow_udp_ports { " + ", ".join(sorted(exp_udp)) + " }" - ) - - _run_nft_script("\n".join(script_lines) + "\n") - - _append_audit( - root, - { - "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - "module": "sourceos-gate-egress", - "action": "apply_allowlists", - "frontier_allow_v4": sorted(exp_addrs), - "frontier_allow_tcp_ports": sorted(exp_tcp), - "frontier_allow_udp_ports": sorted(exp_udp), - }, - ) - - -def grant_install(root: Path, token_id: str, nonce: str, exp: int, targets: list[str], ports: list[int], proto: str, apply: bool) -> None: - if exp <= _now_epoch(): - raise SystemExit("ERR: grant expired") - - record_nonce(root, token_id, nonce, exp) - - state = _load_state(root) - allow = state.get("allow") or [] - - allow.append( - { - "token_id": token_id, - "nonce": nonce, - "exp": int(exp), - "targets": targets, - "ports": ports, - "proto": proto, - "installed_at": _now_epoch(), - } - ) - - state["allow"] = allow - _save_state(root, state) - - for t in targets: - print(f"ALLOW (state): {t} proto={proto} ports={ports} until exp={exp}") - - if apply: - apply_allowlists(root) +from sourceos_gate.egress import EgressGate # noqa: E402 +from sourceos_gate.errors import GateError # noqa: E402 def main() -> int: ap = argparse.ArgumentParser() - sub = ap.add_subparsers(dest="cmd", required=True) - ap.add_argument("--store-root", default="/var/lib/sourceos") - ap.add_argument("--apply", action="store_true", help="Apply allowlist sets via nft (requires nft + root; baseline must exist)") + + sub = ap.add_subparsers(dest="cmd", required=True) sub.add_parser("init") @@ -404,41 +45,65 @@ def main() -> int: p_grant.add_argument("--token-id", required=True) p_grant.add_argument("--nonce", required=True) p_grant.add_argument("--exp", required=True, type=int) - p_grant.add_argument("--proto", default="tcp", choices=["tcp", "udp"], help="protocol for allowed ports") + p_grant.add_argument("--proto", default="tcp", choices=["tcp", "udp"]) p_grant.add_argument("--target", action="append", default=[], help="CIDR or IP (repeatable)") p_grant.add_argument("--port", action="append", default=[], type=int, help="port (repeatable)") + p_grant.add_argument("--apply", action="store_true", help="Apply allowlist sets via nft (requires root + baseline)") - sub.add_parser("prune") + p_prune = sub.add_parser("prune") + p_prune.add_argument("--apply", action="store_true") + + sub.add_parser("apply") sub.add_parser("verify") + sub.add_parser("snapshot") args = ap.parse_args() root = Path(args.store_root) + gate = EgressGate.for_root(root) - if args.cmd == "init": - init_store(root) - print(str(_db_path(root))) - return 0 - - if args.cmd == "verify": - verify_allowlists(root) - return 0 - - if args.cmd == "prune": - removed = prune_expired(root) - print(f"PRUNED: {removed}") - if args.apply: - apply_allowlists(root) - return 0 - - if args.cmd == "grant": - targets = args.target or [] - ports = args.port or [] - if not targets: - raise SystemExit("ERR: at least one --target is required") - if not ports: - ports = [443] - grant_install(root, args.token_id, args.nonce, args.exp, targets, ports, args.proto, args.apply) - return 0 + try: + if args.cmd == "init": + gate.init() + print(str(gate.store.db_path)) + return 0 + + if args.cmd == "grant": + if not args.target: + raise SystemExit("ERR: at least one --target is required") + ports = args.port or [443] + gate.install_grant( + token_id=args.token_id, + nonce=args.nonce, + exp=args.exp, + targets=args.target, + ports=ports, + proto=args.proto, + apply=bool(args.apply), + ) + print("OK") + return 0 + + if args.cmd == "prune": + removed = gate.prune(apply=bool(args.apply)) + print(f"PRUNED: {removed}") + return 0 + + if args.cmd == "apply": + gate.apply() + print("APPLIED") + return 0 + + if args.cmd == "verify": + gate.verify() + print("OK") + return 0 + + if args.cmd == "snapshot": + print(gate.snapshot()) + return 0 + + except GateError as e: + raise SystemExit(f"ERR: {e}") raise SystemExit("ERR: unknown cmd") From 89cd94c9e0a004407efff303454b461e67bdd86b Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:30:21 -0400 Subject: [PATCH 12/15] gate(daemon): make runner importable via src/ path --- tools/sourceos_gate_egressd.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tools/sourceos_gate_egressd.py b/tools/sourceos_gate_egressd.py index 6dbfb18..54b44f9 100644 --- a/tools/sourceos_gate_egressd.py +++ b/tools/sourceos_gate_egressd.py @@ -1,19 +1,19 @@ #!/usr/bin/env python3 -"""Run the SourceOS egress gate daemon. - -This is a host-local Unix socket service. - -Example: - python tools/sourceos_gate_egressd.py --store-root /var/lib/sourceos --socket /run/sourceos/gate-egress.sock -""" +"""Run the SourceOS egress gate daemon.""" from __future__ import annotations import argparse import asyncio +import sys from pathlib import Path -from sourceos_gate.daemon import DaemonConfig, serve +REPO_ROOT = Path(__file__).resolve().parents[1] +SRC_DIR = REPO_ROOT / "src" +if SRC_DIR.is_dir() and str(SRC_DIR) not in sys.path: + sys.path.insert(0, str(SRC_DIR)) + +from sourceos_gate.daemon import DaemonConfig, serve # noqa: E402 def main() -> int: From 34b6c7c14cbfe54d04792fe91db221497bd1da06 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:33:54 -0400 Subject: [PATCH 13/15] tests: add GateStore unit tests --- tests/test_gate_store.py | 53 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 tests/test_gate_store.py diff --git a/tests/test_gate_store.py b/tests/test_gate_store.py new file mode 100644 index 0000000..0425a98 --- /dev/null +++ b/tests/test_gate_store.py @@ -0,0 +1,53 @@ +import tempfile +import unittest +from pathlib import Path +import sys + +REPO_ROOT = Path(__file__).resolve().parents[1] +SRC_DIR = REPO_ROOT / "src" +if str(SRC_DIR) not in sys.path: + sys.path.insert(0, str(SRC_DIR)) + +from sourceos_gate.store import GateStore +from sourceos_gate.errors import ReplayError, ExpiredGrantError + + +class GateStoreTests(unittest.TestCase): + def test_install_and_list_active(self): + with tempfile.TemporaryDirectory() as td: + root = Path(td) + store = GateStore(root) + store.init() + + exp = 9999999999 + store.install_grant("tok", "n1", exp, ["10.0.0.1/32"], [443], "tcp") + store.install_grant("tok2", "n2", exp, ["10.0.0.2/32"], [53], "udp") + + active = store.list_active() + self.assertEqual(len(active), 2) + + addrs, tcp_ports, udp_ports = store.compute_active_sets() + self.assertIn("10.0.0.1", addrs) + self.assertIn("10.0.0.2", addrs) + self.assertIn("443", tcp_ports) + self.assertIn("53", udp_ports) + + def test_replay_detection(self): + with tempfile.TemporaryDirectory() as td: + root = Path(td) + store = GateStore(root) + exp = 9999999999 + store.install_grant("tok", "n1", exp, ["10.0.0.1/32"], [443], "tcp") + with self.assertRaises(ReplayError): + store.install_grant("tok", "n1", exp, ["10.0.0.1/32"], [443], "tcp") + + def test_expired_grant_rejected(self): + with tempfile.TemporaryDirectory() as td: + root = Path(td) + store = GateStore(root) + with self.assertRaises(ExpiredGrantError): + store.install_grant("tok", "n1", 1, ["10.0.0.1/32"], [443], "tcp") + + +if __name__ == "__main__": + unittest.main() From 6a1152f966e3af571953e7e757156ef4fda74a3d Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:34:34 -0400 Subject: [PATCH 14/15] tests: add nft json parsing tests for sourceos_gate.nft --- tests/test_nft_parse.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 tests/test_nft_parse.py diff --git a/tests/test_nft_parse.py b/tests/test_nft_parse.py new file mode 100644 index 0000000..bb22a46 --- /dev/null +++ b/tests/test_nft_parse.py @@ -0,0 +1,36 @@ +import json +import unittest +from pathlib import Path +import sys + +REPO_ROOT = Path(__file__).resolve().parents[1] +SRC_DIR = REPO_ROOT / "src" +if str(SRC_DIR) not in sys.path: + sys.path.insert(0, str(SRC_DIR)) + +from sourceos_gate import nft + + +class NftParseTests(unittest.TestCase): + def _load(self, name: str) -> dict: + p = REPO_ROOT / "tools" / "fixtures" / name + return json.loads(p.read_text(encoding="utf-8")) + + def test_parse_v4(self): + obj = self._load("nft_set_frontier_allow_v4.json") + elems = nft.parse_nft_set_elements_json(obj) + self.assertEqual(sorted(elems), ["10.0.0.1", "10.0.0.2"]) + + def test_parse_ports_string(self): + obj = self._load("nft_set_frontier_allow_tcp_ports.json") + elems = nft.parse_nft_set_elements_json(obj) + self.assertEqual(sorted(elems), ["443", "8443"]) + + def test_parse_ports_int(self): + obj = self._load("nft_set_frontier_allow_tcp_ports_int.json") + elems = nft.parse_nft_set_elements_json(obj) + self.assertEqual(sorted(elems), ["11", "17"]) + + +if __name__ == "__main__": + unittest.main() From 12b546f980dff21b5ac30cecc8a7572106e2db59 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:38:54 -0400 Subject: [PATCH 15/15] docs: add unittest discovery step --- docs/DEV_VALIDATE.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/DEV_VALIDATE.md b/docs/DEV_VALIDATE.md index 68fdd30..99931c7 100644 --- a/docs/DEV_VALIDATE.md +++ b/docs/DEV_VALIDATE.md @@ -30,7 +30,7 @@ sudo nft -f nft/sourceos-egress.nft && \ --- -## 3) nft -j parser fixtures (no privilege) +## 3) Parser fixtures (no privilege) ```bash python tools/test_nft_json_parse.py @@ -38,6 +38,14 @@ python tools/test_nft_json_parse.py --- +## 4) Unit tests (no privilege) + +```bash +python -m unittest discover -s tests +``` + +--- + ## Notes - v0 targets are IP/CIDR. Hostname pinning is deferred.