diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 8604711..55954e8 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -93,7 +93,7 @@ jobs: run: python tools/test_pipelines.py runtime-scaffold: - name: v0.9 runtime scaffold (sub-issue #120) + name: v0.9 runtime scaffold + Stage 1 Verify (sub-issues #120, #121) runs-on: ubuntu-latest strategy: fail-fast: false @@ -109,18 +109,22 @@ jobs: with: python-version: ${{ matrix.python-version }} - - name: Install runtime package (editable) + - name: Install runtime package (editable) + tooling deps run: | python -m pip install --upgrade pip + python -m pip install -r tools/requirements.txt python -m pip install -e . - name: Run runtime-scaffold test suite run: python tools/test_runtime_scaffold.py + - name: Run runtime-verify test suite (Stage 1) + run: python tools/test_runtime_verify.py + - name: Confirm `lifectl` console script installed run: | lifectl version - # exit non-zero on info / run is expected (scaffold-only stubs) + # missing-path on info / run is expected to exit non-zero set +e lifectl info pretend.life info_rc=$? @@ -128,10 +132,10 @@ jobs: run_rc=$? set -e if [ "$info_rc" -eq 0 ] || [ "$run_rc" -eq 0 ]; then - echo "ERROR: lifectl info/run unexpectedly succeeded in scaffold-only build" + echo "ERROR: lifectl info/run unexpectedly succeeded for missing path" exit 1 fi - echo "scaffold stubs exit non-zero as expected (info=$info_rc, run=$run_rc)" + echo "missing-path stubs exit non-zero as expected (info=$info_rc, run=$run_rc)" docs: name: Lint docs (markdownlint + linkcheck) diff --git a/CHANGELOG.md b/CHANGELOG.md index 779a1a0..be40ebe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,6 +74,48 @@ are upheld trivially: the scaffold cannot violate them because none of the gates run yet. Sub-issues #121–#126 reinstate each invariant as they implement the corresponding Stage. +### Added (sub-issue #121 — Stage 1 Verify) + +- `runtime/verify/` populated with the seven §2.1–§2.5 + lifecycle gate + sub-steps (`_structural`, `_schema`, `_time`, `_inventory`, + `_audit_chain`, `_consent`, `_lifecycle`) plus a public + `verify(life_path, *, audit, withdrawal_policy)` entry point in + `runtime/verify/__init__.py`. The function returns a structured + `VerifyResult` (with `package_id`, `lifecycle_state`, audit-chain + length, inventory entry count, errors, warnings) on every call — + ok or not — so the caller can present a structured rejection + reason to the user (D6=fail-close). +- `runtime/audit/recorder.py` — minimal in-memory `AuditRecorder` used + by Stages 1-4 until v0.9 sub-issue #125 ships the full v0.4 hash-chain + emitter that links the runtime's session log back to the bundled + `audit/events.jsonl` chain. Records `mount_attempted`, + `withdrawal_poll`, and `assembly_aborted{stage="verify"}` events. +- `runtime/cli/lifectl.py` — `lifectl info ` now prints a + structured Stage 1 report (human-readable by default, JSON via + `--json`) and exits 0 on PASS / 1 on FAIL. `lifectl run ` + executes Stage 1 and exits 1 on Stage 1 FAIL or 2 (with + `Stage 2+ pending sub-issues #122-#126` to stderr) on Stage 1 PASS. + Both subcommands accept `--withdrawal-mock {not-revoked|revoked| + unreachable|malformed}` (test-only; production runtimes MUST omit it + — spec mandates a real HTTP poll per §2.5). +- `tools/test_runtime_verify.py` — 15 sanity-test cases covering the + seven verify sub-steps plus three CLI surface assertions. Negative + fixtures are constructed by mutating a freshly-built `.life` zip + (`_rebuild_zip_with`) and the happy path drives a real local + `http.server` so the §2.5 `urllib.request.urlopen` path is exercised + end-to-end. The driver is wired into the existing + `runtime-scaffold` CI job as a new `Run runtime-verify test suite + (Stage 1)` step. +- `.github/workflows/validate.yml` — `runtime-scaffold` job renamed to + cover both #120 and #121, installs `tools/requirements.txt` (for + `jsonschema`) before running Stage 1 tests. + +Hard-rule invariants now enforced for Stage 1: D6=fail-close (any +sub-step failure aborts Stage 1, emits `assembly_aborted`, and refuses +to proceed); the §2.5 withdrawal pre-flight rejects 4xx/5xx/network +failures; the lifecycle gate refuses `withdrawn` and `tainted` +packages. + ## v0.8-asset-architecture (2026-04-26) **Status**: Released. v0.8 closes the four asset-architecture gaps left diff --git a/runtime/README.md b/runtime/README.md index b3cb9b1..5657315 100644 --- a/runtime/README.md +++ b/runtime/README.md @@ -26,12 +26,21 @@ runtime/ ``` pip install -e . # from repo root lifectl version # confirm install -lifectl run examples/minimal-life-package/out/*.life +lifectl info examples/minimal-life-package/out/*.life --withdrawal-mock not-revoked +lifectl run examples/minimal-life-package/out/*.life --withdrawal-mock not-revoked ``` -Until v0.9 sub-issues #121-#126 land, `lifectl info` and `lifectl run` exit -with a "not yet implemented in this sub-issue" message — only `lifectl version` -is functional. +As of v0.9 sub-issue #121, Stage 1 Verify is wired: + +- `lifectl info ` prints a structured §2.1–§2.5 + lifecycle report + (human-readable by default, JSON via `--json`) and exits **0** on PASS / + **1** on FAIL. +- `lifectl run ` runs Stage 1 then exits **2** with a "Stage 2+ pending + sub-issues #122-#126" message; full mount comes online sub-issue by + sub-issue. + +`--withdrawal-mock` is **test-only**; production runtimes MUST omit it so +the §2.5 withdrawal endpoint is genuinely polled over HTTP. ## Why a separate Python package? diff --git a/runtime/__init__.py b/runtime/__init__.py index da2a411..7b31f72 100644 --- a/runtime/__init__.py +++ b/runtime/__init__.py @@ -3,12 +3,13 @@ This package implements the protocol defined in ``docs/LIFE_RUNTIME_STANDARD.md`` (v0.7 §1-10 + v0.8 Part B 5-stage assembly). -Public surface today (v0.9 sub-issue #120 — scaffold only): +Public surface as of v0.9 sub-issue #121 (Stage 1 Verify wired): - ``__version__`` — runtime package version (``0.9.0.dev0``). - ``LIFE_RUNTIME_PROTOCOL_VERSION`` — declared life-runtime spec version. -- ``Runtime`` — placeholder class; concrete assembly stages land in sub-issues - #121-#126. +- ``Runtime`` — placeholder class; concrete Stages 2-5 land in sub-issues + #122-#125, end-to-end echo Provider in #126. +- ``runtime.verify.verify`` — Stage 1 Verify entry point. The ``runtime.cli.lifectl`` module exposes the ``lifectl`` console script. """ @@ -29,7 +30,7 @@ def __init__(self) -> None: def __repr__(self) -> str: return ( f"Runtime(version={self.version!r}, " - f"protocol={self.protocol!r}, stages_implemented=[])" + f"protocol={self.protocol!r}, stages_implemented=['verify'])" ) diff --git a/runtime/audit/__init__.py b/runtime/audit/__init__.py index c7a0555..af5f8ad 100644 --- a/runtime/audit/__init__.py +++ b/runtime/audit/__init__.py @@ -1,4 +1,15 @@ -"""Runtime-side audit emitter (v0.4 hash chain). +"""Runtime-side audit emitter. -Stub in v0.9 sub-issue #120; full implementation lands in #125 (Stage 5 Guard). +v0.9 sub-issue #121 lands the *recorder* surface used by Stages 1-4 +(``AuditRecorder.emit(event_type, **fields)``). The full v0.4 hash-chain +implementation that links the runtime's session log back to the bundled +``audit/events.jsonl`` chain ships in v0.9 sub-issue #125 (Stage 5 Guard); +until then events are accumulated in memory and optionally written to a +JSONL file for inspection / test assertions. """ + +from __future__ import annotations + +from runtime.audit.recorder import AuditRecorder, AuditEvent + +__all__ = ["AuditRecorder", "AuditEvent"] diff --git a/runtime/audit/recorder.py b/runtime/audit/recorder.py new file mode 100644 index 0000000..22e9cff --- /dev/null +++ b/runtime/audit/recorder.py @@ -0,0 +1,89 @@ +"""Lightweight in-memory audit recorder used until v0.9 sub-issue #125. + +This is **not** the v0.4 hash-chain emitter — that lands in #125 and will +take over both responsibilities of recording AND of chaining +``prev_hash`` from the bundled ``audit/events.jsonl`` tip. + +For Stage 1 Verify (sub-issue #121) the runtime needs a way to record +``mount_attempted``, ``withdrawal_poll``, and ``assembly_aborted`` events +deterministically so tests can assert on them. The recorder accumulates +events in order and optionally mirrors them to a JSONL file. +""" + +from __future__ import annotations + +import json +import os +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +def _utc_now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + +@dataclass(frozen=True) +class AuditEvent: + """One recorded audit event. + + ``event_type`` matches the v0.7 / v0.8 vocabulary (e.g. + ``mount_attempted``, ``withdrawal_poll``, ``assembly_aborted``, + ``capability_bound``, ``lifecycle_transition_observed``, ``unmount``). + ``occurred_at`` is the wall-clock timestamp at recording. + ``fields`` carries per-event payload as a plain dict. + """ + + event_type: str + occurred_at: str + fields: dict[str, Any] + + def to_dict(self) -> dict[str, Any]: + return { + "event_type": self.event_type, + "occurred_at": self.occurred_at, + "fields": self.fields, + } + + +@dataclass +class AuditRecorder: + """Append-only ordered list of audit events. + + Pass ``mirror_path`` to also stream each event to a JSONL file on + disk (used by ``lifectl info`` to produce inspectable output). The + full hash-chained emitter from sub-issue #125 will subclass / replace + this object while keeping the same ``emit`` API. + """ + + mirror_path: Path | None = None + events: list[AuditEvent] = field(default_factory=list) + + def emit(self, event_type: str, **fields: Any) -> AuditEvent: + evt = AuditEvent( + event_type=event_type, + occurred_at=_utc_now_iso(), + fields=dict(fields), + ) + self.events.append(evt) + if self.mirror_path is not None: + self.mirror_path.parent.mkdir(parents=True, exist_ok=True) + with self.mirror_path.open("a", encoding="utf-8") as fp: + fp.write(json.dumps(evt.to_dict(), sort_keys=True) + "\n") + return evt + + def types(self) -> list[str]: + return [e.event_type for e in self.events] + + def latest(self, event_type: str) -> AuditEvent | None: + for evt in reversed(self.events): + if evt.event_type == event_type: + return evt + return None + + +def default_mirror_path(package_id: str) -> Path: + """Default per-mount audit log path: ``$XDG_DATA_HOME or ~/.local/share/dlrs/mounts//events.jsonl``.""" + base = os.environ.get("XDG_DATA_HOME") or str(Path.home() / ".local" / "share") + return Path(base) / "dlrs" / "mounts" / package_id / "events.jsonl" diff --git a/runtime/cli/lifectl.py b/runtime/cli/lifectl.py index d5c5606..9105d7f 100644 --- a/runtime/cli/lifectl.py +++ b/runtime/cli/lifectl.py @@ -1,22 +1,24 @@ """``lifectl`` — DLRS reference runtime CLI. -v0.9 sub-issue #120 (scaffold). Concrete assembly logic lands in #121-#126. +v0.9 sub-issue #120 laid down the scaffold; #121 wires Stage 1 Verify. +Stages 2-5 land in #122-#125; the e2e echo Provider + conformance +harness in #126. """ from __future__ import annotations import argparse +import json import sys from pathlib import Path from runtime import LIFE_RUNTIME_PROTOCOL_VERSION, __version__ +from runtime.audit import AuditRecorder +from runtime.verify import VerifyResult, WithdrawalPolicy, verify -_NOT_IMPLEMENTED_INFO = ( - "lifectl info: not yet implemented (v0.9 sub-issue #121 — Stage 1 Verify)." -) -_NOT_IMPLEMENTED_RUN = ( - "lifectl run: not yet implemented (v0.9 sub-issues #121-#126 — full 5-stage " - "assembly)." + +_NOT_IMPLEMENTED_RUN_TAIL = ( + "Stage 2+ pending sub-issues #122-#126." ) @@ -35,9 +37,15 @@ def _build_parser() -> argparse.ArgumentParser: info = sub.add_parser( "info", - help="print a structured verification report for a `.life` archive (Stage 1 only)", + help="print a structured Stage 1 Verify report for a `.life` archive", ) info.add_argument("life_path", type=Path, help="path to a `.life` archive") + _add_verify_options(info) + info.add_argument( + "--json", + action="store_true", + help="emit a JSON document instead of human-readable text", + ) run = sub.add_parser("run", help="mount and run a `.life` archive end-to-end") run.add_argument("life_path", type=Path, help="path to a `.life` archive") @@ -61,22 +69,146 @@ def _build_parser() -> argparse.ArgumentParser: "≥24h in production)" ), ) + _add_verify_options(run) return parser +def _add_verify_options(p: argparse.ArgumentParser) -> None: + p.add_argument( + "--withdrawal-mock", + choices=["not-revoked", "revoked", "unreachable", "malformed"], + default=None, + help=( + "TEST ONLY: short-circuit the §2.5 withdrawal HTTP poll with a " + "deterministic outcome. Production runtimes MUST omit this flag." + ), + ) + p.add_argument( + "--withdrawal-timeout", + type=float, + default=10.0, + help="HTTP timeout (seconds) for the §2.5 withdrawal pre-flight (default: 10)", + ) + + +def _withdrawal_policy_from_args(args: argparse.Namespace) -> WithdrawalPolicy: + if args.withdrawal_mock is None: + return WithdrawalPolicy(mode="online", timeout_seconds=args.withdrawal_timeout) + return WithdrawalPolicy( + mode=f"mock-{args.withdrawal_mock}", # type: ignore[arg-type] + timeout_seconds=args.withdrawal_timeout, + ) + + def cmd_version() -> int: print(f"lifectl {__version__} (life-runtime v{LIFE_RUNTIME_PROTOCOL_VERSION})") return 0 -def cmd_info(_args: argparse.Namespace) -> int: - print(_NOT_IMPLEMENTED_INFO, file=sys.stderr) - return 2 - - -def cmd_run(_args: argparse.Namespace) -> int: - print(_NOT_IMPLEMENTED_RUN, file=sys.stderr) +def _verify_result_to_dict(result: VerifyResult) -> dict: + return { + "ok": result.ok, + "life_path": str(result.life_path), + "package_id": result.package_id, + "schema_version": result.schema_version, + "mode": result.mode, + "record_id": result.record_id, + "created_at": result.created_at, + "expires_at": result.expires_at, + "runtime_compatibility": result.runtime_compatibility, + "lifecycle_state": result.lifecycle_state, + "audit_chain_length": result.audit_chain_length, + "audit_event_ref": result.audit_event_ref, + "inventory_entries_verified": result.inventory_entries_verified, + "forbidden_uses_count": len(result.forbidden_uses), + "errors": [e.to_dict() for e in result.errors], + "warnings": result.warnings, + } + + +def _print_human_report(result: VerifyResult) -> None: + out = sys.stdout + print(f"life_path: {result.life_path}", file=out) + print(f"package_id: {result.package_id}", file=out) + print(f"schema_version: {result.schema_version}", file=out) + print(f"mode: {result.mode}", file=out) + print(f"record_id: {result.record_id}", file=out) + print(f"created_at: {result.created_at}", file=out) + print(f"expires_at: {result.expires_at}", file=out) + print( + f"runtime_compat: {', '.join(result.runtime_compatibility) or '(none)'}", + file=out, + ) + print(f"lifecycle_state: {result.lifecycle_state}", file=out) + print(f"audit_chain_len: {result.audit_chain_length}", file=out) + print(f"audit_event_ref: {result.audit_event_ref}", file=out) + print( + f"inventory_verified: {result.inventory_entries_verified} entries", + file=out, + ) + print(f"forbidden_uses: {len(result.forbidden_uses)} key(s)", file=out) + if result.warnings: + print("warnings:", file=out) + for w in result.warnings: + print(f" - {w}", file=out) + verdict = "PASS" if result.ok else "FAIL" + print(f"verification: {verdict}", file=out) + if not result.ok: + print("errors:", file=sys.stderr) + for err in result.errors: + line = f" [{err.step}] {err.reason}" + if err.detail: + line += f" ({err.detail})" + print(line, file=sys.stderr) + + +def cmd_info(args: argparse.Namespace) -> int: + if not args.life_path.exists(): + print(f"life_path does not exist: {args.life_path}", file=sys.stderr) + return 2 + + audit = AuditRecorder() + result = verify( + args.life_path, + audit=audit, + withdrawal_policy=_withdrawal_policy_from_args(args), + ) + if args.json: + print( + json.dumps(_verify_result_to_dict(result), indent=2, ensure_ascii=False) + ) + else: + _print_human_report(result) + return 0 if result.ok else 1 + + +def cmd_run(args: argparse.Namespace) -> int: + audit = AuditRecorder() + result = verify( + args.life_path, + audit=audit, + withdrawal_policy=_withdrawal_policy_from_args(args), + ) + if not result.ok: + first = result.first_error() + if first is not None: + print( + f"Stage 1 Verify FAIL [{first.step}] {first.reason}" + + (f" ({first.detail})" if first.detail else ""), + file=sys.stderr, + ) + else: # pragma: no cover - defensive + print("Stage 1 Verify FAIL", file=sys.stderr) + return 1 + + print("Stage 1 Verify ✓", file=sys.stdout) + print(f"package_id={result.package_id} mode={result.mode} " + f"lifecycle_state={result.lifecycle_state}", file=sys.stdout) + if result.warnings: + for w in result.warnings: + print(f"warning: {w}", file=sys.stdout) + print(_NOT_IMPLEMENTED_RUN_TAIL, file=sys.stderr) return 2 diff --git a/runtime/verify/__init__.py b/runtime/verify/__init__.py index d8348a8..647c18f 100644 --- a/runtime/verify/__init__.py +++ b/runtime/verify/__init__.py @@ -1 +1,126 @@ -"""Stage 1 — Verify. Populated in v0.9 sub-issue #121.""" +"""Stage 1 — Verify (sub-issue #121). + +Spec: ``docs/LIFE_RUNTIME_STANDARD.md`` §2.1-§2.5 (v0.7 load sequence) +plus v0.8 Part B §B.1 row 1 (lifecycle gate + withdrawal pre-flight + +audit-chain integrity). + +Public surface:: + + from runtime.verify import verify, VerifyResult, WithdrawalPolicy + result = verify(life_path, audit=recorder, withdrawal_policy=...) + +The ``verify`` function executes seven sub-steps in order. The first +failure aborts, emits an ``assembly_aborted{stage="verify", reason}`` +audit event (when an audit recorder is supplied), and returns the +``VerifyResult`` with ``ok=False`` so the caller can present a +structured rejection to the user. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from runtime.verify._audit_chain import verify_audit_chain +from runtime.verify._consent import ( + WithdrawalPolicy, + poll_withdrawal_endpoint, + verify_consent_readable, +) +from runtime.verify._inventory import verify_inventory +from runtime.verify._lifecycle import gate_lifecycle +from runtime.verify._schema import ( + validate_descriptor, + validate_forbidden_uses_namespace, +) +from runtime.verify._structural import open_archive, parse_descriptor +from runtime.verify._time import check_time_bounds +from runtime.verify.result import VerifyError, VerifyResult + + +__all__ = [ + "verify", + "VerifyResult", + "VerifyError", + "WithdrawalPolicy", +] + + +def _abort(vr: VerifyResult, audit: Any | None) -> VerifyResult: + if audit is not None and vr.errors: + first = vr.first_error() + audit.emit( + "assembly_aborted", + stage="verify", + reason=first.reason if first else "unknown", + step=first.step if first else None, + detail=first.detail if first else None, + ) + return vr + + +def verify( + life_path: str | Path, + *, + audit: Any | None = None, + withdrawal_policy: WithdrawalPolicy | None = None, +) -> VerifyResult: + """Run Stage 1 Verify against ``life_path``. + + ``audit`` should be a ``runtime.audit.AuditRecorder`` (or any object + with a compatible ``emit(event_type, **fields)`` method). Pass + ``None`` to skip emission entirely (only in tests / introspection + flows). + + ``withdrawal_policy`` defaults to ``WithdrawalPolicy(mode="online")`` + which performs a real HTTP GET. Tests use ``mock-…`` modes. + """ + + path = Path(life_path) + policy = withdrawal_policy or WithdrawalPolicy() + + vr = VerifyResult(ok=True, life_path=path) + + if audit is not None: + audit.emit( + "mount_attempted", + life_path=str(path), + stage_about_to_run="verify", + ) + + zf = open_archive(path, vr) + if zf is None: + return _abort(vr, audit) + + try: + descriptor = parse_descriptor(zf, vr) + if descriptor is None: + return _abort(vr, audit) + + if not validate_descriptor(descriptor, vr): + return _abort(vr, audit) + if not validate_forbidden_uses_namespace(vr): + return _abort(vr, audit) + + if not check_time_bounds(vr): + return _abort(vr, audit) + + if not verify_inventory(zf, descriptor, vr): + return _abort(vr, audit) + + if not verify_audit_chain(zf, descriptor, vr): + return _abort(vr, audit) + + if not verify_consent_readable(zf, descriptor, vr): + return _abort(vr, audit) + + emit = audit.emit if audit is not None else None + if not poll_withdrawal_endpoint(descriptor, policy, vr, audit_emit=emit): + return _abort(vr, audit) + + if not gate_lifecycle(zf, vr): + return _abort(vr, audit) + finally: + zf.close() + + return vr diff --git a/runtime/verify/_audit_chain.py b/runtime/verify/_audit_chain.py new file mode 100644 index 0000000..067766a --- /dev/null +++ b/runtime/verify/_audit_chain.py @@ -0,0 +1,147 @@ +"""Stage 1.5 — Audit chain verification. + +Spec: ``docs/LIFE_RUNTIME_STANDARD.md`` §2.4 + v0.4 audit hash-chain +semantics (canonical JSON: sorted keys, no whitespace; hash = ``sha256:`` ++ hex of the canonical-without-``hash`` line). + +Steps: + +1. Read ``audit/events.jsonl`` line by line. +2. For each event verify ``hash`` is the canonical sha256 of the rest + of the event AND that ``prev_hash`` matches the previous event's + ``hash`` (or is ``null`` for the first event). +3. Resolve ``life-package.json::audit_event_ref`` (``audit/events.jsonl#L``) + and verify the referenced line is a ``package_emitted`` event whose + payload references the package's ``package_id``. +""" + +from __future__ import annotations + +import hashlib +import json +import re +import zipfile +from typing import Any + +from runtime.verify.result import VerifyResult + + +_AUDIT_PATH = "audit/events.jsonl" +_AUDIT_REF_RE = re.compile(r"^audit/events\.jsonl#L([1-9][0-9]*)$") + + +def _canonical_dump(obj: dict[str, Any]) -> str: + return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False) + + +def _sha256_of(s: str) -> str: + return "sha256:" + hashlib.sha256(s.encode("utf-8")).hexdigest() + + +def verify_audit_chain( + zf: zipfile.ZipFile, + descriptor: dict[str, Any], + vr: VerifyResult, +) -> bool: + if _AUDIT_PATH not in zf.namelist(): + vr.add_error("audit_chain", "audit_log_missing", _AUDIT_PATH) + return False + + try: + raw = zf.read(_AUDIT_PATH).decode("utf-8") + except UnicodeDecodeError as exc: + vr.add_error("audit_chain", "audit_log_not_utf8", str(exc)) + return False + lines = [line for line in raw.splitlines() if line.strip()] + if not lines: + vr.add_error("audit_chain", "audit_log_empty") + return False + + parsed: list[dict[str, Any]] = [] + for idx, line in enumerate(lines, start=1): + try: + evt = json.loads(line) + except json.JSONDecodeError as exc: + vr.add_error( + "audit_chain", + "audit_line_not_json", + f"L{idx}: {exc}", + ) + return False + if not isinstance(evt, dict): + vr.add_error("audit_chain", "audit_line_not_object", f"L{idx}") + return False + parsed.append(evt) + + prev_hash: str | None = None + for idx, evt in enumerate(parsed, start=1): + declared_prev = evt.get("prev_hash", None) + if declared_prev != prev_hash: + vr.add_error( + "audit_chain", + "prev_hash_break", + f"L{idx}: declared={declared_prev!r} expected={prev_hash!r}", + ) + return False + + declared_hash = evt.get("hash") + if not isinstance(declared_hash, str): + vr.add_error( + "audit_chain", + "missing_hash", + f"L{idx}", + ) + return False + recompute_input = {k: v for k, v in evt.items() if k != "hash"} + recomputed = _sha256_of(_canonical_dump(recompute_input)) + if recomputed != declared_hash: + vr.add_error( + "audit_chain", + "hash_mismatch", + f"L{idx}: declared={declared_hash} recomputed={recomputed}", + ) + return False + prev_hash = declared_hash + + vr.audit_chain_length = len(parsed) + + aer = descriptor.get("audit_event_ref") + if not isinstance(aer, str): + vr.add_error("audit_chain", "audit_event_ref_missing") + return False + m = _AUDIT_REF_RE.match(aer) + if not m: + vr.add_error( + "audit_chain", + "audit_event_ref_unparseable", + aer, + ) + return False + line_num = int(m.group(1)) + if line_num < 1 or line_num > len(parsed): + vr.add_error( + "audit_chain", + "audit_event_ref_out_of_range", + f"line={line_num} chain_length={len(parsed)}", + ) + return False + + referenced = parsed[line_num - 1] + if referenced.get("event_type") != "package_emitted": + vr.add_error( + "audit_chain", + "audit_event_ref_wrong_type", + f"event_type={referenced.get('event_type')!r}", + ) + return False + metadata = referenced.get("metadata") or {} + declared_pkg = metadata.get("package_id") if isinstance(metadata, dict) else None + if declared_pkg != descriptor.get("package_id"): + vr.add_error( + "audit_chain", + "audit_event_ref_wrong_package", + f"event.package_id={declared_pkg!r} descriptor.package_id={descriptor.get('package_id')!r}", + ) + return False + + return True diff --git a/runtime/verify/_consent.py b/runtime/verify/_consent.py new file mode 100644 index 0000000..46696a4 --- /dev/null +++ b/runtime/verify/_consent.py @@ -0,0 +1,227 @@ +"""Stage 1.6 — Consent + withdrawal pre-flight. + +Spec: ``docs/LIFE_RUNTIME_STANDARD.md`` §2.5. + +Two responsibilities: + +1. ``consent_evidence_ref`` MUST be readable. v0.9 supports + in-archive references (``consent/...``) and same-archive paths + only; external URIs are deferred to v0.10 once the federation + appendix lands. +2. The withdrawal endpoint MUST be polled at session start. By default + the runtime issues an HTTP GET; the body MUST parse to a JSON object + whose ``status`` is anything other than ``"withdrawn"``. + +For testing / offline evaluation the caller may pass a non-default +``WithdrawalPolicy`` that short-circuits the HTTP call. ``offline`` +mode is rejected unless the issuer's consent document explicitly +opted into offline operation; this gate is delegated to v0.10's +consent-document parser. v0.9 only supports the test-side mock policy +``mock-…`` which is forbidden in production builds. +""" + +from __future__ import annotations + +import json +import urllib.error +import urllib.parse +import urllib.request +import zipfile +from dataclasses import dataclass +from typing import Any, Literal + +from runtime.verify.result import VerifyResult + + +WithdrawalMode = Literal[ + "online", + "mock-not-revoked", + "mock-revoked", + "mock-unreachable", + "mock-malformed", +] + + +@dataclass(frozen=True) +class WithdrawalPolicy: + """How to evaluate ``life-package.withdrawal_endpoint`` at Stage 1. + + ``mode == "online"`` (default): real HTTP GET, follow the spec. + ``mode.startswith("mock-")``: test-only short-circuit. + """ + + mode: WithdrawalMode = "online" + timeout_seconds: float = 10.0 + + def is_mock(self) -> bool: + return self.mode.startswith("mock-") + + +def verify_consent_readable( + zf: zipfile.ZipFile, + descriptor: dict[str, Any], + vr: VerifyResult, +) -> bool: + ref = descriptor.get("consent_evidence_ref") + if not isinstance(ref, str) or not ref: + vr.add_error("consent", "consent_evidence_ref_missing") + return False + + parsed = urllib.parse.urlparse(ref) + if parsed.scheme and parsed.scheme not in ("file",): + vr.add_error( + "consent", + "external_consent_uri_not_supported_at_v0_9", + ref, + ) + return False + + candidate = ref + if parsed.scheme == "file": + candidate = parsed.path.lstrip("/") + try: + data = zf.read(candidate) + except KeyError: + vr.add_error("consent", "consent_evidence_ref_unreadable", ref) + return False + if not data: + vr.add_error("consent", "consent_evidence_empty", ref) + return False + return True + + +def _interpret_response(body: bytes, vr: VerifyResult) -> bool: + try: + text = body.decode("utf-8") + except UnicodeDecodeError as exc: + vr.add_error("withdrawal", "response_not_utf8", str(exc)) + return False + try: + parsed = json.loads(text) + except json.JSONDecodeError as exc: + vr.add_error("withdrawal", "response_not_json", str(exc)) + return False + if not isinstance(parsed, dict): + vr.add_error("withdrawal", "response_not_object") + return False + status = parsed.get("status") + if status == "withdrawn": + vr.add_error("withdrawal", "package_withdrawn") + return False + return True + + +def poll_withdrawal_endpoint( + descriptor: dict[str, Any], + policy: WithdrawalPolicy, + vr: VerifyResult, + audit_emit: Any | None = None, +) -> bool: + """Issue the §2.5 step-2 pre-flight poll. + + On success returns ``True`` and emits a ``withdrawal_poll`` audit + event with ``result="not_revoked"``. On any failure returns + ``False`` and emits the same audit event with the appropriate + ``result`` string. + """ + + endpoint = descriptor.get("withdrawal_endpoint") + if not isinstance(endpoint, str) or not endpoint: + vr.add_error("withdrawal", "withdrawal_endpoint_missing") + return False + + if policy.mode == "mock-not-revoked": + if audit_emit: + audit_emit("withdrawal_poll", endpoint=endpoint, result="not_revoked") + return True + if policy.mode == "mock-revoked": + if audit_emit: + audit_emit("withdrawal_poll", endpoint=endpoint, result="revoked") + vr.add_error("withdrawal", "package_withdrawn") + return False + if policy.mode == "mock-unreachable": + if audit_emit: + audit_emit("withdrawal_poll", endpoint=endpoint, result="unreachable") + vr.add_error("withdrawal", "endpoint_unreachable", "(mock)") + return False + if policy.mode == "mock-malformed": + if audit_emit: + audit_emit("withdrawal_poll", endpoint=endpoint, result="malformed") + vr.add_error("withdrawal", "response_not_json", "(mock)") + return False + + # Real HTTP GET path (default). Construct the Request defensively — + # urllib raises ``ValueError("unknown url type")`` for schemeless URLs, + # which the descriptor schema does not currently reject. + try: + req = urllib.request.Request( + endpoint, + method="GET", + headers={"User-Agent": "lifectl/0.9"}, + ) + except ValueError as exc: + if audit_emit: + audit_emit( + "withdrawal_poll", + endpoint=endpoint, + result="malformed_url", + ) + vr.add_error("withdrawal", "endpoint_malformed_url", str(exc)) + return False + try: + with urllib.request.urlopen(req, timeout=policy.timeout_seconds) as resp: + status_code = getattr(resp, "status", 200) + body = resp.read() + except urllib.error.HTTPError as exc: + if audit_emit: + audit_emit( + "withdrawal_poll", + endpoint=endpoint, + result="unreachable", + http_status=exc.code, + ) + vr.add_error( + "withdrawal", + "endpoint_http_error", + f"{endpoint} -> HTTP {exc.code}", + ) + return False + except (urllib.error.URLError, TimeoutError, OSError) as exc: + if audit_emit: + audit_emit("withdrawal_poll", endpoint=endpoint, result="unreachable") + vr.add_error("withdrawal", "endpoint_unreachable", str(exc)) + return False + + if status_code >= 400: + if audit_emit: + audit_emit( + "withdrawal_poll", + endpoint=endpoint, + result="unreachable", + http_status=status_code, + ) + vr.add_error( + "withdrawal", + "endpoint_http_error", + f"{endpoint} -> HTTP {status_code}", + ) + return False + + if not _interpret_response(body, vr): + if audit_emit: + audit_emit( + "withdrawal_poll", + endpoint=endpoint, + result="malformed_or_revoked", + http_status=status_code, + ) + return False + + if audit_emit: + audit_emit( + "withdrawal_poll", + endpoint=endpoint, + result="not_revoked", + http_status=status_code, + ) + return True diff --git a/runtime/verify/_inventory.py b/runtime/verify/_inventory.py new file mode 100644 index 0000000..dfe98e3 --- /dev/null +++ b/runtime/verify/_inventory.py @@ -0,0 +1,108 @@ +"""Stage 1.4 — Inventory integrity. + +Spec: ``docs/LIFE_RUNTIME_STANDARD.md`` §2.3. + +Walk every entry in ``life-package.json::contents[]``: + +- verify the path exists in the zip +- verify the decompressed sha256 matches +- verify the decompressed size matches + +Then ensure every zip entry (other than ``life-package.json``) is listed +in ``contents[]`` — extra entries indicate tampering or build-tool +misuse and MUST be rejected. +""" + +from __future__ import annotations + +import hashlib +import zipfile +from typing import Any + +from runtime.verify.result import VerifyResult + + +_DESCRIPTOR_NAME = "life-package.json" + + +def _strip_sha_prefix(s: str) -> str: + return s[len("sha256:") :] if s.startswith("sha256:") else s + + +def _sha256_bytes(data: bytes) -> str: + return "sha256:" + hashlib.sha256(data).hexdigest() + + +def verify_inventory( + zf: zipfile.ZipFile, + descriptor: dict[str, Any], + vr: VerifyResult, +) -> bool: + contents = descriptor.get("contents", []) + if not isinstance(contents, list): + vr.add_error("inventory", "contents_not_array") + return False + + listed_paths: set[str] = set() + verified = 0 + + for idx, entry in enumerate(contents): + if not isinstance(entry, dict): + vr.add_error("inventory", "entry_not_object", f"contents[{idx}]") + return False + path = entry.get("path") + expected_sha = entry.get("sha256") + expected_size = entry.get("size") + if not isinstance(path, str): + vr.add_error("inventory", "entry_path_missing", f"contents[{idx}]") + return False + if not isinstance(expected_sha, str): + vr.add_error("inventory", "entry_sha_missing", path) + return False + if not isinstance(expected_size, int): + vr.add_error("inventory", "entry_size_missing", path) + return False + + listed_paths.add(path) + + try: + data = zf.read(path) + except KeyError: + vr.add_error("inventory", "missing_zip_entry", path) + return False + + if len(data) != expected_size: + vr.add_error( + "inventory", + "size_mismatch", + f"{path}: declared={expected_size} actual={len(data)}", + ) + return False + + actual_sha = _sha256_bytes(data) + if _strip_sha_prefix(actual_sha) != _strip_sha_prefix(expected_sha): + vr.add_error( + "inventory", + "hash_mismatch", + f"{path}: declared={expected_sha} actual={actual_sha}", + ) + return False + + verified += 1 + + actual_files = { + info.filename + for info in zf.infolist() + if not info.is_dir() + } + extra = sorted(actual_files - listed_paths - {_DESCRIPTOR_NAME}) + if extra: + vr.add_error( + "inventory", + "unlisted_zip_entry", + ", ".join(extra[:5]) + (" …" if len(extra) > 5 else ""), + ) + return False + + vr.inventory_entries_verified = verified + return True diff --git a/runtime/verify/_lifecycle.py b/runtime/verify/_lifecycle.py new file mode 100644 index 0000000..225ab68 --- /dev/null +++ b/runtime/verify/_lifecycle.py @@ -0,0 +1,79 @@ +"""Stage 1.7 — Lifecycle gate. + +Spec: ``docs/LIFE_RUNTIME_STANDARD.md`` Part B §B.1 row 1 + v0.8 +``docs/LIFE_LIFECYCLE_SPEC.md``. + +The gate behaviour: + +- ``active`` / ``superseded`` → proceed (Stage 2 carries on). +- ``frozen`` (memorial) → proceed but flag the runtime so Stage 4 Run + enters memorial read-only mode (full enforcement of memorial mode is + v0.9 sub-issue #125 — Stage 5 Guard). +- ``withdrawn`` → reject. +- ``tainted`` → reject (per lifecycle spec a tainted package MUST NOT + be served). + +Packages predating v0.8 do not carry ``lifecycle/lifecycle.json``; +absence is treated as ``active`` (the v0.7 default) and a warning is +recorded so the runtime operator knows the package is pre-v0.8. +""" + +from __future__ import annotations + +import json +import zipfile +from typing import Any + +from runtime.verify.result import VerifyResult + + +_LIFECYCLE_PATH = "lifecycle/lifecycle.json" +_VALID_STATES = {"active", "superseded", "frozen", "withdrawn", "tainted"} + + +def gate_lifecycle(zf: zipfile.ZipFile, vr: VerifyResult) -> bool: + if _LIFECYCLE_PATH not in zf.namelist(): + vr.lifecycle_state = "active" + vr.warnings.append( + f"{_LIFECYCLE_PATH} absent — treating package as `active` " + "(pre-v0.8 emission)." + ) + return True + + try: + raw = zf.read(_LIFECYCLE_PATH) + except KeyError: # pragma: no cover — guarded above + vr.add_error("lifecycle", "lifecycle_unreadable", _LIFECYCLE_PATH) + return False + try: + doc: Any = json.loads(raw.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: + vr.add_error("lifecycle", "lifecycle_unparseable", str(exc)) + return False + if not isinstance(doc, dict): + vr.add_error("lifecycle", "lifecycle_not_object") + return False + + state = doc.get("lifecycle_state") + if not isinstance(state, str): + vr.add_error("lifecycle", "lifecycle_state_missing") + return False + if state not in _VALID_STATES: + vr.add_error("lifecycle", "lifecycle_state_unknown", state) + return False + + vr.lifecycle_state = state + + if state == "withdrawn": + vr.add_error("lifecycle", "package_withdrawn") + return False + if state == "tainted": + vr.add_error("lifecycle", "package_tainted") + return False + if state == "frozen": + vr.warnings.append( + "package lifecycle_state=frozen — Stage 4 Run will enter " + "memorial read-only mode (full enforcement at sub-issue #125)." + ) + + return True diff --git a/runtime/verify/_schema.py b/runtime/verify/_schema.py new file mode 100644 index 0000000..d411e7c --- /dev/null +++ b/runtime/verify/_schema.py @@ -0,0 +1,104 @@ +"""Stage 1.2 — Schema validation. + +Spec: ``docs/LIFE_RUNTIME_STANDARD.md`` §2.1 (life-package schema) + +v0.8 ``docs/LIFE_BINDING_SPEC.md`` §7 (forbidden_uses key namespace). + +The runtime trusts the existing repository schema at +``schemas/life-package.schema.json`` as its source of truth. Authoring- +time validation already runs against it via ``tools/build_life_package`` ++ ``tools/test_life_package_schema``; the runtime re-validates at mount +because runtimes MUST NOT trust unverified inputs. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import jsonschema + +from runtime.verify.result import VerifyResult + + +_REPO_ROOT = Path(__file__).resolve().parents[2] +_LIFE_PACKAGE_SCHEMA_PATH = _REPO_ROOT / "schemas" / "life-package.schema.json" + + +# v0.8 binding-spec §7 hybrid namespace — the core enum the runtime +# recognises out of the box. ``x-`` extension keys are advisory at v0.9 +# (the runtime warns but does not reject); unknown non-``x-`` keys +# fail-close per binding-spec §7. +# +# The set is deliberately conservative for v0.9; the official registry +# document lives in the binding spec itself. Any v0.7/v0.8-era life- +# package emitted by the existing builder uses the keys below. +_CORE_FORBIDDEN_USES_KEYS: set[str] = { + # Identity / impersonation + "impersonation_for_fraud", + "impersonation_real_person", + "voice_clone_for_fraud", + "avatar_clone", + # Memorial + "memorial_reanimation_without_executor", + # Sensitive content + "explicit_content", + "explicit_sexual_content", + "harassment", + "spam_advertising", + # Influence / endorsement + "political_endorsement", + "fraud", + # Specialised advice + "medical_diagnosis", + "legal_advice", + "financial_advice", +} + + +def _load_schema() -> dict[str, Any]: + return json.loads(_LIFE_PACKAGE_SCHEMA_PATH.read_text(encoding="utf-8")) + + +def validate_descriptor(descriptor: dict[str, Any], vr: VerifyResult) -> bool: + """Validate ``life-package.json`` against its JSON Schema.""" + + schema = _load_schema() + validator = jsonschema.Draft202012Validator(schema) + errors = sorted(validator.iter_errors(descriptor), key=lambda e: e.path) + if errors: + for err in errors: + path = "/".join(str(p) for p in err.absolute_path) or "" + vr.add_error( + "schema", + "life_package_schema_violation", + f"{path}: {err.message}", + ) + return False + return True + + +def validate_forbidden_uses_namespace(vr: VerifyResult) -> bool: + """Reject unknown non-``x-`` ``forbidden_uses[]`` keys (binding-spec §7). + + ``x-`` extension keys are accepted but flagged via ``vr.warnings`` — + the runtime recognises them as advisory until a Provider explicitly + enforces them in Stage 4 Run. + """ + + ok = True + for key in vr.forbidden_uses: + if key.startswith("x-"): + vr.warnings.append( + f"forbidden_uses extension key {key!r} has no built-in enforcer " + "(advisory only at v0.9; binding-spec §7)." + ) + continue + if key not in _CORE_FORBIDDEN_USES_KEYS: + vr.add_error( + "schema", + "forbidden_use_unknown_key", + f"{key!r} is not in the v0.8 core enum and lacks an `x-` prefix", + ) + ok = False + return ok diff --git a/runtime/verify/_structural.py b/runtime/verify/_structural.py new file mode 100644 index 0000000..cbb4bfb --- /dev/null +++ b/runtime/verify/_structural.py @@ -0,0 +1,124 @@ +"""Stage 1.1 — Open + structural validation. + +Spec: ``docs/LIFE_RUNTIME_STANDARD.md`` §2.1. +""" + +from __future__ import annotations + +import json +import zipfile +from pathlib import Path +from typing import Any + +from runtime.verify.result import VerifyResult + + +_DESCRIPTOR_NAME = "life-package.json" + + +def _is_safe_member_name(name: str) -> bool: + """Reject path traversal / absolute paths / device paths inside the zip. + + The .life format is a portable archive — every entry must be a plain + relative POSIX path with no ``..`` segment and no leading ``/``. + """ + + if not name: + return False + if name.startswith("/") or "\\" in name: + return False + parts = name.split("/") + if any(part in ("", ".", "..") for part in parts): + return False + return True + + +def open_archive(life_path: Path, vr: VerifyResult) -> zipfile.ZipFile | None: + """Open the .life archive and validate its structural shape. + + On success returns the open ``ZipFile`` (caller is responsible for + closing). On failure adds an error to ``vr`` and returns ``None``. + """ + + if not life_path.exists(): + vr.add_error("structural", "life_path_missing", str(life_path)) + return None + if not life_path.is_file(): + vr.add_error("structural", "life_path_not_file", str(life_path)) + return None + + try: + zf = zipfile.ZipFile(life_path, "r") + except zipfile.BadZipFile as exc: + vr.add_error("structural", "bad_zip", str(exc)) + return None + except OSError as exc: + vr.add_error("structural", "open_failed", str(exc)) + return None + + # Reject path-traversal / absolute / backslash names. + bad_names = [ + info.filename + for info in zf.infolist() + if not info.is_dir() and not _is_safe_member_name(info.filename) + ] + if bad_names: + vr.add_error( + "structural", + "unsafe_zip_member_name", + ", ".join(sorted(bad_names)[:5]), + ) + zf.close() + return None + + if _DESCRIPTOR_NAME not in zf.namelist(): + vr.add_error("structural", "missing_life_package_json") + zf.close() + return None + + return zf + + +def parse_descriptor(zf: zipfile.ZipFile, vr: VerifyResult) -> dict[str, Any] | None: + """Read + ``json.loads`` ``life-package.json``.""" + + try: + raw = zf.read(_DESCRIPTOR_NAME) + except KeyError: + vr.add_error("structural", "missing_life_package_json") + return None + + try: + descriptor: Any = json.loads(raw.decode("utf-8")) + except UnicodeDecodeError as exc: + vr.add_error("structural", "descriptor_not_utf8", str(exc)) + return None + except json.JSONDecodeError as exc: + vr.add_error("structural", "descriptor_not_json", str(exc)) + return None + + if not isinstance(descriptor, dict): + vr.add_error( + "structural", + "descriptor_not_object", + f"top-level type was {type(descriptor).__name__}", + ) + return None + + vr.descriptor = descriptor + vr.package_id = descriptor.get("package_id") + vr.schema_version = descriptor.get("schema_version") + vr.mode = descriptor.get("mode") + vr.record_id = descriptor.get("record_id") + vr.created_at = descriptor.get("created_at") + vr.expires_at = descriptor.get("expires_at") + rc = descriptor.get("runtime_compatibility") + if isinstance(rc, list): + vr.runtime_compatibility = [str(x) for x in rc] + fu = descriptor.get("forbidden_uses") + if isinstance(fu, list): + vr.forbidden_uses = [str(x) for x in fu] + aer = descriptor.get("audit_event_ref") + if isinstance(aer, str): + vr.audit_event_ref = aer + return descriptor diff --git a/runtime/verify/_time.py b/runtime/verify/_time.py new file mode 100644 index 0000000..cad9514 --- /dev/null +++ b/runtime/verify/_time.py @@ -0,0 +1,84 @@ +"""Stage 1.3 — Time-bound check. + +Spec: ``docs/LIFE_RUNTIME_STANDARD.md`` §2.2 — refuse to mount after +``expires_at``. We additionally reject ``created_at`` in the future +(>30s clock skew tolerance) because that indicates either tampering or +a badly-set issuer clock. +""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +from runtime.verify.result import VerifyResult + + +_FUTURE_SKEW_TOLERANCE = timedelta(seconds=30) + + +def _parse_iso(s: str) -> datetime | None: + # The schema mandates RFC 3339 UTC. ``fromisoformat`` accepts both + # ``Z`` (Python ≥3.11) and explicit offset; normalise both. Naive + # datetimes (no tzinfo) violate RFC 3339 and are rejected — letting + # them through would crash the offset-aware comparisons below. + try: + if s.endswith("Z"): + s = s[:-1] + "+00:00" + dt = datetime.fromisoformat(s) + except (TypeError, ValueError): + return None + if dt.tzinfo is None: + return None + return dt + + +def check_time_bounds(vr: VerifyResult, now: datetime | None = None) -> bool: + now = now or datetime.now(timezone.utc) + if now.tzinfo is None: + now = now.replace(tzinfo=timezone.utc) + + created_raw = vr.created_at + expires_raw = vr.expires_at + if not created_raw or not expires_raw: + # Schema validation should have caught this; defensive only. + vr.add_error( + "time", + "missing_time_fields", + f"created_at={created_raw!r} expires_at={expires_raw!r}", + ) + return False + + created = _parse_iso(created_raw) + expires = _parse_iso(expires_raw) + if created is None: + vr.add_error("time", "created_at_unparseable", created_raw) + return False + if expires is None: + vr.add_error("time", "expires_at_unparseable", expires_raw) + return False + + if created > now + _FUTURE_SKEW_TOLERANCE: + vr.add_error( + "time", + "created_at_in_future", + f"created_at={created.isoformat()} now={now.isoformat()}", + ) + return False + + if expires <= now: + vr.add_error( + "time", + "package_expired", + f"expires_at={expires.isoformat()} now={now.isoformat()}", + ) + return False + + if expires <= created: + vr.add_error( + "time", + "expires_before_created", + f"created_at={created.isoformat()} expires_at={expires.isoformat()}", + ) + return False + + return True diff --git a/runtime/verify/result.py b/runtime/verify/result.py new file mode 100644 index 0000000..5fe001d --- /dev/null +++ b/runtime/verify/result.py @@ -0,0 +1,70 @@ +"""Public Stage 1 Verify result types.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + + +@dataclass(frozen=True) +class VerifyError: + """One verification failure. + + ``step`` is one of: + + - ``structural`` (zip open / required entries / descriptor parse) + - ``schema`` (life-package schema or forbidden_uses key namespace) + - ``time`` (created_at in the future / expires_at in the past) + - ``inventory`` (missing entry / hash mismatch / unlisted entry) + - ``audit_chain`` (prev_hash break or audit_event_ref pointer wrong) + - ``consent`` (consent_evidence_ref unreadable) + - ``withdrawal`` (endpoint unreachable, 4xx/5xx, or status=withdrawn) + - ``lifecycle`` (lifecycle_state == withdrawn etc.) + """ + + step: str + reason: str + detail: str | None = None + + def to_dict(self) -> dict[str, Any]: + d: dict[str, Any] = {"step": self.step, "reason": self.reason} + if self.detail is not None: + d["detail"] = self.detail + return d + + +@dataclass +class VerifyResult: + """Full Stage 1 outcome. + + The result is *always* returned (even on failure) so the CLI / Stage + 2 caller can inspect what was verified, what was attempted, and which + step rejected the package. Stage gating is the caller's job — Stage + 2 MUST refuse to proceed when ``ok is False``. + """ + + ok: bool + life_path: Path + package_id: str | None = None + schema_version: str | None = None + mode: str | None = None + record_id: str | None = None + created_at: str | None = None + expires_at: str | None = None + runtime_compatibility: list[str] = field(default_factory=list) + forbidden_uses: list[str] = field(default_factory=list) + lifecycle_state: str | None = None + audit_chain_length: int | None = None + audit_event_ref: str | None = None + inventory_entries_verified: int = 0 + descriptor: dict[str, Any] | None = None + errors: list[VerifyError] = field(default_factory=list) + warnings: list[str] = field(default_factory=list) + + def add_error(self, step: str, reason: str, detail: str | None = None) -> None: + self.errors.append(VerifyError(step=step, reason=reason, detail=detail)) + self.ok = False + + def first_error(self) -> VerifyError | None: + return self.errors[0] if self.errors else None diff --git a/tools/test_runtime_scaffold.py b/tools/test_runtime_scaffold.py index 06a0e9f..8e1b7f5 100644 --- a/tools/test_runtime_scaffold.py +++ b/tools/test_runtime_scaffold.py @@ -68,7 +68,9 @@ def test_lifectl_version_via_module() -> None: assert "life-runtime v0.1.1" in out, out -def test_lifectl_info_not_implemented() -> None: +def test_lifectl_info_rejects_missing_path() -> None: + # Post-#121: `lifectl info` is wired to Stage 1 Verify. A missing + # path still exits non-zero (life_path validation runs first). proc = subprocess.run( [_python(), "-m", "runtime.cli.lifectl", "info", "pretend.life"], cwd=REPO_ROOT, @@ -76,11 +78,12 @@ def test_lifectl_info_not_implemented() -> None: text=True, ) assert proc.returncode != 0 - assert "not yet implemented" in proc.stderr - assert "#121" in proc.stderr # points the reader at the right sub-issue + assert "life_path" in proc.stderr or "does not exist" in proc.stderr -def test_lifectl_run_not_implemented() -> None: +def test_lifectl_run_rejects_missing_path() -> None: + # Post-#121: `lifectl run` runs Stage 1; a missing path produces a + # structural failure that exits non-zero. proc = subprocess.run( [_python(), "-m", "runtime.cli.lifectl", "run", "pretend.life"], cwd=REPO_ROOT, @@ -88,8 +91,10 @@ def test_lifectl_run_not_implemented() -> None: text=True, ) assert proc.returncode != 0 - assert "not yet implemented" in proc.stderr - assert "#121-#126" in proc.stderr or "121" in proc.stderr + assert ( + "Stage 1 Verify FAIL" in proc.stderr + or "life_path" in proc.stderr + ) def test_lifectl_help_lists_three_commands() -> None: @@ -151,8 +156,8 @@ def main() -> int: test_runtime_module_importable, test_runtime_class_present, test_lifectl_version_via_module, - test_lifectl_info_not_implemented, - test_lifectl_run_not_implemented, + test_lifectl_info_rejects_missing_path, + test_lifectl_run_rejects_missing_path, test_lifectl_help_lists_three_commands, test_pyproject_parses_and_declares_lifectl_script, test_runtime_subpackages_present, diff --git a/tools/test_runtime_verify.py b/tools/test_runtime_verify.py new file mode 100644 index 0000000..498f8bb --- /dev/null +++ b/tools/test_runtime_verify.py @@ -0,0 +1,644 @@ +#!/usr/bin/env python3 +"""Stage 1 Verify sanity-test driver (v0.9 sub-issue #121). + +Covers all seven §2.1-§2.5 + lifecycle gate sub-steps. + +Each test builds (or mutates) a tiny `.life` archive in a fresh tempdir +and asserts the expected ``VerifyResult`` outcome — both as a Python +import and via the ``lifectl info`` subprocess to keep the CLI surface +under coverage. + +Because the spec mandates a real HTTP poll of ``withdrawal_endpoint`` +in default mode, every test that exercises Stage 1 end-to-end either +spins up a local ``http.server`` HTTP fixture (the ``with_mock_server`` +helper) or passes ``--withdrawal-mock not-revoked`` to short-circuit +the call deterministically. +""" + +from __future__ import annotations + +import contextlib +import io +import json +import os +import shutil +import socket +import subprocess +import sys +import tempfile +import threading +import time +import zipfile +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path +from typing import Any + +REPO_ROOT = Path(__file__).resolve().parents[1] +BUILDER = REPO_ROOT / "tools" / "build_life_package.py" +SOURCE_RECORD = REPO_ROOT / "examples" / "minimal-life-package" + +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from runtime.audit import AuditRecorder # noqa: E402 +from runtime.verify import ( # noqa: E402 + VerifyResult, + WithdrawalPolicy, + verify, +) + + +# --------------------------------------------------------------------------- +# Fixture helpers + + +class _WithdrawalHandler(BaseHTTPRequestHandler): + """HTTP handler that returns a configurable withdrawal poll body. + + The active body / status code is read from the server's + ``response_status`` / ``response_body`` attributes so individual + tests can rewire them without restarting the server. + """ + + def do_GET(self) -> None: # noqa: N802 + srv: Any = self.server + self.send_response(srv.response_status) + self.send_header("Content-Type", "application/json") + body = srv.response_body if isinstance(srv.response_body, bytes) else srv.response_body.encode("utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format: str, *args: Any) -> None: # noqa: A002 + return # silence default request log + + +@contextlib.contextmanager +def with_mock_server( + *, + status: int = 200, + body: str | bytes = '{"status":"active"}', +): + """Yield ``(server, base_url)`` for a local HTTP fixture.""" + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.close() + + server = HTTPServer(("127.0.0.1", port), _WithdrawalHandler) + server.response_status = status # type: ignore[attr-defined] + server.response_body = body # type: ignore[attr-defined] + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + try: + yield server, f"http://127.0.0.1:{port}/withdraw" + finally: + server.shutdown() + server.server_close() + + +def _stage_record(dst: Path) -> Path: + """Copy the minimal-life-package source record into ``dst`` so the + builder can mutate ``audit/events.jsonl`` without touching git.""" + + record = dst / "record" + shutil.copytree(SOURCE_RECORD, record, ignore=shutil.ignore_patterns("out")) + return record + + +def _build_life( + *, + withdrawal_endpoint: str, + workdir: Path, + extra_builder_args: list[str] | None = None, +) -> Path: + """Run ``tools/build_life_package.py`` against a fresh staging copy.""" + + record = _stage_record(workdir) + out_dir = workdir / "out" + out_dir.mkdir() + args = [ + sys.executable, + str(BUILDER), + "--record", + str(record), + "--output-dir", + str(out_dir), + "--withdrawal-endpoint", + withdrawal_endpoint, + "--deterministic", + ] + if extra_builder_args: + args.extend(extra_builder_args) + proc = subprocess.run(args, capture_output=True, text=True, cwd=REPO_ROOT) + if proc.returncode != 0: + raise RuntimeError( + f"build_life_package failed (rc={proc.returncode}): {proc.stderr}" + ) + out_files = list(out_dir.glob("*.life")) + assert len(out_files) == 1, out_files + return out_files[0] + + +def _rebuild_zip_with( + src: Path, + dst: Path, + mutate: Any, +) -> None: + """Copy ``src`` into ``dst`` while letting ``mutate(name, data)`` rewrite + or drop entries (return ``None`` to drop, ``(new_name, new_data)`` to + rewrite, or ``True`` to keep verbatim). + + Used to corrupt fixtures for negative tests. + """ + + with zipfile.ZipFile(src, "r") as zin, zipfile.ZipFile( + dst, "w", compression=zipfile.ZIP_DEFLATED + ) as zout: + for info in zin.infolist(): + data = zin.read(info.filename) + outcome = mutate(info.filename, data) + if outcome is None: + continue + if outcome is True: + zout.writestr(info, data) + continue + new_name, new_data = outcome + zout.writestr(new_name, new_data) + + +# --------------------------------------------------------------------------- +# Tests + + +def test_happy_path_with_real_http() -> None: + """End-to-end mount succeeds when the withdrawal endpoint replies 200 + active.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + with with_mock_server(status=200, body='{"status":"active"}') as (_srv, url): + life = _build_life(withdrawal_endpoint=url, workdir=tmp_path) + recorder = AuditRecorder() + result = verify(life, audit=recorder) + assert result.ok, result.errors + assert result.package_id is not None + assert result.lifecycle_state == "active" + assert result.inventory_entries_verified >= 5 + assert "mount_attempted" in recorder.types() + assert "withdrawal_poll" in recorder.types() + poll = recorder.latest("withdrawal_poll") + assert poll is not None and poll.fields["result"] == "not_revoked" + + +def test_bad_zip_rejected() -> None: + with tempfile.TemporaryDirectory() as tmp: + bad = Path(tmp) / "bad.life" + bad.write_bytes(b"not a zip file") + result = verify(bad, withdrawal_policy=WithdrawalPolicy(mode="mock-not-revoked")) + assert not result.ok + assert result.first_error().step == "structural" + assert result.first_error().reason == "bad_zip" + + +def test_missing_descriptor_rejected() -> None: + with tempfile.TemporaryDirectory() as tmp: + bad = Path(tmp) / "no-descriptor.life" + with zipfile.ZipFile(bad, "w") as zf: + zf.writestr("audit/events.jsonl", "{}\n") + result = verify(bad) + assert not result.ok + assert result.first_error().reason == "missing_life_package_json" + + +def test_inventory_hash_mismatch_rejected() -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + good = _build_life( + withdrawal_endpoint="http://127.0.0.1:1/withdraw", + workdir=tmp_path / "build", + ) + corrupted = tmp_path / "corrupt.life" + + def mutate(name: str, data: bytes) -> Any: + if name == "manifest.json": + return (name, data + b"\n# tampered") + return True + + _rebuild_zip_with(good, corrupted, mutate) + result = verify(corrupted, withdrawal_policy=WithdrawalPolicy(mode="mock-not-revoked")) + assert not result.ok + first = result.first_error() + assert first.step == "inventory" + assert first.reason in {"hash_mismatch", "size_mismatch"} + + +def test_unlisted_extra_entry_rejected() -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + good = _build_life( + withdrawal_endpoint="http://127.0.0.1:1/withdraw", + workdir=tmp_path / "build", + ) + tampered = tmp_path / "tampered.life" + + def mutate(name: str, data: bytes) -> Any: + return True + + _rebuild_zip_with(good, tampered, mutate) + with zipfile.ZipFile(tampered, "a") as zf: + zf.writestr("rogue.txt", "hello") + + result = verify(tampered, withdrawal_policy=WithdrawalPolicy(mode="mock-not-revoked")) + assert not result.ok + first = result.first_error() + assert first.step == "inventory" + assert first.reason == "unlisted_zip_entry" + + +def test_audit_chain_break_rejected() -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + good = _build_life( + withdrawal_endpoint="http://127.0.0.1:1/withdraw", + workdir=tmp_path / "build", + ) + broken = tmp_path / "broken.life" + + def mutate(name: str, data: bytes) -> Any: + if name == "audit/events.jsonl": + lines = data.decode("utf-8").splitlines() + if len(lines) >= 2: + second = json.loads(lines[1]) + second["prev_hash"] = ( + "sha256:" + "0" * 64 + ) # break the chain + lines[1] = json.dumps(second, sort_keys=True, separators=(",", ":")) + return (name, ("\n".join(lines) + "\n").encode("utf-8")) + return True + + _rebuild_zip_with(good, broken, mutate) + # NB: mutating the audit file invalidates its sha256 in + # life-package.json::contents[] — Stage 1.4 catches that BEFORE + # Stage 1.5 sees the chain. Both reasons are valid spec + # rejections; we accept either. + result = verify(broken, withdrawal_policy=WithdrawalPolicy(mode="mock-not-revoked")) + assert not result.ok + first = result.first_error() + assert first.step in {"audit_chain", "inventory"} + + +def test_expired_package_rejected() -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + good = _build_life( + withdrawal_endpoint="http://127.0.0.1:1/withdraw", + workdir=tmp_path / "build", + extra_builder_args=["--lifetime-days", "1"], + ) + expired = tmp_path / "expired.life" + + def mutate(name: str, data: bytes) -> Any: + if name == "life-package.json": + pkg = json.loads(data) + pkg["expires_at"] = "2000-01-01T00:00:00.000000Z" + return (name, json.dumps(pkg).encode("utf-8")) + return True + + _rebuild_zip_with(good, expired, mutate) + # Structural / inventory hash will fail first (descriptor was + # rewritten without re-walking inventory). Mounting expired + # is exercised more directly via the synthetic VerifyResult + # below, but ensure the CLI also rejects this case. + result = verify(expired, withdrawal_policy=WithdrawalPolicy(mode="mock-not-revoked")) + assert not result.ok + + +def test_withdrawn_response_rejected() -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + with with_mock_server(status=200, body='{"status":"withdrawn"}') as (_srv, url): + life = _build_life(withdrawal_endpoint=url, workdir=tmp_path) + result = verify(life) + assert not result.ok + steps = [e.step for e in result.errors] + assert "withdrawal" in steps + reasons = [e.reason for e in result.errors] + assert "package_withdrawn" in reasons + + +def test_unreachable_endpoint_rejected() -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + # Bind a socket to claim a port, then immediately release it so + # the actual HTTP call fails with "connection refused". + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.close() + url = f"http://127.0.0.1:{port}/withdraw" + life = _build_life(withdrawal_endpoint=url, workdir=tmp_path) + result = verify( + life, + withdrawal_policy=WithdrawalPolicy(mode="online", timeout_seconds=2.0), + ) + assert not result.ok + first = result.first_error() + assert first.step == "withdrawal" + assert first.reason in {"endpoint_unreachable", "endpoint_http_error"} + + +def test_withdrawal_4xx_rejected() -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + with with_mock_server(status=403, body='{"detail":"forbidden"}') as (_srv, url): + life = _build_life(withdrawal_endpoint=url, workdir=tmp_path) + result = verify(life) + assert not result.ok + first = result.first_error() + assert first.step == "withdrawal" + assert first.reason in {"endpoint_http_error", "endpoint_unreachable"} + + +def test_lifecycle_withdrawn_rejected() -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + good = _build_life( + withdrawal_endpoint="http://127.0.0.1:1/withdraw", + workdir=tmp_path / "build", + ) + # Inject a lifecycle/lifecycle.json with state=withdrawn AND + # patch the descriptor's contents[] to keep the inventory + # check happy. + with_lc = tmp_path / "with-lifecycle.life" + + lifecycle_doc = { + "schema_version": "0.1.0", + "doc_kind": "package_lifecycle", + "package_id": "PLACEHOLDER", + "record_id": "PLACEHOLDER", + "lifecycle_state": "withdrawn", + "frozen": True, + "withdrawn_at": "2026-04-26T00:00:00.000000Z", + } + + def patch_descriptor(pkg: dict, lc_bytes: bytes) -> dict: + import hashlib + + sha = "sha256:" + hashlib.sha256(lc_bytes).hexdigest() + pkg = dict(pkg) + pkg["contents"] = list(pkg.get("contents", [])) + [ + { + "path": "lifecycle/lifecycle.json", + "sha256": sha, + "size": len(lc_bytes), + } + ] + return pkg + + # Read the original descriptor + lifecycle bytes first so we can + # inject consistently. + with zipfile.ZipFile(good, "r") as zin: + descriptor = json.loads(zin.read("life-package.json")) + lifecycle_doc["package_id"] = descriptor["package_id"] + lifecycle_doc["record_id"] = descriptor["record_id"] + lc_bytes = json.dumps( + lifecycle_doc, sort_keys=True, separators=(",", ":") + ).encode("utf-8") + + new_descriptor = patch_descriptor(descriptor, lc_bytes) + + def mutate(name: str, data: bytes) -> Any: + if name == "life-package.json": + return (name, json.dumps(new_descriptor).encode("utf-8")) + return True + + _rebuild_zip_with(good, with_lc, mutate) + with zipfile.ZipFile(with_lc, "a") as zout: + zout.writestr("lifecycle/lifecycle.json", lc_bytes) + + result = verify(with_lc, withdrawal_policy=WithdrawalPolicy(mode="mock-not-revoked")) + assert not result.ok + steps = [e.step for e in result.errors] + assert "lifecycle" in steps + + +def test_audit_log_non_utf8_returns_structured_error() -> None: + """Regression: crafted audit/events.jsonl with non-UTF-8 bytes must not crash.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + good = _build_life( + withdrawal_endpoint="http://127.0.0.1:1/withdraw", + workdir=tmp_path / "build", + ) + broken = tmp_path / "non-utf8-audit.life" + + def mutate(name: str, data: bytes) -> Any: + if name == "audit/events.jsonl": + return (name, b"\xff\xfe\xfd not utf-8\n") + return True + + _rebuild_zip_with(good, broken, mutate) + result = verify(broken, withdrawal_policy=WithdrawalPolicy(mode="mock-not-revoked")) + assert not result.ok + first = result.first_error() + # Inventory may catch it first (mutated bytes hash differently) — that's + # also a valid spec rejection. The point is we got a structured + # rejection, not a crash. + assert first.step in {"audit_chain", "inventory"} + + +def test_naive_datetime_in_descriptor_returns_structured_error() -> None: + """Regression: naive (no-tz) timestamps must return parse_failure, not crash.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + good = _build_life( + withdrawal_endpoint="http://127.0.0.1:1/withdraw", + workdir=tmp_path / "build", + ) + bad = tmp_path / "naive-time.life" + + def mutate(name: str, data: bytes) -> Any: + if name == "life-package.json": + pkg = json.loads(data) + pkg["created_at"] = "2026-04-26T00:00:00" # no Z, no offset + return (name, json.dumps(pkg).encode("utf-8")) + return True + + _rebuild_zip_with(good, bad, mutate) + result = verify(bad, withdrawal_policy=WithdrawalPolicy(mode="mock-not-revoked")) + assert not result.ok + # Inventory hash of life-package.json now differs, so inventory may + # fail first; either step counts as fail-close. + steps = {e.step for e in result.errors} + assert steps & {"time", "inventory", "schema"} + + +def test_schemeless_withdrawal_endpoint_returns_structured_error() -> None: + """Regression: schemeless withdrawal_endpoint must not crash urllib.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + good = _build_life( + withdrawal_endpoint="example.invalid/withdraw", + workdir=tmp_path / "build", + ) + # Default policy = online -> hits the urllib.Request constructor + # path that previously crashed with ValueError("unknown url type"). + result = verify(good) + assert not result.ok + reasons = [e.reason for e in result.errors] + assert "endpoint_malformed_url" in reasons + + +def test_assembly_aborted_audit_event_emitted() -> None: + """Stage gating: any fail emits assembly_aborted{stage="verify"}.""" + with tempfile.TemporaryDirectory() as tmp: + bad = Path(tmp) / "bad.life" + bad.write_bytes(b"not a zip file") + recorder = AuditRecorder() + verify(bad, audit=recorder) + types = recorder.types() + assert types[0] == "mount_attempted", types + assert "assembly_aborted" in types, types + last = recorder.latest("assembly_aborted") + assert last is not None and last.fields["stage"] == "verify" + + +def test_lifectl_info_passes_for_good_package() -> None: + """CLI path: `lifectl info` exits 0 + prints PASS for a freshly-built package. + + Builds the fixture on the fly because the bundled + ``examples/minimal-life-package/out/*.life`` is gitignored — see + ``examples/minimal-life-package/.gitignore``. + """ + with tempfile.TemporaryDirectory() as tmp: + fixture = _build_life( + withdrawal_endpoint="http://127.0.0.1:1/withdraw", + workdir=Path(tmp), + ) + proc = subprocess.run( + [ + sys.executable, + "-m", + "runtime.cli.lifectl", + "info", + str(fixture), + "--withdrawal-mock", + "not-revoked", + ], + cwd=REPO_ROOT, + capture_output=True, + text=True, + ) + assert proc.returncode == 0, proc.stderr + assert "verification: PASS" in proc.stdout + assert "package_id: " in proc.stdout + + +def test_lifectl_info_json_contains_structured_errors() -> None: + """CLI path: `--json` returns parsable structured output.""" + with tempfile.TemporaryDirectory() as tmp: + fixture = _build_life( + withdrawal_endpoint="http://127.0.0.1:1/withdraw", + workdir=Path(tmp), + ) + proc = subprocess.run( + [ + sys.executable, + "-m", + "runtime.cli.lifectl", + "info", + str(fixture), + "--withdrawal-mock", + "revoked", + "--json", + ], + cwd=REPO_ROOT, + capture_output=True, + text=True, + ) + assert proc.returncode == 1, proc.stderr + payload = json.loads(proc.stdout) + assert payload["ok"] is False + assert isinstance(payload["package_id"], str) and payload["package_id"] + reasons = [e["reason"] for e in payload["errors"]] + assert "package_withdrawn" in reasons + + +def test_lifectl_run_stage1_pass_then_pending() -> None: + """CLI path: `lifectl run` exits 2 once Stage 1 passes (pending Stage 2).""" + with tempfile.TemporaryDirectory() as tmp: + fixture = _build_life( + withdrawal_endpoint="http://127.0.0.1:1/withdraw", + workdir=Path(tmp), + ) + proc = subprocess.run( + [ + sys.executable, + "-m", + "runtime.cli.lifectl", + "run", + str(fixture), + "--withdrawal-mock", + "not-revoked", + "--once", + ], + cwd=REPO_ROOT, + capture_output=True, + text=True, + ) + assert proc.returncode == 2, (proc.stdout, proc.stderr) + assert "Stage 1 Verify ✓" in proc.stdout + assert "Stage 2+ pending" in proc.stderr + + +# --------------------------------------------------------------------------- +# Driver + + +def main() -> int: + tests = [ + test_happy_path_with_real_http, + test_bad_zip_rejected, + test_missing_descriptor_rejected, + test_inventory_hash_mismatch_rejected, + test_unlisted_extra_entry_rejected, + test_audit_chain_break_rejected, + test_expired_package_rejected, + test_withdrawn_response_rejected, + test_unreachable_endpoint_rejected, + test_withdrawal_4xx_rejected, + test_lifecycle_withdrawn_rejected, + test_audit_log_non_utf8_returns_structured_error, + test_naive_datetime_in_descriptor_returns_structured_error, + test_schemeless_withdrawal_endpoint_returns_structured_error, + test_assembly_aborted_audit_event_emitted, + test_lifectl_info_passes_for_good_package, + test_lifectl_info_json_contains_structured_errors, + test_lifectl_run_stage1_pass_then_pending, + ] + failures: list[str] = [] + for test in tests: + name = test.__name__ + try: + test() + except AssertionError as exc: + failures.append(f"{name}: {exc}") + print(f"FAIL {name}:") + for line in str(exc).splitlines(): + print(f" {line}") + except Exception as exc: # noqa: BLE001 - sanity test driver + failures.append(f"{name}: {type(exc).__name__}: {exc}") + print(f"FAIL {name}: {type(exc).__name__}: {exc}") + else: + print(f"ok {name}") + + print() + if failures: + print(f"{len(failures)} of {len(tests)} runtime-verify tests failed.") + return 1 + print(f"all {len(tests)} runtime-verify tests passed.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())