Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion docs/DEV_VALIDATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,22 @@ sudo nft -f nft/sourceos-egress.nft && \

---

## 3) nft -j parser fixtures (no privilege)
## 3) Parser fixtures (no privilege)

```bash
python tools/test_nft_json_parse.py
```

---

## 4) Unit tests (no privilege)

```bash
python -m unittest discover -s tests
```

---

## Notes

- v0 targets are IP/CIDR. Hostname pinning is deferred.
Expand Down
1 change: 1 addition & 0 deletions src/sourceos_gate/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""SourceOS Gate package."""
28 changes: 28 additions & 0 deletions src/sourceos_gate/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Append-only NDJSON audit log writer."""

from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path

from .timeutil import now_iso_utc


@dataclass(frozen=True)
class AuditLog:
root: Path

def path_for(self, stream: str) -> Path:
# Store by day for easy rotation.
day = now_iso_utc()[:10]
return self.root / "audit" / "events" / day / f"{stream}.ndjson"

def append(self, stream: str, record: dict) -> None:
p = self.path_for(stream)
p.parent.mkdir(parents=True, exist_ok=True)
rec = dict(record)
rec.setdefault("ts", now_iso_utc())
line = json.dumps(rec, sort_keys=True)
with p.open("a", encoding="utf-8") as f:
f.write(line + "\n")
103 changes: 103 additions & 0 deletions src/sourceos_gate/daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""Egress gate daemon.

Runs a local Unix socket server and serves one-JSON-per-line requests.

This is intended for host-local orchestration (systemd socket activation or
static socket path), and is not exposed to the network.
"""

from __future__ import annotations

import asyncio
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from .egress import EgressGate
from .protocol import err_response, map_error, ok_response, require_fields
from .timeutil import now_iso_utc


@dataclass(frozen=True)
class DaemonConfig:
socket_path: Path
store_root: Path


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, gate: EgressGate) -> None:
peer = writer.get_extra_info("peername")
try:
while not reader.at_eof():
line = await reader.readline()
if not line:
break
raw = line.decode("utf-8", errors="replace").strip()
if not raw:
continue

req_id: str | None = None
try:
req = json.loads(raw)
if not isinstance(req, dict):
raise ValueError("request is not an object")
req_id = req.get("id")
method = req.get("method")
params = req.get("params") or {}
if not isinstance(method, str):
raise ValueError("missing method")
if not isinstance(params, dict):
raise ValueError("params must be object")

if method == "health":
res = ok_response(req_id, {"ts": now_iso_utc(), "status": "ok"})
elif method == "snapshot":
res = ok_response(req_id, gate.snapshot())
elif method == "grant.install":
require_fields(params, ["token_id", "nonce", "exp", "targets", "ports", "proto"])
gate.install_grant(
token_id=str(params["token_id"]),
nonce=str(params["nonce"]),
exp=int(params["exp"]),
targets=[str(x) for x in params["targets"]],
ports=[int(x) for x in params["ports"]],
proto=str(params["proto"]),
apply=bool(params.get("apply", False)),
)
res = ok_response(req_id, {"status": "installed"})
elif method == "prune":
removed = gate.prune(apply=bool(params.get("apply", False)))
res = ok_response(req_id, {"removed": removed})
elif method == "apply":
gate.apply()
res = ok_response(req_id, {"status": "applied"})
elif method == "verify":
gate.verify()
res = ok_response(req_id, {"status": "ok"})
else:
raise ValueError(f"unknown method: {method}")
except Exception as e:
res = err_response(req_id, map_error(e))

writer.write((json.dumps(res, sort_keys=True) + "\n").encode("utf-8"))
await writer.drain()
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass


async def serve(cfg: DaemonConfig) -> None:
cfg.socket_path.parent.mkdir(parents=True, exist_ok=True)
if cfg.socket_path.exists():
cfg.socket_path.unlink()

gate = EgressGate.for_root(cfg.store_root)
gate.init()

server = await asyncio.start_unix_server(lambda r, w: handle_client(r, w, gate), path=str(cfg.socket_path))

async with server:
await server.serve_forever()
114 changes: 114 additions & 0 deletions src/sourceos_gate/egress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""High-level egress gate operations.

This module ties together:
- GateStore (sqlite state)
- nft integration (apply/verify)
- audit log

It provides a composable API used by both CLI and daemon.
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path

from .audit import AuditLog
from .errors import GateError
from .nft import NftConfig, apply_sets, require_baseline, verify_sets
from .store import GateStore
from .timeutil import now_iso_utc


@dataclass(frozen=True)
class EgressGate:
store: GateStore
audit: AuditLog
nft_cfg: NftConfig = NftConfig()

@classmethod
def for_root(cls, root: Path) -> "EgressGate":
return cls(store=GateStore(root), audit=AuditLog(root))

def init(self) -> None:
self.store.init()

def install_grant(
self,
token_id: str,
nonce: str,
exp: int,
targets: list[str],
ports: list[int],
proto: str,
apply: bool = False,
) -> None:
g = self.store.install_grant(token_id, nonce, exp, targets, ports, proto)
self.audit.append(
"gate.egress",
{
"action": "grant.install",
"token_id": g.token_id,
"nonce": g.nonce,
"exp": g.exp,
"proto": g.proto,
"targets": g.targets,
"ports": g.ports,
},
)
if apply:
self.apply()

def prune(self, apply: bool = False) -> int:
removed = self.store.prune_expired()
self.audit.append(
"gate.egress",
{
"action": "grant.prune",
"removed": removed,
},
)
if apply:
self.apply()
return removed

def apply(self) -> None:
require_baseline(self.nft_cfg)
addrs, tcp_ports, udp_ports = self.store.compute_active_sets()
apply_sets(addrs, tcp_ports, udp_ports, self.nft_cfg)
self.audit.append(
"gate.egress",
{
"action": "apply",
"allow_v4": sorted(addrs),
"tcp_ports": sorted(tcp_ports),
"udp_ports": sorted(udp_ports),
},
)

def verify(self) -> None:
require_baseline(self.nft_cfg)
addrs, tcp_ports, udp_ports = self.store.compute_active_sets()
verify_sets(addrs, tcp_ports, udp_ports, self.nft_cfg)
self.audit.append(
"gate.egress",
{
"action": "verify",
"status": "ok",
"allow_v4": sorted(addrs),
"tcp_ports": sorted(tcp_ports),
"udp_ports": sorted(udp_ports),
},
)

def snapshot(self) -> dict:
# A small structured status object for UI/ops.
addrs, tcp_ports, udp_ports = self.store.compute_active_sets()
return {
"ts": now_iso_utc(),
"active": {
"allow_v4": sorted(addrs),
"tcp_ports": sorted(tcp_ports),
"udp_ports": sorted(udp_ports),
},
}
31 changes: 31 additions & 0 deletions src/sourceos_gate/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Error types for SourceOS gate components."""

from __future__ import annotations


class GateError(Exception):
"""Base error for gate operations."""


class ReplayError(GateError):
"""Raised when a token+nonce pair has already been seen."""


class ExpiredGrantError(GateError):
"""Raised when a grant has expired."""


class BaselineMissingError(GateError):
"""Raised when nft baseline objects are missing."""


class PermissionError(GateError):
"""Raised when operation requires elevated privileges."""


class NftError(GateError):
"""Raised when nft operations fail."""


class ValidationError(GateError):
"""Raised when request payload is invalid."""
Loading
Loading