|
| 1 | +"""Diagnostic redaction CLI helpers.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import argparse |
| 6 | +import json |
| 7 | +import re |
| 8 | +from pathlib import Path |
| 9 | + |
| 10 | + |
# Ordered redaction rules as (name, compiled pattern, replacement) triples.
#
# The rules are applied one after another to the same text, so ORDER MATTERS:
# a later rule sees the output of every earlier rule.  Names are used as keys
# when substitution counts are reported, so each name must be unique.
REDACTION_PATTERNS: list[tuple[str, re.Pattern[str], str]] = [
    # Cookie headers / assignments: everything up to the end of the line goes.
    ("cookies", re.compile(r"(?i)(\bcookie\s*[:=]\s*)([^\n]+)"), r"\1<redacted-cookie>"),
    # "Bearer <token>" wherever it appears (headers, logs, curl commands).
    ("bearer", re.compile(r"(?i)\bbearer\s+[a-zA-Z0-9._~+/=-]+"), "Bearer <redacted-token>"),
    # Unquoted "Authorization: <scheme> <credentials>" values for schemes other
    # than Bearer (e.g. Basic).  The generic key/value rules below stop at the
    # first whitespace and would only replace the scheme word, leaving the
    # credentials in the output.  This rule must run BEFORE "api_keys", which
    # rewrites the scheme word into a quoted placeholder.  Bearer is excluded
    # because the rule above has already handled it.  Like the rules below it
    # errs on the side of over-redaction: the word after the first value word
    # is always treated as credentials.
    (
        "authorization_credentials",
        re.compile(r'(?i)(authorization"?\s*[:=]\s*(?!bearer\b)[a-z][a-z0-9_-]*[ \t]+)([^,\s"\']+)'),
        r"\1<redacted-credentials>",
    ),
    # Token-bearing identity fields (quoted JSON value or plain token blob).
    ("oauth", re.compile(r'(?is)("?(?:access_token|refresh_token|oauth_token|id_token)"?\s*[:=]\s*)("(?:[^"\\]|\\.)*"|[^,\s]+)'), r'\1"<redacted-token>"'),
    ("api_keys", re.compile(r'(?is)("?(?:api[_-]?key|x-api-key|apikey|client_secret|authorization)"?\s*[:=]\s*)("(?:[^"\\]|\\.)*"|[^,\s]+)'), r'\1"<redacted-secret>"'),
    # Generic fallback.  The key names carry no left boundary, so this rule also
    # matches the tail of keys such as "access_token" or "client_secret".  The
    # lookahead skips values that are already a quoted placeholder written by
    # an earlier rule; without it those hits were counted twice and the more
    # specific "<redacted-token>" label was overwritten.
    ("secrets", re.compile(r'(?is)("?(?:secret|password|token)"?\s*[:=]\s*)(?!"<redacted-[a-z-]+>")("(?:[^"\\]|\\.)*"|[^,\s]+)'), r'\1"<redacted-secret>"'),
    ("sensitive_ids", re.compile(r'(?is)("?(?:user|account|session|device|customer|tenant|workspace|organization|org|principal|subject)_id"?\s*[:=]\s*)("(?:[^"\\]|\\.)*"|[^,\s]+)'), r'\1"<redacted-id>"'),
    # Prompt-bearing fields, including escaped multi-line quoted strings.
    # Unquoted prompt values run to the next comma or newline.
    (
        "model_prompts",
        re.compile(r'(?is)("?(?:prompt|model_prompt|system_prompt|user_prompt)"?\s*[:=]\s*)("(?:[^"\\]|\\.)*"|[^,\n]+)'),
        r'\1"<redacted-prompt>"',
    ),
    # Explicitly marked policy snippets: the tags stay, the content is dropped.
    ("policy_marked", re.compile(r"(?is)<policy-marked>.*?</policy-marked>"), "<policy-marked><redacted-policy-snippet></policy-marked>"),
]
| 27 | + |
| 28 | + |
def _redact(raw: str) -> tuple[str, dict[str, int]]:
    """Run every rule in ``REDACTION_PATTERNS`` over *raw*, in list order.

    Each rule is applied to the output of the previous one.

    Returns:
        A ``(text, hits)`` pair: the redacted text, and a mapping from rule
        name to the number of substitutions that rule performed.  Rules that
        matched nothing are left out of the mapping.
    """
    text = raw
    hits: dict[str, int] = {}
    for rule in REDACTION_PATTERNS:
        label, regex, substitute = rule
        text, replaced = regex.subn(substitute, text)
        # Only rules that actually fired are reported.
        if replaced:
            hits[label] = replaced
    return text, hits
| 37 | + |
| 38 | + |
def redact_cmd(args: argparse.Namespace) -> int:
    """Redact a diagnostic export named by ``args.input``.

    The input is read as UTF-8 and passed through ``_redact``.  When
    ``args.output`` is set, the redacted text is written there (parent
    directories are created) and a JSON summary with the per-rule counts is
    printed; otherwise the redacted text itself is printed.

    Args:
        args: Parsed arguments carrying ``input`` (path) and ``output``
            (path or ``None``).

    Returns:
        ``0`` on success.  ``1`` when the input is missing, is not a regular
        file, or cannot be read/decoded; in those cases a JSON document with
        ``"result": "fail"`` is printed instead of raising.
    """
    input_path = Path(args.input).expanduser().resolve()

    error: str | None = None
    raw = ""
    if not input_path.exists():
        error = f"missing input file: {input_path}"
    elif not input_path.is_file():
        # exists() is also true for directories; reading one would raise.
        error = f"input is not a regular file: {input_path}"
    else:
        try:
            raw = input_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as exc:
            # Permission problems or non-UTF-8 content: report, don't crash.
            error = f"cannot read input file: {input_path}: {exc}"

    if error is not None:
        print(json.dumps({"type": "DiagnosticRedaction", "result": "fail", "errors": [error]}, indent=2, sort_keys=True))
        return 1

    redacted, counts = _redact(raw)

    if args.output:
        output_path = Path(args.output).expanduser().resolve()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(redacted, encoding="utf-8")
        print(json.dumps({"type": "DiagnosticRedaction", "result": "pass", "input": str(input_path), "output": str(output_path), "redactionCounts": counts}, indent=2, sort_keys=True))
    else:
        print(redacted)
    return 0
| 55 | + |
| 56 | + |
def build_parser() -> argparse.ArgumentParser:
    """Construct the argument parser for the ``diagnostics`` command group.

    A subcommand is mandatory.  The only one registered here is ``redact``,
    which takes a positional ``input`` and an optional ``--output`` and
    dispatches to ``redact_cmd`` through the ``func`` default.
    """
    root = argparse.ArgumentParser(
        prog="sourceosctl diagnostics",
        description="Diagnostic helpers",
    )
    commands = root.add_subparsers(dest="diagnostics_command", required=True)

    redact = commands.add_parser(
        "redact",
        help="Redact sensitive tokens and snippets from diagnostic exports",
    )
    redact.add_argument("input")
    redact.add_argument("--output", default=None)
    # The entry point looks up ``func`` on the parsed namespace to dispatch.
    redact.set_defaults(func=redact_cmd)

    return root
| 65 | + |
| 66 | + |
def diagnostics_main(argv: list[str] | None = None) -> int:
    """Parse *argv* and run the selected diagnostics subcommand.

    Args:
        argv: Argument list to parse; ``None`` is passed through to
            ``parse_args`` unchanged.

    Returns:
        The handler's return value, with any falsy result (such as ``None``)
        normalised to ``0``.
    """
    namespace = build_parser().parse_args(argv)
    status = namespace.func(namespace)
    return status if status else 0
0 commit comments