Skip to content

Commit e6649e9

Browse files
committed
feat: add contract validation and repo scanning commands
1 parent 3ba4d90 commit e6649e9

1 file changed

Lines changed: 293 additions & 0 deletions

File tree

sourceosctl/commands/contracts.py

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
"""SourceOS contract validation and estate scanning helpers.
2+
3+
The M1 implementation is intentionally local-only and dependency-light. It
4+
validates JSON shape and the minimum SourceOS repo manifest contract until the
5+
full schema mirror from SourceOS-Linux/sourceos-spec is vendored or fetched by a
6+
future hardened validator.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import argparse
12+
import json
13+
from pathlib import Path
14+
from typing import Any, Dict, Iterable, List, Tuple
15+
16+
17+
# Top-level keys that validate_repo_manifest() requires to be present.
# Kept as a list so "missing required field" errors come out in this order.
REQUIRED_REPO_MANIFEST_FIELDS = [
    "repo", "domain", "specVersion",
    "ownedSchemas", "syncEngines", "sourceChannels",
    "policyClasses", "auditEvents", "dangerousSurfaces",
]

# Accepted values for the manifest's "domain" field (listed alphabetically).
VALID_DOMAINS = {
    "agent",
    "browser",
    "integration",
    "memory",
    "model",
    "observability",
    "os",
    "policy",
    "security",
    "shell",
    "spec",
    "tooling",
    "transport",
    "workspace",
}

# Accepted entries for the manifest's "policyClasses" array.
VALID_POLICY_CLASSES = {"critical", "high", "low", "medium"}
47+
48+
49+
def _load_json(path: Path) -> Tuple[Dict[str, Any] | None, List[str]]:
50+
if not path.exists():
51+
return None, [f"missing file: {path}"]
52+
if not path.is_file():
53+
return None, [f"not a file: {path}"]
54+
try:
55+
payload = json.loads(path.read_text(encoding="utf-8"))
56+
except json.JSONDecodeError as exc:
57+
return None, [f"invalid JSON: {exc}"]
58+
if not isinstance(payload, dict):
59+
return None, ["top-level JSON value must be an object"]
60+
return payload, []
61+
62+
63+
def validate_repo_manifest(payload: Dict[str, Any]) -> List[str]:
    """Return validation errors for a SourceOSRepoManifest-like payload.

    Checks performed, in order:

    * every name in ``REQUIRED_REPO_MANIFEST_FIELDS`` is present as a key;
    * ``repo``, when not ``None``, is a string containing ``/``;
    * ``domain``, when not ``None``, is a member of ``VALID_DOMAINS``;
    * each of the six list-valued fields, when not ``None``, is a list;
    * every entry of ``policyClasses`` is in ``VALID_POLICY_CLASSES``;
    * every entry of ``syncEngines`` is an object carrying the keys
      ``engineId``, ``collection``, ``ownerRepo``, ``policyClass`` and
      ``mergeStrategy``.

    An empty list means the payload passed every check. The payload is not
    modified.
    """
    errors: List[str] = [
        f"missing required field: {field}"
        for field in REQUIRED_REPO_MANIFEST_FIELDS
        if field not in payload
    ]

    repo = payload.get("repo")
    if repo is not None and (not isinstance(repo, str) or "/" not in repo):
        errors.append("repo must be a GitHub owner/name string")

    domain = payload.get("domain")
    # The isinstance guard must come first: a JSON array or object here is
    # unhashable, and testing it against a set would raise TypeError instead
    # of producing a validation error.
    if domain is not None and (not isinstance(domain, str) or domain not in VALID_DOMAINS):
        errors.append(f"domain must be one of: {', '.join(sorted(VALID_DOMAINS))}")

    for list_field in (
        "ownedSchemas",
        "syncEngines",
        "sourceChannels",
        "policyClasses",
        "auditEvents",
        "dangerousSurfaces",
    ):
        value = payload.get(list_field)
        if value is not None and not isinstance(value, list):
            errors.append(f"{list_field} must be an array")

    policy_classes = payload.get("policyClasses")
    if isinstance(policy_classes, list):
        for policy_class in policy_classes:
            # Same unhashable-value hazard as for ``domain`` above.
            if not isinstance(policy_class, str) or policy_class not in VALID_POLICY_CLASSES:
                errors.append(f"invalid policy class: {policy_class}")

    sync_engines = payload.get("syncEngines")
    if isinstance(sync_engines, list):
        for index, engine in enumerate(sync_engines):
            if not isinstance(engine, dict):
                errors.append(f"syncEngines[{index}] must be an object")
                continue
            for field in ("engineId", "collection", "ownerRepo", "policyClass", "mergeStrategy"):
                if field not in engine:
                    errors.append(f"syncEngines[{index}] missing {field}")

    return errors
107+
108+
109+
def _classify_manifest(path: Path) -> Dict[str, Any]:
    """Load and validate the manifest at *path* and summarise the outcome.

    The returned dict always has ``path``, ``status`` and ``errors``. When the
    file could be loaded as a JSON object it also has ``repo`` and ``domain``
    copied from the payload (``None`` if absent).

    ``status`` is one of:

    * ``"missing-manifest"`` - nothing exists at *path*;
    * ``"invalid-manifest"`` - something exists at *path* but it could not be
      loaded as a JSON object, or it loaded and failed validation;
    * ``"compliant"`` - loaded and passed validation.
    """
    payload, errors = _load_json(path)
    if payload is None:
        # Only call the manifest "missing" when nothing is there. A file that
        # exists but cannot be parsed is invalid, not missing.
        status = "invalid-manifest" if path.exists() else "missing-manifest"
        return {"path": str(path), "status": status, "errors": errors}

    errors.extend(validate_repo_manifest(payload))
    status = "compliant" if not errors else "invalid-manifest"
    return {
        "path": str(path),
        "repo": payload.get("repo"),
        "domain": payload.get("domain"),
        "status": status,
        "errors": errors,
    }
122+
123+
124+
def contract_validate(args: argparse.Namespace) -> int:
    """Validate the JSON file named by ``args.path`` and print the outcome.

    The file must load as a JSON object. Repo-manifest validation is applied
    as well when the file is named ``manifest.json`` or the object has a
    ``repo`` key. With ``args.json`` set the report is printed as JSON;
    otherwise as a status line followed by one ``- `` line per error.

    Returns 0 when no errors were found, 1 otherwise.
    """
    target = Path(args.path)
    document, problems = _load_json(target)

    if document is not None:
        looks_like_manifest = target.name == "manifest.json" or "repo" in document
        if looks_like_manifest:
            problems.extend(validate_repo_manifest(document))

    verdict = "invalid" if problems else "valid"
    exit_code = 1 if problems else 0

    if args.json:
        report = {"path": str(target), "status": verdict, "errors": problems}
        print(json.dumps(report, indent=2, sort_keys=True))
        return exit_code

    print(f"{verdict.upper()}: {target}")
    for problem in problems:
        print(f"- {problem}")
    return exit_code
142+
143+
144+
def repo_scan(args: argparse.Namespace) -> int:
    """Classify ``<args.path>/.sourceos/manifest.json`` and print the result.

    With ``args.json`` set the classification dict is printed as JSON.
    Otherwise the output is a ``status: root`` line, optional ``repo:`` and
    ``domain:`` lines when those values are truthy, and one ``- `` line per
    error.

    Returns 0 only when the status is ``"compliant"``, 1 otherwise.
    """
    root = Path(args.path)
    report = _classify_manifest(root / ".sourceos" / "manifest.json")
    exit_code = 0 if report["status"] == "compliant" else 1

    if args.json:
        print(json.dumps(report, indent=2, sort_keys=True))
        return exit_code

    print(f"{report['status']}: {root}")
    for key in ("repo", "domain"):
        if report.get(key):
            print(f"{key}: {report[key]}")
    for problem in report.get("errors", []):
        print(f"- {problem}")
    return exit_code
159+
160+
161+
def _candidate_repos(root: Path) -> Iterable[Path]:
162+
if (root / ".sourceos" / "manifest.json").exists():
163+
yield root
164+
for child in sorted(root.iterdir()) if root.exists() and root.is_dir() else []:
165+
if child.is_dir() and (child / ".sourceos" / "manifest.json").exists():
166+
yield child
167+
168+
169+
def estate_scan(args: argparse.Namespace) -> int:
    """Classify every manifest found at or directly under ``args.path``.

    With ``args.json`` set, prints ``{"root": ..., "results": [...]}`` as
    JSON. Otherwise prints one ``status: repo-or-path`` line per result with
    indented error lines beneath it, or a single ``missing-manifest`` line
    when no manifests were found.

    Returns 0 only when at least one manifest was found and all of them are
    ``"compliant"``; 1 otherwise.
    """
    root = Path(args.path)
    results = []
    for repo_dir in _candidate_repos(root):
        results.append(_classify_manifest(repo_dir / ".sourceos" / "manifest.json"))
    nothing_found = len(results) == 0

    if args.json:
        document = {"root": str(root), "results": results}
        print(json.dumps(document, indent=2, sort_keys=True))
    else:
        if nothing_found:
            print(f"missing-manifest: no .sourceos/manifest.json files found under {root}")
        for entry in results:
            label = entry.get("repo") or entry["path"]
            print(f"{entry['status']}: {label}")
            for problem in entry.get("errors", []):
                print(f"  - {problem}")

    all_compliant = all(entry["status"] == "compliant" for entry in results)
    if nothing_found or not all_compliant:
        return 1
    return 0
183+
184+
185+
def graph_doctor(args: argparse.Namespace) -> int:
    """Print a fixed two-line SourceGraph status report and return 0.

    *args* is accepted for handler-signature compatibility and is not read.
    """
    report = (
        "SourceGraph doctor: contract surface present; runtime graph backend not configured in sourceos-devtools.",
        "Expected contracts: SourceGraphWrite, AuditEvent, PolicyDecision, AgentCapabilityLease.",
    )
    for line in report:
        print(line)
    return 0
189+
190+
191+
def sync_doctor(args: argparse.Namespace) -> int:
    """Print a fixed two-line SourceSync status report and return 0.

    *args* is accepted for handler-signature compatibility and is not read.
    """
    report = (
        "SourceSync doctor: local manifest validation available; relay/sync runtime checks are not configured here.",
        "Expected contracts: SourceOSRepoManifest and SyncEngineManifest.",
    )
    for line in report:
        print(line)
    return 0
195+
196+
197+
def policy_explain(args: argparse.Namespace) -> int:
    """Summarise the decision recorded in the JSON file at ``args.path``.

    If the file cannot be loaded as a JSON object, the load errors are
    printed and 1 is returned. Otherwise prints:

    * ``decision:`` - first truthy of ``decision``, ``outcome``, else
      ``unknown``;
    * ``reason:`` - first truthy of ``reasonCode``, ``decisionHash``, else a
      fixed placeholder;
    * ``policy:`` and ``policyDomain:`` - only when ``policyId`` and
      ``policyDomain`` respectively are truthy;

    and returns 0.
    """
    document, problems = _load_json(Path(args.path))
    if problems:
        for problem in problems:
            print(f"- {problem}")
        return 1

    def _first_truthy(keys, fallback):
        # Mirrors an ``a or b or fallback`` chain over document lookups.
        for key in keys:
            value = document.get(key)
            if value:
                return value
        return fallback

    decision = _first_truthy(("decision", "outcome"), "unknown")
    reason = _first_truthy(
        ("reasonCode", "decisionHash"),
        "no reasonCode/decisionHash present",
    )
    print(f"decision: {decision}")
    print(f"reason: {reason}")

    for label, key in (("policy", "policyId"), ("policyDomain", "policyDomain")):
        if document.get(key):
            print(f"{label}: {document[key]}")
    return 0
212+
213+
214+
def build_contract_parser() -> argparse.ArgumentParser:
    """Build the ``sourceosctl contract`` parser.

    It has one required subcommand, ``validate``, which takes a positional
    ``path`` and an optional ``--json`` flag and dispatches to
    ``contract_validate`` through the ``func`` default.
    """
    parser = argparse.ArgumentParser(
        prog="sourceosctl contract",
        description="SourceOS contract helpers",
    )
    subcommands = parser.add_subparsers(
        dest="contract_command",
        metavar="<subcommand>",
        required=True,
    )

    validate_cmd = subcommands.add_parser("validate", help="Validate a JSON contract file")
    validate_cmd.add_argument("path")
    validate_cmd.add_argument("--json", action="store_true")
    validate_cmd.set_defaults(func=contract_validate)
    return parser
223+
224+
225+
def contract_main(argv: List[str] | None = None) -> int:
    """Parse *argv* for ``sourceosctl contract`` and run the chosen handler.

    ``None`` is passed through to argparse unchanged. A falsy handler result
    is normalised to 0.
    """
    namespace = build_contract_parser().parse_args(argv)
    exit_code = namespace.func(namespace)
    return exit_code or 0
229+
230+
231+
def build_repo_parser() -> argparse.ArgumentParser:
    """Build the ``sourceosctl repo`` parser.

    It has one required subcommand, ``scan``, which takes a positional
    ``path`` and an optional ``--json`` flag and dispatches to ``repo_scan``
    through the ``func`` default.
    """
    parser = argparse.ArgumentParser(
        prog="sourceosctl repo",
        description="SourceOS repo helpers",
    )
    subcommands = parser.add_subparsers(
        dest="repo_command",
        metavar="<subcommand>",
        required=True,
    )

    scan_cmd = subcommands.add_parser("scan", help="Scan one repo for .sourceos/manifest.json")
    scan_cmd.add_argument("path")
    scan_cmd.add_argument("--json", action="store_true")
    scan_cmd.set_defaults(func=repo_scan)
    return parser
240+
241+
242+
def repo_main(argv: List[str] | None = None) -> int:
    """Parse *argv* for ``sourceosctl repo`` and run the chosen handler.

    ``None`` is passed through to argparse unchanged. A falsy handler result
    is normalised to 0.
    """
    namespace = build_repo_parser().parse_args(argv)
    exit_code = namespace.func(namespace)
    return exit_code or 0
246+
247+
248+
def build_estate_parser() -> argparse.ArgumentParser:
    """Build the ``sourceosctl estate`` parser.

    It has one required subcommand, ``scan``, which takes an optional
    positional ``path`` (default ``"."``) and an optional ``--json`` flag and
    dispatches to ``estate_scan`` through the ``func`` default.
    """
    parser = argparse.ArgumentParser(
        prog="sourceosctl estate",
        description="SourceOS estate helpers",
    )
    subcommands = parser.add_subparsers(
        dest="estate_command",
        metavar="<subcommand>",
        required=True,
    )

    scan_cmd = subcommands.add_parser("scan", help="Scan child repos for .sourceos/manifest.json")
    scan_cmd.add_argument("path", nargs="?", default=".")
    scan_cmd.add_argument("--json", action="store_true")
    scan_cmd.set_defaults(func=estate_scan)
    return parser
257+
258+
259+
def estate_main(argv: List[str] | None = None) -> int:
    """Parse *argv* for ``sourceosctl estate`` and run the chosen handler.

    ``None`` is passed through to argparse unchanged. A falsy handler result
    is normalised to 0.
    """
    namespace = build_estate_parser().parse_args(argv)
    exit_code = namespace.func(namespace)
    return exit_code or 0
263+
264+
265+
def graph_main(argv: List[str] | None = None) -> int:
    """Parse *argv* for ``sourceosctl graph`` and run the chosen handler.

    The only subcommand is ``doctor``, which dispatches to ``graph_doctor``.
    A falsy handler result is normalised to 0.
    """
    parser = argparse.ArgumentParser(
        prog="sourceosctl graph",
        description="SourceGraph helpers",
    )
    subcommands = parser.add_subparsers(
        dest="graph_command",
        metavar="<subcommand>",
        required=True,
    )
    doctor_cmd = subcommands.add_parser("doctor", help="Inspect SourceGraph contract posture")
    doctor_cmd.set_defaults(func=graph_doctor)

    namespace = parser.parse_args(argv)
    exit_code = namespace.func(namespace)
    return exit_code or 0
273+
274+
275+
def sync_main(argv: List[str] | None = None) -> int:
    """Parse *argv* for ``sourceosctl sync`` and run the chosen handler.

    The only subcommand is ``doctor``, which dispatches to ``sync_doctor``.
    A falsy handler result is normalised to 0.
    """
    parser = argparse.ArgumentParser(
        prog="sourceosctl sync",
        description="SourceSync helpers",
    )
    subcommands = parser.add_subparsers(
        dest="sync_command",
        metavar="<subcommand>",
        required=True,
    )
    doctor_cmd = subcommands.add_parser("doctor", help="Inspect SourceSync contract posture")
    doctor_cmd.set_defaults(func=sync_doctor)

    namespace = parser.parse_args(argv)
    exit_code = namespace.func(namespace)
    return exit_code or 0
283+
284+
285+
def policy_main(argv: List[str] | None = None) -> int:
    """Parse *argv* for ``sourceosctl policy`` and run the chosen handler.

    The only subcommand is ``explain``, which takes a positional ``path`` and
    dispatches to ``policy_explain``. A falsy handler result is normalised
    to 0.
    """
    parser = argparse.ArgumentParser(
        prog="sourceosctl policy",
        description="SourcePolicy helpers",
    )
    subcommands = parser.add_subparsers(
        dest="policy_command",
        metavar="<subcommand>",
        required=True,
    )
    explain_cmd = subcommands.add_parser("explain", help="Explain a PolicyDecision/AuditEvent JSON file")
    explain_cmd.add_argument("path")
    explain_cmd.set_defaults(func=policy_explain)

    namespace = parser.parse_args(argv)
    exit_code = namespace.func(namespace)
    return exit_code or 0

0 commit comments

Comments
 (0)