diff --git a/README.ko.md b/README.ko.md index 49ac659..25cd8fd 100644 --- a/README.ko.md +++ b/README.ko.md @@ -131,7 +131,7 @@ brief 모드는 코딩 에이전트가 군더더기를 줄이도록 요청하되 | 출력 축약과 민감정보 가림 | 테스트·빌드·검색·diff 출력을 작게 만들고, 에이전트 컨텍스트에 들어가기 전에 민감해 보이는 값을 가립니다. | | 선언형 출력 필터 | 사용자 정의 JSON DSL로 성공 출력만 명시적으로 줄이고, 보호해야 하는 실패 출력은 원문 stdout/stderr와 종료 코드를 보존합니다. | | 로컬 로그 보관소 | 큰 로그를 대화 밖 로컬 저장소에 보관하고, 요약 정보나 요청한 줄 범위만 다시 가져옵니다. | -| Anthropic 비용 가드 | `context-guard cost preflight/observe/ledger/compile`이 cache 위험과 비용 범위를 추정하고, 원문 대신 keyed HMAC fingerprint만 저장하며, `--enforce`를 명시하지 않으면 경고만 합니다. | +| Anthropic 비용 가드 | `context-guard cost preflight/observe/ledger/compile`이 cache 위험과 비용 범위를 추정하고, `context-guard route-advisor`가 로컬 총비용·batchability route 후보를 요약하며, ledger를 쓸 때도 원문 대신 keyed HMAC fingerprint만 저장합니다. `--enforce`를 명시하지 않으면 경고만 합니다. | | 예산 기반 컨텍스트 패커 | 우선순위가 있는 로컬 파일 근거를 바이트 예산 안의 Markdown 팩으로 조립하고, 로컬 신호에서 `build`용 manifest를 추천하며, `--explain`, `--adaptive-k`, `--symbol-memory`로 로컬 자문 메타데이터를 덧붙일 수 있습니다. | | Tool/MCP schema pruner | 로컬 catalog에서 bounded top-k tool/schema 자문 리포트를 만들고, compact 요약 기록과 전체 가림 처리된 payload 재조회 경로를 남깁니다. | | 보수적 stdin 압축기 | 선택한 JSON, diff, 로그, 검색 출력, 코드, 산문을 줄이고, 관측 바이트 근거와 추정 토큰 proxy를 함께 표시합니다. `--mode readable`은 exact fallback 안내가 있는 opt-in 산문 preview를 추가합니다. | @@ -287,6 +287,15 @@ long-command 2>&1 | ./plugins/context-guard/bin/context-guard-artifact store --c `context-guard-tool-prune`은 로컬 tool 또는 MCP catalog를 결정적 lexical heuristic(어휘 기반 휴리스틱)으로 순위화해 제한된 top-k 자문 리포트를 만듭니다. inline schema는 관측된 UTF-8 바이트 예산을 지키고, 누락되거나 예산 때문에 생략된 schema는 `.context-guard/tool-prune`의 compact 요약 기록과 별도 가림 처리 payload로 다시 조회할 수 있습니다. 이 기능은 안내용이며 MCP 설정을 변경하지 않습니다. 토큰 값은 provider가 측정한 절감 수치가 아니라 추정 proxy입니다. 
+### 총비용, batchability, routing 후보 자문 + +```bash +./plugins/context-guard/bin/context-guard route-advisor --workload workload.json --json +./plugins/context-guard/bin/context-guard-cost route-advisor --feature batch_api=true --feature structured_outputs=true --json < workload.json +``` + +`context-guard route-advisor`는 로컬 passive advisor입니다. caller가 제공한 workload JSON, provider feature 선언, usage telemetry, 외부·로컬 shifted cost를 읽고 total-cost accounting, batchability blocker, batch API·prompt-cache prefix 보존·structured outputs·저비용 모델 평가 같은 route 후보를 출력합니다. queue를 시작하거나 provider를 호출하거나 pricing 문서를 새로 가져오지 않으며, provider feature는 caller-supplied 또는 unknown/recheck-required로 표시합니다. 추천은 후보일 뿐이고, hosted token/cost 절감 주장은 여전히 matched successful task, 비열등 quality gate, shifted-cost evidence가 있어야 합니다. + ### 선택한 로컬 텍스트를 보수적으로 압축하기 ```bash diff --git a/README.md b/README.md index 461fbc7..6dba10d 100644 --- a/README.md +++ b/README.md @@ -134,7 +134,7 @@ Legacy local CLI wrappers (`claude-token-*`, `claude-read-symbol`, `claude-trim- | Output trimming and sanitizing | Keeps test, build, search, and diff output compact while redacting likely secrets before they enter agent context. | | Declarative output filter | Opt-in JSON DSL for user-owned command filters with protected failure passthrough and validation before use. | | Local artifact store | Saves large sanitized logs outside the conversation and returns compact receipts or exact requested slices. | -| Anthropic cost guard | `context-guard cost preflight/observe/ledger/compile` estimates cache-risk and cost ranges, stores only keyed HMAC fingerprints, and stays passive unless `--enforce` is explicit. | +| Anthropic cost guard | `context-guard cost preflight/observe/ledger/compile` estimates cache-risk and cost ranges, and `context-guard route-advisor` summarizes local total-cost/batchability route candidates. Where a ledger is used, it stores only keyed HMAC fingerprints instead of raw text. The guard stays passive unless `--enforce` is explicit. 
| | Budgeted context packer | Assembles prioritized local file evidence into a byte-budgeted Markdown pack, can suggest a build-compatible manifest from local signals, adds `--explain` for compact local selection reasons plus bounded repo-map metadata, and adds opt-in `--adaptive-k` / `--symbol-memory` advisory metadata. | | Tool/MCP schema pruner | Emits bounded top-k tool/schema advisory reports from local catalogs with compact receipts and full sanitized payload retrieval. | | Conservative stdin compressor | Shrinks selected JSON, diffs, logs, search output, code, and prose with observed byte evidence and estimated token proxies; `--mode readable` adds an opt-in readable prose preview with exact fallback guidance. | @@ -314,6 +314,15 @@ The packer uses deterministic standard-library heuristics only: no network, mode `context-guard-cache-score` is a local static lint for prompt/request layout. It estimates total and cacheable-prefix size with a tokenizer-free char/4 proxy, warns about dynamic-looking values near the prefix, and records provider caveats for OpenAI, Anthropic, Gemini, or a generic threshold. It does not call providers, store raw prompts, estimate prices, observe cache hits, or prove token/cost savings; verify real cache behavior with provider usage telemetry. +### Advise on total cost, batchability, and routing + +```bash +./plugins/context-guard/bin/context-guard route-advisor --workload workload.json --json +./plugins/context-guard/bin/context-guard-cost route-advisor --feature batch_api=true --feature structured_outputs=true --json < workload.json +``` + +`context-guard route-advisor` is a local, passive advisor. It reads caller-supplied workload JSON, provider feature declarations, usage telemetry, and shifted external/local costs, then emits total-cost accounting, batchability blockers, and candidate routes such as batch API, prompt-cache prefix preservation, structured outputs, or cheaper-model evaluation. 
It does not start a queue, call providers, refresh pricing docs, or treat bundled provider feature knowledge as authoritative; unknown or caller-supplied features are marked recheck-required. Recommendations are candidates only, and hosted token/cost savings claims still require matched successful tasks with non-inferior quality and shifted-cost evidence. + ### Compress selected local text conservatively ```bash diff --git a/context-guard-kit/README.md b/context-guard-kit/README.md index f699ca3..11222a6 100644 --- a/context-guard-kit/README.md +++ b/context-guard-kit/README.md @@ -16,6 +16,7 @@ Claude Code CLI 컨텍스트 낭비를 줄이기 위한 도구 모음입니다. - `context_pack.py` — 우선순위가 있는 로컬 파일 근거를 바이트 예산 안의 Markdown context pack으로 조립하고, 로컬 query/diff/output 신호에서 build manifest를 추천합니다. - `context_filter.py` — 사용자 소유 JSON DSL로 성공 출력 라인 필터를 적용하되, 보호해야 할 실패 출력은 원문 그대로 통과시킵니다. - `tool_schema_pruner.py` — 로컬 tool/MCP catalog를 top-k schema 자문 리포트로 줄이고, 전체 정제된 schema는 receipt/payload로 재조회할 수 있게 합니다. +- `cost_guard.py` — provider usage/cache cost preflight·observe·compile과 local-only route-advisor total-cost/batchability 후보를 출력합니다. - `benchmark_runner.py` — 고정 task/variant fixture로 A/B token/cost 절감 benchmark, cost-shift ledger, report를 생성합니다. - `setup_wizard.py` — 설치 후 project-local `.claude/settings.json`을 대화형으로 선택하고 병합합니다. - `failed_attempt_nudge.py` — 반복 Bash 실패 시 `/clear`/`/compact`와 전략 전환을 짧게 권유합니다. @@ -64,6 +65,8 @@ python3 context-guard-kit/sanitize_output.py -- git diff `cost_guard.py compile`은 section manifest의 `protected`, `semantic_sensitive`, `protected_zone_classes`, `content_type`, `volatile`, `ttl`, `bytes` 필드를 읽어 `protected_zone_policy`와 `transform_policy`를 출력합니다. `protected=true`와 `volatile=true`가 같이 있으면 volatile이 cache ordering을 tail 쪽으로 보내고, protection은 transform/retrieval 정책만 제어합니다. 대용량 protected section에는 local artifact retrieval을 안내하지만 provider prompt cache를 대체한다고 주장하지 않습니다. 
+`cost_guard.py route-advisor`와 dispatcher alias `context-guard route-advisor`는 caller-supplied workload JSON, provider feature 선언, usage telemetry, shifted external/local cost sidecar를 합쳐 local-only total-cost/batchability/routing 후보 자문을 출력합니다. queue를 시작하거나 provider를 호출하지 않고, provider feature matrix를 authoritative하게 내장하지 않으며, batch API·prompt cache·structured outputs·lower-cost model 추천은 matched successful task와 shifted-cost evidence 전까지 절감 주장으로 해석하면 안 됩니다. + `experimental_registry.py`는 `context-guard experiments`의 project-local 메타데이터 진입점입니다. 기본 비활성이며, `enable`/`disable`은 `.context-guard/experiments.json`만 갱신하고 기존 헬퍼 동작은 여전히 명시적 flag가 있어야 바뀝니다. 레지스트리는 receipt-backed 출력 축약 경로(`trim_command_output.py --digest markdown|json --artifact-receipt`)와 protected-zone 정책 경로(`context_compress.py --protected-policy`, `cost_guard.py compile`의 protected section 메타데이터)를 명시적 flag 실험으로 표시합니다. `experimental_registry.py plan context-diff-compaction`은 읽기 전용 dry-run planner이고, `experimental_registry.py emit context-diff-compaction --receipt-id ... --reexpand-command ...`는 명시적 로컬 runtime입니다. `plan`은 diff 파일과 hunk만 요약하고 replacement text를 만들지 않습니다. `emit`은 review 가능한 hunk, 입력 diff와 일치하는 로컬 보관본 내용, 유효한 재확장 메타데이터, 더 작은 caller-supplied replacement가 모두 있을 때만 caller-supplied compact replacement text를 출력합니다. 로컬 보관본 내용을 검증하지만 re-expand 명령을 실행하거나 hosted savings를 주장하지 않습니다. 
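Reviewer note: the `route-advisor` input described in the README hunks above can be sketched as a small workload builder. The key names (`provider_features`, `task`, `telemetry`, `latency_class`, `retry_cost_usd`, and so on) are the ones `cost_guard.py` reads in this diff. The helper name `build_example_workload` and every value are hypothetical and are not fixtures from the repository.

```python
import json


def build_example_workload() -> dict:
    """Return a hypothetical workload dict for `route-advisor`.

    Key names mirror fields that cost_guard.py reads in this diff;
    every value here is an illustrative assumption.
    """
    return {
        "provider": "anthropic",
        # Caller-supplied declarations; the advisor still marks them recheck-required.
        "provider_features": {"batch_api": True, "structured_outputs": True},
        "task": {
            "latency_class": "offline",     # a member of ROUTE_ALLOWED_LATENCY_CLASSES
            "risk": "low",                  # a member of ROUTE_ALLOWED_RISK_LEVELS
            "quality_gate": "pass",         # a member of ROUTE_ALLOWED_QUALITY_GATES
            "task_kind": "classification",  # listed in ROUTE_STRUCTURED_TASK_KINDS
            "deadline_seconds": 7200,
        },
        "telemetry": {
            "primary_cost_usd": 0.0125,     # explicit provider cost (made-up number)
            "retry_count": 1,
            "retry_cost_usd": 0.002,        # shifted external cost component
        },
    }


if __name__ == "__main__":
    # Print JSON to stdout so it can be redirected into workload.json.
    print(json.dumps(build_example_workload(), indent=2))
```

Redirecting the output to `workload.json` gives a file that could be passed to `./plugins/context-guard/bin/context-guard route-advisor --workload workload.json --json`, the invocation shown in the README hunks. The advisor's output for this input is not shown here because it depends on the pricing profile and on code outside this diff.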
diff --git a/context-guard-kit/context_guard_commands.py b/context-guard-kit/context_guard_commands.py index c5de196..efbda3e 100644 --- a/context-guard-kit/context_guard_commands.py +++ b/context-guard-kit/context_guard_commands.py @@ -82,6 +82,8 @@ "tool-prune": ("context-guard-tool-prune",), "compress": ("context-guard-compress",), "cost": ("context-guard-cost",), + "route-advisor": ("context-guard-cost", "route-advisor"), + "route": ("context-guard-cost", "route-advisor"), "cache-score": ("context-guard-cache-score",), "bench": ("context-guard-bench",), "read-symbol": ("context-guard-read-symbol",), @@ -186,6 +188,7 @@ DISPATCHER_SMOKE_CASES: tuple[dict[str, Any], ...] = ( {"entrypoint": "context-guard", "args": ["experiments", "list", "--json"], "mode": "json"}, {"entrypoint": "context-guard", "args": ["cost", "--help"], "mode": "text"}, + {"entrypoint": "context-guard", "args": ["route-advisor", "--help"], "mode": "text"}, {"entrypoint": "context-guard", "args": ["cache-score", "--help"], "mode": "text"}, {"entrypoint": "context-guard-pack", "args": ["suggest", "--help"], "mode": "text"}, {"entrypoint": "context-guard-pack", "args": ["auto", "--help"], "mode": "text"}, diff --git a/context-guard-kit/cost_guard.py b/context-guard-kit/cost_guard.py index 0a048a3..20f31d8 100755 --- a/context-guard-kit/cost_guard.py +++ b/context-guard-kit/cost_guard.py @@ -55,6 +55,42 @@ TTL_SECONDS = {"5m": 5 * 60, "1h": 60 * 60} ANTHROPIC_DOCS_URL = "https://docs.anthropic.com/en/build-with-claude/prompt-caching" ANTHROPIC_PRICING_URL = "https://platform.claude.com/docs/en/about-claude/pricing" +ROUTE_FEATURE_KEYS = ("batch_api", "prompt_cache", "structured_outputs", "lower_cost_models", "tool_search") +ROUTE_FEATURE_ALIASES = { + "batch": "batch_api", + "batch-api": "batch_api", + "batch_api": "batch_api", + "batchapi": "batch_api", + "prompt-cache": "prompt_cache", + "prompt_cache": "prompt_cache", + "cache": "prompt_cache", + "structured-output": "structured_outputs", + 
"structured-outputs": "structured_outputs", + "structured_output": "structured_outputs", + "structured_outputs": "structured_outputs", + "json-schema": "structured_outputs", + "json_schema": "structured_outputs", + "lower-cost-models": "lower_cost_models", + "lower_cost_models": "lower_cost_models", + "cheap-model": "lower_cost_models", + "cheap_models": "lower_cost_models", + "tool-search": "tool_search", + "tool_search": "tool_search", +} +ROUTE_ALLOWED_LATENCY_CLASSES = {"interactive", "async", "batch", "offline", "unknown"} +ROUTE_ALLOWED_RISK_LEVELS = {"low", "medium", "high", "unknown"} +ROUTE_ALLOWED_QUALITY_GATES = {"pass", "unknown", "fail"} +ROUTE_STRUCTURED_TASK_KINDS = { + "classify", + "classification", + "extract", + "extraction", + "transform", + "summarize", + "summary", + "batch_eval", + "eval", +} ALLOWED_FIRST_COMPONENT_SYMLINKS = { "tmp": Path("/private/tmp"), "var": Path("/private/var"), @@ -1851,6 +1887,718 @@ def preflight_command(args: argparse.Namespace) -> int: return 3 if block else 0 +def advisory_label(value: Any, *, default: str = "unknown", limit: int = 80) -> str: + """Return a bounded identifier-like label without echoing secrets or paths.""" + + if value is None: + return default + text = str(value).strip() + if not text: + return default + if secret_count_in_text(text): + return "redacted" + if "/" in text or "\\" in text: + return "path-redacted" + cleaned = re.sub(r"[^A-Za-z0-9_.:-]+", "-", text).strip("-") + if not cleaned: + return default + return cleaned[:limit] + + +ROUTE_MODEL_LOCAL_PATH_FIRST_SEGMENTS = { + "checkpoint", + "checkpoints", + "ckpt", + "data", + "dataset", + "datasets", + "model", + "models", + "private", + "tmp", + "weights", +} +ROUTE_MODEL_LOCAL_PATH_EXTENSIONS = { + ".bin", + ".ckpt", + ".gguf", + ".json", + ".onnx", + ".pt", + ".pth", + ".safetensors", + ".yaml", + ".yml", +} + + +def route_model_path_like(text: str) -> bool: + lower = text.lower() + if ( + text.startswith(("/", "\\", "~", "./", "../")) 
+ or "\\" in text + or re.match(r"^[A-Za-z]:[\\/]", text) is not None + or "/users/" in lower + or "/home/" in lower + or "/private/" in lower + ): + return True + if "/" not in text: + return False + segments = text.split("/") + if len(segments) != 2 or any(seg in {"", ".", ".."} for seg in segments): + return True + first = segments[0].strip().lower() + if first in ROUTE_MODEL_LOCAL_PATH_FIRST_SEGMENTS: + return True + last = segments[-1].strip().lower() + return any(last.endswith(ext) for ext in ROUTE_MODEL_LOCAL_PATH_EXTENSIONS) + + +def route_model_label(value: Any, *, default: str = "unknown", limit: int = 120) -> str: + """Return a model identifier label while redacting local-path-like values.""" + + if value is None: + return default + text = str(value).strip() + if not text: + return default + if secret_count_in_text(text): + return "redacted" + if route_model_path_like(text): + return "path-redacted" + cleaned = re.sub(r"[^A-Za-z0-9_.:/-]+", "-", text).strip("-") + if not cleaned: + return default + return cleaned[:limit] + + +def route_model_for_pricing(value: Any, fallback: str) -> str: + if value is None: + return fallback + text = str(value).strip() + if not text or secret_count_in_text(text): + return fallback + return text + + +def finite_nonnegative_value(value: Any) -> float | None: + if value is None or isinstance(value, bool): + return None + try: + number = float(value) + except (TypeError, ValueError, OverflowError): + return None + if not math.isfinite(number) or number < 0: + return None + return number + + +def route_bool(value: Any) -> bool | None: + if isinstance(value, bool): + return value + if isinstance(value, (int, float)) and not isinstance(value, bool): + if value == 1: + return True + if value == 0: + return False + if isinstance(value, str): + text = value.strip().lower() + if text in {"1", "true", "yes", "y", "on", "supported", "available"}: + return True + if text in {"0", "false", "no", "n", "off", "unsupported", 
"unavailable"}: + return False + if text in {"", "unknown", "unset", "null", "none"}: + return None + return None + + +def route_choice(value: Any, allowed: set[str], *, default: str = "unknown") -> str: + if value is None: + return default + text = str(value).strip().lower().replace("-", "_") + return text if text in allowed else default + + +def route_nested_dict(data: dict[str, Any], *keys: str) -> dict[str, Any]: + for key in keys: + value = data.get(key) + if isinstance(value, dict): + return value + return {} + + +def first_present_mapping_value(*containers: dict[str, Any], keys: tuple[str, ...]) -> Any: + for container in containers: + for key in keys: + if key in container: + return container.get(key) + return None + + +def first_nonnegative_cost(*containers: dict[str, Any], keys: tuple[str, ...]) -> float | None: + for container in containers: + for key in keys: + if key not in container: + continue + value = finite_nonnegative_value(container.get(key)) + if value is not None: + return value + return None + + +def sum_nonnegative_costs(container: dict[str, Any], keys: tuple[str, ...]) -> tuple[float, list[str]]: + total = 0.0 + observed: list[str] = [] + for key in keys: + value = finite_nonnegative_value(container.get(key)) + if value is None: + continue + total += value + observed.append(key) + return total, observed + + +def sum_nonnegative_costs_from(*containers: dict[str, Any], keys: tuple[str, ...]) -> tuple[float, list[str]]: + total = 0.0 + observed: list[str] = [] + for key in keys: + value = first_nonnegative_cost(*containers, keys=(key,)) + if value is None: + continue + total += value + observed.append(key) + return total, observed + + +def parse_feature_overrides(raw_features: list[str] | None) -> dict[str, bool]: + out: dict[str, bool] = {} + for raw in raw_features or []: + if "=" in raw: + key, raw_value = raw.split("=", 1) + elif ":" in raw: + key, raw_value = raw.split(":", 1) + else: + key, raw_value = raw, "true" + normalized_key = 
ROUTE_FEATURE_ALIASES.get(key.strip().lower()) or ROUTE_FEATURE_ALIASES.get(key.strip().lower().replace("_", "-")) + display_key = advisory_label(key, default="redacted-route-feature") + if normalized_key is None: + fail(f"unknown route feature {display_key!r}; expected one of {', '.join(ROUTE_FEATURE_KEYS)}") + parsed = route_bool(raw_value) + if parsed is None: + fail(f"route feature {display_key!r} must be true or false") + out[normalized_key] = parsed + return out + + +def provider_features_for_workload(workload: dict[str, Any], args: argparse.Namespace) -> dict[str, Any]: + raw_features = workload.get("provider_features") + workload_features = raw_features if isinstance(raw_features, dict) else {} + flag_features = parse_feature_overrides(getattr(args, "feature", None)) + features: dict[str, dict[str, Any]] = {} + for key in ROUTE_FEATURE_KEYS: + supported: bool | None = None + source = "unknown" + aliases = {key, key.replace("_", "-")} + aliases.update(alias for alias, canonical in ROUTE_FEATURE_ALIASES.items() if canonical == key) + for alias in sorted(aliases): + if alias in workload_features: + parsed = route_bool(workload_features.get(alias)) + if parsed is not None: + supported = parsed + source = "workload" + break + if key in flag_features: + supported = flag_features[key] + source = "flag" + features[key] = { + "supported": supported, + "source": source, + "recheck_required": True, + "reason": "provider_features are caller-supplied or unknown; recheck current provider documentation before operational routing", + } + declared = sum(1 for item in features.values() if item["supported"] is not None) + return { + "features": features, + "declared_feature_count": declared, + "unknown_feature_count": len(features) - declared, + "caller_supplied": declared > 0, + "authoritative_provider_matrix": False, + "recheck_required": True, + } + + +def route_usage_object(workload: dict[str, Any]) -> dict[str, Any]: + usage = workload.get("usage") or workload.get("provider_usage") + if isinstance(usage, dict): + return 
usage.get("usage") if isinstance(usage.get("usage"), dict) else usage + response = workload.get("response") + if isinstance(response, dict) and isinstance(response.get("usage"), dict): + return response["usage"] + telemetry = workload.get("telemetry") + if isinstance(telemetry, dict): + usage = telemetry.get("usage") or telemetry.get("provider_usage") + if isinstance(usage, dict): + return usage.get("usage") if isinstance(usage.get("usage"), dict) else usage + return {} + + +def usage_has_measured_tokens(usage: dict[str, Any]) -> bool: + return any( + usage_int(usage, key) > 0 + for key in ( + "input_tokens", + "output_tokens", + "cache_creation_input_tokens", + "cache_creation_input_tokens_5m", + "cache_creation_input_tokens_1h", + "cache_read_input_tokens", + ) + ) or bool(usage.get("cache_creation")) + + +def cost_from_usage(usage: dict[str, Any], *, profile: dict[str, Any], model: str, exchange: float) -> dict[str, Any]: + input_rate, output_rate, model_rate_key = rates_for_model(profile, model) + write_mult, read_mult = pricing_multipliers(profile) + input_tokens = usage_int(usage, "input_tokens") + output_tokens = usage_int(usage, "output_tokens") + cache_creation_5m, cache_creation_1h = cache_creation_buckets(usage) + cache_read = usage_int(usage, "cache_read_input_tokens") + cost_usd = ( + money(input_tokens, input_rate) + + money(output_tokens, output_rate) + + money(cache_creation_5m, input_rate, write_mult["5m"]) + + money(cache_creation_1h, input_rate, write_mult["1h"]) + + money(cache_read, input_rate, read_mult) + ) + return { + "cost_usd": round(cost_usd, 8), + "cost_krw": round(krw(cost_usd, exchange), 2), + "model_rate_key": model_rate_key, + "usage": { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "cache_creation_input_tokens_5m": cache_creation_5m, + "cache_creation_input_tokens_1h": cache_creation_1h, + "cache_read_input_tokens": cache_read, + }, + } + + +def request_profile_for_route(workload: dict[str, Any]) -> dict[str, 
Any]: + request = workload.get("request") + if not isinstance(request, dict): + return { + "present": False, + "token_proxy": "unavailable", + "prompt_tokens_estimated": None, + "cache_breakpoint_count": 0, + "cacheable_tokens_estimated": 0, + "raw_request_emitted": False, + } + breakpoints, parse_meta = extract_cache_breakpoints(request) + fingerprints, redactions = build_fingerprints(breakpoints, b"\0" * 32) + cacheable_tokens = max((int(fp.get("tokens_estimated") or 0) for fp in fingerprints), default=0) + return { + "present": True, + "token_proxy": f"chars_div_{TOKEN_PROXY_CHARS_PER_TOKEN}", + "prompt_tokens_estimated": token_proxy_obj(strip_known_cache_controls(request)), + "cache_breakpoint_count": len(breakpoints), + "cacheable_tokens_estimated": cacheable_tokens, + "cache_control_markers": int(parse_meta.get("cache_control_markers") or 0), + "unsupported_cache_controls": int(parse_meta.get("unsupported_cache_controls") or 0), + "secret_like_values_detected": redactions, + "raw_request_emitted": False, + } + + +def route_task_metadata(workload: dict[str, Any], args: argparse.Namespace) -> dict[str, Any]: + task = route_nested_dict(workload, "task", "task_metadata", "routing") + telemetry = route_nested_dict(workload, "telemetry") + latency = route_choice( + getattr(args, "latency_class", None) + or first_present_mapping_value(task, workload, keys=("latency_class", "latency", "mode")), + ROUTE_ALLOWED_LATENCY_CLASSES, + ) + risk = route_choice( + getattr(args, "risk", None) + or first_present_mapping_value(task, workload, keys=("risk", "risk_level")), + ROUTE_ALLOWED_RISK_LEVELS, + ) + quality_gate = route_choice( + getattr(args, "quality_gate", None) + or first_present_mapping_value(task, workload, telemetry, keys=("quality_gate", "quality")), + ROUTE_ALLOWED_QUALITY_GATES, + ) + task_kind = advisory_label( + getattr(args, "task_kind", None) + or first_present_mapping_value(task, workload, keys=("task_kind", "kind", "type")), + default="unknown", + 
limit=48, + ).lower() + deadline_seconds = safe_int(first_present_mapping_value(task, workload, keys=("deadline_seconds", "max_latency_seconds")), 0) + return { + "latency_class": latency, + "risk": risk, + "quality_gate": quality_gate, + "task_kind": task_kind, + "deadline_seconds": deadline_seconds, + "requires_interaction": bool(route_bool(first_present_mapping_value(task, workload, keys=("requires_interaction", "interactive_required", "user_blocking")))), + "has_external_side_effects": bool(route_bool(first_present_mapping_value(task, workload, keys=("has_external_side_effects", "side_effects")))), + "order_sensitive": bool(route_bool(first_present_mapping_value(task, workload, keys=("order_sensitive", "requires_order")))), + } + + +def total_cost_accounting_for_route( + workload: dict[str, Any], + *, + profile: dict[str, Any], + model: str, + exchange: float, +) -> dict[str, Any]: + telemetry = route_nested_dict(workload, "telemetry") + shifted = route_nested_dict(workload, "shifted_costs", "shifted_cost", "auxiliary_costs") + usage = route_usage_object(workload) + usage_cost = cost_from_usage(usage, profile=profile, model=model, exchange=exchange) if usage_has_measured_tokens(usage) else None + + primary_cost = first_nonnegative_cost( + telemetry, + workload, + keys=("primary_cost_usd", "provider_cost_usd", "observed_cost_usd", "cost_usd"), + ) + primary_source = "explicit_telemetry" if primary_cost is not None else "unavailable" + if primary_cost is None and usage_cost is not None: + primary_cost = float(usage_cost["cost_usd"]) + primary_source = "estimated_from_provider_usage_fields" + if primary_cost is None: + primary_cost = 0.0 + + external_cost_value = first_nonnegative_cost(telemetry, shifted, workload, keys=("external_cost_usd",)) + external_component_sum, external_components = sum_nonnegative_costs_from( + telemetry, + shifted, + keys=("subagent_cost_usd", "embedding_cost_usd", "reranker_cost_usd", "tool_call_cost_usd", "retry_cost_usd", 
"auxiliary_provider_cost_usd"), + ) + external_cost_from_aggregate = external_cost_value is not None + if external_cost_value is None: + external_cost = external_component_sum + else: + external_cost = external_cost_value + + local_cost_value = first_nonnegative_cost( + telemetry, + shifted, + workload, + keys=("local_cost_usd", "self_hosted_cost_usd", "local_model_cost_usd"), + ) + local_component_sum, local_components = sum_nonnegative_costs_from( + telemetry, + shifted, + keys=("local_server_cost_usd", "local_energy_cost_usd", "storage_cost_usd"), + ) + local_cost_from_aggregate = local_cost_value is not None + if local_cost_value is None: + local_cost = local_component_sum + else: + local_cost = local_cost_value + + provided_total = first_nonnegative_cost( + telemetry, + shifted, + workload, + keys=("total_cost_with_shift_usd", "total_shifted_cost_usd"), + ) + computed_total = primary_cost + external_cost + local_cost + total = provided_total if provided_total is not None else computed_total + external_tokens = safe_int(first_present_mapping_value(telemetry, shifted, workload, keys=("external_tokens", "subagent_tokens", "embedding_tokens")), 0) + retry_count = safe_int(first_present_mapping_value(telemetry, workload, keys=("retry_count", "retries")), 0) + subagent_count = safe_int(first_present_mapping_value(telemetry, workload, keys=("subagent_count", "subagents")), 0) + tool_call_count = safe_int(first_present_mapping_value(telemetry, workload, keys=("tool_call_count", "tool_calls")), 0) + external_cost_supplied = external_cost_from_aggregate or bool(external_components) + local_cost_supplied = local_cost_from_aggregate or bool(local_components) + provided_total_supplied = provided_total is not None + missing_shifted_cost = bool( + (external_tokens or retry_count or subagent_count or tool_call_count) + and not (external_cost_supplied or local_cost_supplied or provided_total_supplied) + ) + return { + "currency": "USD", + "primary_cost_usd": 
round(primary_cost, 8), + "primary_cost_source": primary_source, + "external_cost_usd": round(external_cost, 8), + "local_cost_usd": round(local_cost, 8), + "external_cost_supplied": external_cost_supplied, + "local_cost_supplied": local_cost_supplied, + "external_component_breakdown_usd": round(external_component_sum, 8), + "local_component_breakdown_usd": round(local_component_sum, 8), + "computed_total_cost_with_shift_usd": round(computed_total, 8), + "total_cost_with_shift_usd": round(total, 8), + "total_cost_with_shift_krw": round(krw(total, exchange), 2), + "provided_total_cost_with_shift_usd": round(provided_total, 8) if provided_total is not None else None, + "pricing": { + "profile": str(profile.get("name") or "custom"), + "release_recheck_required": bool(profile.get("release_recheck_required", True)), + "source_urls": profile.get("source_urls", [ANTHROPIC_DOCS_URL, ANTHROPIC_PRICING_URL]), + "usd_to_krw": exchange, + }, + "usage_cost_estimate": usage_cost, + "components_observed": sorted(set(external_components + local_components)), + "run_counters": { + "external_tokens": external_tokens, + "retry_count": retry_count, + "subagent_count": subagent_count, + "tool_call_count": tool_call_count, + }, + "measurement_availability": { + "provider_usage_tokens": usage_has_measured_tokens(usage), + "primary_cost": primary_source != "unavailable", + "external_cost": external_cost_supplied, + "local_cost": local_cost_supplied, + "shifted_cost": bool(external_cost_supplied or local_cost_supplied or provided_total_supplied), + }, + "shifted_cost_accounting": { + "required": True, + "diagnostic_only": True, + "includes_external_or_local_components": bool(external_cost_supplied or local_cost_supplied), + "missing_shifted_cost_warning": missing_shifted_cost, + "claim_boundary": "total-cost routing is advisory; hosted savings claims require matched successful tasks with non-inferior quality and measured shifted costs", + }, + } + + +def batchability_for_route(task: 
dict[str, Any], provider_features: dict[str, Any]) -> dict[str, Any]:
+    feature = provider_features["features"]["batch_api"]
+    batch_supported = feature["supported"]
+    blockers: list[str] = []
+    reasons: list[str] = []
+    latency = str(task.get("latency_class") or "unknown")
+    deadline = int(task.get("deadline_seconds") or 0)
+    if latency == "interactive":
+        blockers.append("interactive_latency")
+    elif latency in {"async", "batch", "offline"}:
+        reasons.append(f"latency_class_{latency}")
+    elif deadline >= 3600:
+        reasons.append("deadline_allows_batch_window")
+    else:
+        reasons.append("latency_unknown")
+    if task.get("requires_interaction"):
+        blockers.append("requires_user_interaction")
+    if task.get("has_external_side_effects"):
+        blockers.append("external_side_effects_need_idempotency_review")
+    if task.get("order_sensitive"):
+        blockers.append("order_sensitive")
+    if task.get("risk") == "high":
+        blockers.append("high_risk_route")
+    if task.get("quality_gate") == "fail":
+        blockers.append("quality_gate_failed")
+    if batch_supported is False:
+        blockers.append("provider_batch_api_not_declared")
+    elif batch_supported is None:
+        reasons.append("provider_batch_api_unknown_recheck_required")
+    else:
+        reasons.append("provider_batch_api_declared")
+    if blockers:
+        level = "not_recommended"
+        eligible = False
+    elif batch_supported is True and (latency in {"async", "batch", "offline"} or deadline >= 3600):
+        level = "candidate"
+        eligible = True
+    else:
+        level = "conditional"
+        eligible = False
+    return {
+        "eligible": eligible,
+        "level": level,
+        "latency_class": latency,
+        "deadline_seconds": deadline,
+        "reasons": sorted(set(reasons)),
+        "blockers": sorted(set(blockers)),
+        "requires_current_provider_docs_check": batch_supported is None,
+    }
+
+
+def recommendation(
+    rec_id: str,
+    *,
+    decision: str,
+    priority: str,
+    rationale: str,
+    prerequisites: list[str],
+) -> dict[str, Any]:
+    return {
+        "id": rec_id,
+        "decision": decision,
+        "priority": priority,
+        "rationale": rationale,
+        "prerequisites": prerequisites,
+        "claim_boundary": "candidate routing advice only; validate on matched successful tasks before claiming token or cost savings",
+    }
+
+
+def route_recommendations(
+    *,
+    task: dict[str, Any],
+    provider_features: dict[str, Any],
+    request_profile: dict[str, Any],
+    batchability: dict[str, Any],
+    total_cost: dict[str, Any],
+) -> list[dict[str, Any]]:
+    recs: list[dict[str, Any]] = [
+        recommendation(
+            "measure-before-claim",
+            decision="required",
+            priority="P0",
+            rationale="Route changes can shift work into retries, subagents, batch queues, local servers, or provider cache writes; measure total cost with quality gates before claims.",
+            prerequisites=["matched_successful_tasks", "non_inferior_quality", "shifted_cost_accounting"],
+        )
+    ]
+    batch_decision = "candidate" if batchability.get("eligible") else str(batchability.get("level") or "conditional")
+    recs.append(
+        recommendation(
+            "use-batch-api-for-noninteractive-work",
+            decision=batch_decision,
+            priority="P1" if batch_decision == "candidate" else "P2",
+            rationale="Batch APIs can reduce cost for non-interactive work only when provider support, latency tolerance, idempotency, and quality gates are satisfied.",
+            prerequisites=["provider_batch_support_current", "async_or_offline_latency", "idempotency_review", "matched_replay"],
+        )
+    )
+
+    prompt_cache_feature = provider_features["features"]["prompt_cache"]["supported"]
+    cache_breakpoints = int(request_profile.get("cache_breakpoint_count") or 0)
+    cacheable_tokens = int(request_profile.get("cacheable_tokens_estimated") or 0)
+    if prompt_cache_feature is False:
+        cache_decision = "not_recommended"
+    elif cache_breakpoints or cacheable_tokens:
+        cache_decision = "candidate" if prompt_cache_feature is True else "conditional"
+    else:
+        cache_decision = "needs_request_evidence"
+    recs.append(
+        recommendation(
+            "preserve-prompt-cache-prefix",
+            decision=cache_decision,
+            priority="P1" if cache_decision == "candidate" else "P2",
+            rationale="Stable-prefix prompt caching is useful only when current provider support and repeated cacheable request prefixes are verified.",
+            prerequisites=["stable_prefix_first", "volatile_tail", "provider_usage_cache_telemetry"],
+        )
+    )
+
+    structured_feature = provider_features["features"]["structured_outputs"]["supported"]
+    task_kind = str(task.get("task_kind") or "unknown")
+    if structured_feature is False:
+        structured_decision = "not_recommended"
+    elif task_kind in ROUTE_STRUCTURED_TASK_KINDS:
+        structured_decision = "candidate" if structured_feature is True else "conditional"
+    else:
+        structured_decision = "needs_task_fit"
+    recs.append(
+        recommendation(
+            "use-structured-outputs-when-task-fits",
+            decision=structured_decision,
+            priority="P2",
+            rationale="Structured outputs can reduce retries and parsing repairs for extraction/classification style work, but they are not a token-savings proof.",
+            prerequisites=["schema_fit_review", "retry_rate_measurement", "quality_non_regression"],
+        )
+    )
+
+    lower_cost_feature = provider_features["features"]["lower_cost_models"]["supported"]
+    risk = str(task.get("risk") or "unknown")
+    quality_gate = str(task.get("quality_gate") or "unknown")
+    if lower_cost_feature is False or risk == "high" or quality_gate == "fail":
+        cheaper_decision = "not_recommended"
+    elif risk == "low" and quality_gate in {"pass", "unknown"}:
+        cheaper_decision = "candidate" if lower_cost_feature is True else "conditional"
+    else:
+        cheaper_decision = "conditional"
+    recs.append(
+        recommendation(
+            "evaluate-cheaper-model-route",
+            decision=cheaper_decision,
+            priority="P2",
+            rationale="Lower-cost model routing is acceptable only for low-risk or well-gated work and must include corrections, retries, and shifted cost.",
+            prerequisites=["risk_tier_low_or_reviewed", "matched_replay", "corrections_guardrail", "retry_cost_accounting"],
+        )
+    )
+
+    if total_cost["shifted_cost_accounting"].get("missing_shifted_cost_warning"):
+        recs.append(
+            recommendation(
+                "record-missing-shifted-costs",
+                decision="required",
+                priority="P1",
+                rationale="Telemetry indicates external tokens, retries, or subagents but no shifted external/local cost component was supplied.",
+                prerequisites=["external_cost_usd_or_local_cost_usd", "retry_or_subagent_cost_measurement"],
+            )
+        )
+    return recs
+
+
+def route_advisor_command(args: argparse.Namespace) -> int:
+    workload_raw, _truncated = load_json_input(args.workload, max_bytes=args.max_bytes)
+    workload = require_json_object(workload_raw.get("workload") if isinstance(workload_raw, dict) and isinstance(workload_raw.get("workload"), dict) else workload_raw, "workload")
+    profile = load_pricing_profile(args.pricing_profile, max_bytes=args.max_bytes)
+    if args.usd_to_krw is not None:
+        profile["usd_to_krw"] = usd_to_krw(profile, args.usd_to_krw)
+    exchange = usd_to_krw(profile, None)
+    request = workload.get("request") if isinstance(workload.get("request"), dict) else {}
+    provider = advisory_label(getattr(args, "provider", None) or workload.get("provider") or (request.get("provider") if isinstance(request, dict) else None))
+    model_raw = getattr(args, "model", None) or workload.get("model") or (request.get("model") if isinstance(request, dict) else None)
+    model = route_model_label(model_raw)
+    model_for_pricing = route_model_for_pricing(model_raw, model)
+    provider_features = provider_features_for_workload(workload, args)
+    task = route_task_metadata(workload, args)
+    request_profile = request_profile_for_route(workload)
+    total_cost = total_cost_accounting_for_route(workload, profile=profile, model=model_for_pricing, exchange=exchange)
+    batchability = batchability_for_route(task, provider_features)
+    recommendations = route_recommendations(
+        task=task,
+        provider_features=provider_features,
+        request_profile=request_profile,
+        batchability=batchability,
+        total_cost=total_cost,
+    )
+    report = {
+        "schema_version": SCHEMA_VERSION,
+        "tool": TOOL_NAME,
+        "mode": "route_advisor",
+        "provider": {
+            "name": provider,
+            "model": model,
+            "feature_matrix_authoritative": False,
+            "feature_recheck_required": True,
+        },
+        "provider_features": provider_features,
+        "task": task,
+        "request_profile": request_profile,
+        "total_cost_accounting": total_cost,
+        "batchability": batchability,
+        "route_recommendations": recommendations,
+        "routing_decision": {
+            "best_current_action": "measure_before_claim" if any(rec["decision"] == "required" for rec in recommendations) else "review_candidates",
+            "candidate_count": sum(1 for rec in recommendations if rec.get("decision") == "candidate"),
+            "conditional_count": sum(1 for rec in recommendations if rec.get("decision") == "conditional"),
+            "not_recommended_count": sum(1 for rec in recommendations if rec.get("decision") == "not_recommended"),
+        },
+        "claim_boundary": {
+            "hosted_api_token_savings_claim_allowed": False,
+            "hosted_api_cost_savings_claim_allowed": False,
+            "requires_matched_successful_tasks": True,
+            "requires_non_inferior_quality": True,
+            "requires_shifted_cost_accounting": True,
+            "provider_features_are_caller_supplied_or_unknown": True,
+        },
+        "privacy": {
+            "raw_prompt_emitted": False,
+            "raw_request_emitted": False,
+            "raw_paths_emitted": False,
+            "workload_stored": False,
+            "provider_call_performed": False,
+            "queue_started": False,
+        },
+    }
+    emit(report, json_mode=args.json)
+    return 0
+
+
 def usage_int(data: dict[str, Any], key: str) -> int:
     value = data.get(key, 0)
     try:
@@ -2282,6 +3030,15 @@ def emit(data: dict[str, Any], *, json_mode: bool) -> None:
     elif mode == "compile":
         findings = data.get("findings", []) if isinstance(data.get("findings"), list) else []
         print(f"{TOOL_NAME}: compile findings={len(findings)}")
+    elif mode == "route_advisor":
+        batchability = data.get("batchability", {}) if isinstance(data.get("batchability"), dict) else {}
+        routing = data.get("routing_decision", {}) if isinstance(data.get("routing_decision"), dict) else {}
+        total = data.get("total_cost_accounting", {}) if isinstance(data.get("total_cost_accounting"), dict) else {}
+        print(
+            f"{TOOL_NAME}: route-advisor batch={batchability.get('level', 'unknown')} "
+            f"candidates={routing.get('candidate_count', 0)} conditional={routing.get('conditional_count', 0)} "
+            f"total_with_shift=${total.get('total_cost_with_shift_usd', 0)}"
+        )
     else:
         summary = data.get("summary", {}) if isinstance(data.get("summary"), dict) else {}
         print(f"{TOOL_NAME}: ledger entries={summary.get('entries', 0)}")
@@ -2335,6 +3092,22 @@ def build_parser() -> argparse.ArgumentParser:
     compile_parser.add_argument("--json", action="store_true", help="emit machine-readable JSON")
     compile_parser.set_defaults(func=compile_command)
+    route = sub.add_parser(
+        "route-advisor",
+        help="advise on batchability, provider features, total cost, and route candidates",
+        description="advise on batchability, provider features, total cost, and route candidates without provider calls or queue runtime",
+    )
+    route.add_argument("--workload", default="-", help="workload JSON path, or '-' for stdin")
+    route.add_argument("--provider", help="provider label override; advisory only")
+    route.add_argument("--model", help="model label override for pricing lookup; advisory only")
+    route.add_argument("--feature", action="append", default=[], help="provider feature override such as batch_api=true or structured_outputs=false")
+    route.add_argument("--latency-class", choices=sorted(ROUTE_ALLOWED_LATENCY_CLASSES), help="latency class override")
+    route.add_argument("--risk", choices=sorted(ROUTE_ALLOWED_RISK_LEVELS), help="risk tier override")
+    route.add_argument("--quality-gate", choices=sorted(ROUTE_ALLOWED_QUALITY_GATES), help="quality gate override")
+    route.add_argument("--task-kind", help="task kind label such as extract, summarize, code_edit, or unknown")
+    add_common_cost_args(route)
+    route.set_defaults(func=route_advisor_command)
+
     return parser
diff --git a/plugins/context-guard/README.ko.md b/plugins/context-guard/README.ko.md
index 1709d5f..a86ec77 100644
--- a/plugins/context-guard/README.ko.md
+++ b/plugins/context-guard/README.ko.md
@@ -71,6 +71,7 @@ context-guard-artifact get --lines 1:80
 context-guard-compress --json < large-output.txt
 context-guard cost preflight --request request.json --budget-krw 3000 --json
 context-guard cost observe --usage usage.json --json
+context-guard route-advisor --workload workload.json --json
 context-guard-trim-output --max-lines 120 -- npm test
 context-guard-read-symbol path/to/file.py TargetSymbol
 context-guard-sanitize-output -- rg -n "TOKEN|SECRET" .
@@ -93,7 +94,7 @@ context-guard-statusline-merged
 - **예산 기반 컨텍스트 패커**는 우선순위가 있는 로컬 파일 근거를 렌더링된 바이트 예산 안의 Markdown pack으로 조립하고, 포함·부분 포함·누락 source 메타데이터, bounded `.context-guard/packs` 요약 기록, 안전할 때만 정확한 가림 처리 `slice` 명령, 안전하지 않을 때의 `retrieval_omitted_reason`을 남깁니다. 추가된 `auto` 하위 명령은 추천과 pack build를 한 번에 실행하고, `auto --explain`은 manifest, pack 본문, receipt, byte budget을 바꾸지 않으면서 결정적 로컬 선택/build 이유를 짧게 추가합니다. JSON explain의 bounded repo-map은 sampled byte/token-proxy tree, category-only secret risk count, signature-first hint, explain-only graph rank, 기존 `slice`/symbol 재조회 힌트를 제공하지만 pack 선택이나 provider savings claim은 아닙니다. `suggest`는 로컬 query, diff, 명시 파일, 가림 처리된 output/test-output 신호를 `build`와 호환되는 manifest로 순위화하며 네트워크·모델 호출·임베딩·provider 비용 추정은 하지 않습니다. 토큰 수는 측정된 provider token 절감이 아니라 추정 `chars_div_4` proxy입니다.
 - **Tool/MCP schema pruner**는 로컬 tool catalog를 bounded top-k 자문 리포트로 순위화하고, compact 요약 기록과 payload integrity check로 전체 가림 처리된 schema 재조회를 보존합니다.
 - **보수적 압축기**는 가림 처리된 stdin을 JSON, diff, 로그, 검색 출력, 코드, 산문으로 분류하고, 관측 바이트 근거와 추정 토큰 proxy를 함께 노출합니다.
-- **Anthropic 비용 가드**는 `context-guard cost preflight/observe/ledger/compile`로 호출 전 비용 추정, provider usage 대조, keyed-HMAC cache 위험 기록, 안정적인 prefix 배치 안내를 제공합니다. 원문 프롬프트를 저장하지 않으며 Anthropic prompt cache를 대체하지 않습니다.
+- **Anthropic 비용 가드와 route advisor**는 `context-guard cost preflight/observe/ledger/compile`로 호출 전 비용 추정, provider usage 대조, keyed-HMAC cache 위험 기록, 안정적인 prefix 배치 안내를 제공합니다. `context-guard route-advisor`는 caller가 제공한 workload JSON, provider feature 선언, usage telemetry, 외부·로컬 shifted cost를 읽는 local-only passive advisor이며 queue를 시작하거나 provider를 호출하거나 pricing 문서를 새로 가져오거나 provider feature 지식을 authoritative하게 취급하지 않고 total-cost accounting, batchability blocker, route 후보를 출력합니다. 원문 프롬프트를 저장하지 않고 Anthropic/provider prompt cache를 대체하지 않으며, 추천은 matched successful task, 비열등 quality evidence, shifted-cost accounting 없이는 hosted token/cost 절감 주장이 아닙니다.
 - **출력 축약기**는 감싼 명령의 종료 코드를 보존하면서 긴 로그를 줄이고, `--digest markdown` 또는 `--digest json`으로 실행기 실패 정보, 가림 처리된 failure signature, 중복 라인 그룹, 다음 조회 제안이 담긴 요약을 만들 수 있습니다.
 - **민감정보 가림 도구**는 검색, diff, 로그 출력에서 자격 증명 패턴, 비공개 키 블록, 인증 헤더, 자격 증명이 포함된 URL, 민감해 보이는 경로를 가립니다.
 - **상태표시줄**은 모델, 컨텍스트, 비용 신호를 짧게 보여주고, 대화 기록 데이터가 있으면 캐시 읽기와 캐시 재사용 신호도 함께 표시합니다.
diff --git a/plugins/context-guard/README.md b/plugins/context-guard/README.md
index afa2b05..b628282 100644
--- a/plugins/context-guard/README.md
+++ b/plugins/context-guard/README.md
@@ -75,6 +75,7 @@ context-guard-compress --json --protected-policy < evidence.txt
 context-guard-compress --json --type prose --mode readable < sanitized-prose.txt
 context-guard cost preflight --request request.json --budget-krw 3000 --json
 context-guard cost observe --usage usage.json --json
+context-guard route-advisor --workload workload.json --json
 context-guard-trim-output --max-lines 120 -- npm test
 context-guard-read-symbol path/to/file.py TargetSymbol
 context-guard-sanitize-output -- rg -n "TOKEN|SECRET" .
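The `route-advisor` command added to the usage list takes a caller-supplied workload JSON. As a minimal sketch that is not part of this change, the Python below builds one such document using only key names that the `context-guard-cost` code in this diff reads (`task`, `provider_features`, `usage`, `telemetry`, `shifted_costs`). Every value is a hypothetical placeholder, and the advisor's report is not shown because it depends on the local pricing profile.

```python
import json

# Hypothetical workload. The key names mirror what the route-advisor code in
# this diff looks up; all values are invented for illustration only.
workload = {
    "task": {
        "task_kind": "extract",          # one of the structured task kinds
        "latency_class": "offline",      # interactive/async/batch/offline/unknown
        "risk": "low",                   # low/medium/high/unknown
        "quality_gate": "pass",          # pass/unknown/fail
        "deadline_seconds": 7200,
        "has_external_side_effects": False,
        "order_sensitive": False,
    },
    # Caller-supplied declarations; the advisor treats them as non-authoritative.
    "provider_features": {"batch_api": True, "structured_outputs": True},
    # Provider usage counters (placeholder numbers).
    "usage": {"input_tokens": 12000, "output_tokens": 800},
    # Run counters plus a matching shifted-cost component for the retry.
    "telemetry": {"retry_count": 1},
    "shifted_costs": {"retry_cost_usd": 0.002},
}

# Serialize deterministically so the document can be saved as workload.json.
workload_json = json.dumps(workload, indent=2, sort_keys=True)
print(workload_json)
```

Saved as `workload.json`, this document could then be passed to `context-guard route-advisor --workload workload.json --json`, the invocation shown in the usage list.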
@@ -104,7 +105,7 @@ context-guard-statusline-merged
 - **Budgeted context packer** assembles prioritized local file evidence into a rendered byte-budgeted Markdown pack with included/partial/omitted source metadata, bounded `.context-guard/packs` receipts, exact sanitized `slice` commands when safe, and `retrieval_omitted_reason` when a path/root should not be echoed. The additive `auto` subcommand runs that recommendation and pack build in one step, and `auto --explain` adds compact deterministic local selection/build reasons without changing the manifest, pack body, receipt, or byte budget. JSON explain also includes bounded repo-map metadata: sampled byte/token-proxy tree entries, category-only secret-risk counts, signature-first hints, explain-only graph ranks, and exact `slice`/symbol retrieval hints. `suggest` remains available to rank local query, diff, explicit file, and sanitized output/test-output signals into a build-compatible manifest without network, model, embedding, or provider-cost calls. `suggest/auto --adaptive-k` adds advisory-only shrink/expand top-k metadata from local score distribution, byte-budget fit, and score-mass recall/precision proxies; it never applies the recommendation automatically or changes the manifest, pack body, receipt, or byte budget. `auto --symbol-memory` adds repo-map-derived symbol/graph advisory metadata with exact `slice`/`read-symbol` verification hints and still does not change selection or pack output. Token counts are estimated `chars_div_4` proxies, not measured provider-token savings.
 - **Tool/MCP schema pruner** ranks local tool catalogs into bounded top-k advisory reports while preserving full sanitized schema fallback through compact receipts and payload integrity checks.
 - **Conservative compressor** classifies sanitized stdin as JSON, diff, log, search output, code, or prose and shrinks it with observed byte evidence plus estimated token proxies. Add `--protected-policy` for opt-in protected-zone class/count metadata that denies semantic rewrites for code fences, diffs, identifiers, numeric constants, hashes, paths, stack frames, quoted strings, and JSON keys while preserving exact-retrieval guidance. Add `--mode readable` only for sanitized prose previews: it uses deterministic sentence windows, blocks prompt-like/high-risk protected signals, stores no raw protected spans, and does not run learned compressors, models, embeddings, or rerankers.
-- **Anthropic cost guard** provides `context-guard cost preflight/observe/ledger/compile` for passive pre-call estimates, provider-usage reconciliation, keyed-HMAC cache-risk history, and stable-prefix layout advice. It stores no raw prompt text and does not replace Anthropic prompt caching.
+- **Anthropic cost guard and route advisor** provides `context-guard cost preflight/observe/ledger/compile` for passive pre-call estimates, provider-usage reconciliation, keyed-HMAC cache-risk history, and stable-prefix layout advice. `context-guard route-advisor` is a local-only passive advisor for caller-supplied workload JSON, provider feature declarations, usage telemetry, and shifted external/local costs; it emits total-cost accounting, batchability blockers, and route candidates without starting a queue, calling providers, refreshing pricing docs, or treating provider feature knowledge as authoritative. It stores no raw prompt text, does not replace Anthropic/provider prompt caching, and its recommendations are not hosted token/cost savings claims without matched successful tasks, non-inferior quality evidence, and shifted-cost accounting.
 - **Output trimmer** preserves the wrapped command exit code, trims long logs, and can emit `--digest markdown` or `--digest json` summaries with runner failure facts, sanitized failure signatures, duplicate-line groups, and suggested next queries. Add `--artifact-receipt` with digest mode to store the exact sanitized full output as a local artifact receipt and re-expand omitted slices with the emitted `context-guard-artifact get ...` command.
 - **Sanitizer** redacts common credential patterns, private key blocks, auth headers, credential URLs, and sensitive-looking paths from search, diff, and log output.
 - **Statusline** displays compact model/context/cost signals and, when transcript data is available, cache-read and cache-reuse signals.
diff --git a/plugins/context-guard/bin/context-guard-cost b/plugins/context-guard/bin/context-guard-cost
index 0a048a3..20f31d8 100755
--- a/plugins/context-guard/bin/context-guard-cost
+++ b/plugins/context-guard/bin/context-guard-cost
@@ -55,6 +55,42 @@ LEDGER_OPEN_RETRY_SECONDS = 0.01
 TTL_SECONDS = {"5m": 5 * 60, "1h": 60 * 60}
 ANTHROPIC_DOCS_URL = "https://docs.anthropic.com/en/build-with-claude/prompt-caching"
 ANTHROPIC_PRICING_URL = "https://platform.claude.com/docs/en/about-claude/pricing"
+ROUTE_FEATURE_KEYS = ("batch_api", "prompt_cache", "structured_outputs", "lower_cost_models", "tool_search")
+ROUTE_FEATURE_ALIASES = {
+    "batch": "batch_api",
+    "batch-api": "batch_api",
+    "batch_api": "batch_api",
+    "batchapi": "batch_api",
+    "prompt-cache": "prompt_cache",
+    "prompt_cache": "prompt_cache",
+    "cache": "prompt_cache",
+    "structured-output": "structured_outputs",
+    "structured-outputs": "structured_outputs",
+    "structured_output": "structured_outputs",
+    "structured_outputs": "structured_outputs",
+    "json-schema": "structured_outputs",
+    "json_schema": "structured_outputs",
+    "lower-cost-models": "lower_cost_models",
+    "lower_cost_models": "lower_cost_models",
+    "cheap-model": "lower_cost_models",
+    "cheap_models": "lower_cost_models",
+    "tool-search": "tool_search",
+    "tool_search": "tool_search",
+}
+ROUTE_ALLOWED_LATENCY_CLASSES = {"interactive", "async", "batch", "offline", "unknown"}
+ROUTE_ALLOWED_RISK_LEVELS = {"low", "medium", "high", "unknown"}
+ROUTE_ALLOWED_QUALITY_GATES = {"pass", "unknown", "fail"}
+ROUTE_STRUCTURED_TASK_KINDS = {
+    "classify",
+    "classification",
+    "extract",
+    "extraction",
+    "transform",
+    "summarize",
+    "summary",
+    "batch_eval",
+    "eval",
+}
 ALLOWED_FIRST_COMPONENT_SYMLINKS = {
     "tmp": Path("/private/tmp"),
     "var": Path("/private/var"),
@@ -1851,6 +1887,718 @@ def preflight_command(args: argparse.Namespace) -> int:
     return 3 if block else 0
+
+
+def advisory_label(value: Any, *, default: str = "unknown", limit: int = 80) -> str:
+    """Return a bounded identifier-like label without echoing secrets or paths."""
+
+    if value is None:
+        return default
+    text = str(value).strip()
+    if not text:
+        return default
+    if secret_count_in_text(text):
+        return "redacted"
+    if "/" in text or "\\" in text:
+        return "path-redacted"
+    cleaned = re.sub(r"[^A-Za-z0-9_.:-]+", "-", text).strip("-")
+    if not cleaned:
+        return default
+    return cleaned[:limit]
+
+
+ROUTE_MODEL_LOCAL_PATH_FIRST_SEGMENTS = {
+    "checkpoint",
+    "checkpoints",
+    "ckpt",
+    "data",
+    "dataset",
+    "datasets",
+    "model",
+    "models",
+    "private",
+    "tmp",
+    "weights",
+}
+ROUTE_MODEL_LOCAL_PATH_EXTENSIONS = {
+    ".bin",
+    ".ckpt",
+    ".gguf",
+    ".json",
+    ".onnx",
+    ".pt",
+    ".pth",
+    ".safetensors",
+    ".yaml",
+    ".yml",
+}
+
+
+def route_model_path_like(text: str) -> bool:
+    lower = text.lower()
+    if (
+        text.startswith(("/", "\\", "~", "./", "../"))
+        or "\\" in text
+        or re.match(r"^[A-Za-z]:[\\/]", text) is not None
+        or "/users/" in lower
+        or "/home/" in lower
+        or "/private/" in lower
+    ):
+        return True
+    if "/" not in text:
+        return False
+    segments = text.split("/")
+    if len(segments) != 2 or any(seg in {"", ".", ".."} for seg in segments):
+        return True
+    first = segments[0].strip().lower()
+    if first in ROUTE_MODEL_LOCAL_PATH_FIRST_SEGMENTS:
+        return True
+    last = segments[-1].strip().lower()
+    return any(last.endswith(ext) for ext in ROUTE_MODEL_LOCAL_PATH_EXTENSIONS)
+
+
+def route_model_label(value: Any, *, default: str = "unknown", limit: int = 120) -> str:
+    """Return a model identifier label while redacting local-path-like values."""
+
+    if value is None:
+        return default
+    text = str(value).strip()
+    if not text:
+        return default
+    if secret_count_in_text(text):
+        return "redacted"
+    if route_model_path_like(text):
+        return "path-redacted"
+    cleaned = re.sub(r"[^A-Za-z0-9_.:/-]+", "-", text).strip("-")
+    if not cleaned:
+        return default
+    return cleaned[:limit]
+
+
+def route_model_for_pricing(value: Any, fallback: str) -> str:
+    if value is None:
+        return fallback
+    text = str(value).strip()
+    if not text or secret_count_in_text(text):
+        return fallback
+    return text
+
+
+def finite_nonnegative_value(value: Any) -> float | None:
+    if value is None or isinstance(value, bool):
+        return None
+    try:
+        number = float(value)
+    except (TypeError, ValueError, OverflowError):
+        return None
+    if not math.isfinite(number) or number < 0:
+        return None
+    return number
+
+
+def route_bool(value: Any) -> bool | None:
+    if isinstance(value, bool):
+        return value
+    if isinstance(value, (int, float)) and not isinstance(value, bool):
+        if value == 1:
+            return True
+        if value == 0:
+            return False
+    if isinstance(value, str):
+        text = value.strip().lower()
+        if text in {"1", "true", "yes", "y", "on", "supported", "available"}:
+            return True
+        if text in {"0", "false", "no", "n", "off", "unsupported", "unavailable"}:
+            return False
+        if text in {"", "unknown", "unset", "null", "none"}:
+            return None
+    return None
+
+
+def route_choice(value: Any, allowed: set[str], *, default: str = "unknown") -> str:
+    if value is None:
+        return default
+    text = str(value).strip().lower().replace("-", "_")
+    return text if text in allowed else default
+
+
+def route_nested_dict(data: dict[str, Any], *keys: str) -> dict[str, Any]:
+    for key in keys:
+        value = data.get(key)
+        if isinstance(value, dict):
+            return value
+    return {}
+
+
+def first_present_mapping_value(*containers: dict[str, Any], keys: tuple[str, ...]) -> Any:
+    for container in containers:
+        for key in keys:
+            if key in container:
+                return container.get(key)
+    return None
+
+
+def first_nonnegative_cost(*containers: dict[str, Any], keys: tuple[str, ...]) -> float | None:
+    for container in containers:
+        for key in keys:
+            if key not in container:
+                continue
+            value = finite_nonnegative_value(container.get(key))
+            if value is not None:
+                return value
+    return None
+
+
+def sum_nonnegative_costs(container: dict[str, Any], keys: tuple[str, ...]) -> tuple[float, list[str]]:
+    total = 0.0
+    observed: list[str] = []
+    for key in keys:
+        value = finite_nonnegative_value(container.get(key))
+        if value is None:
+            continue
+        total += value
+        observed.append(key)
+    return total, observed
+
+
+def sum_nonnegative_costs_from(*containers: dict[str, Any], keys: tuple[str, ...]) -> tuple[float, list[str]]:
+    total = 0.0
+    observed: list[str] = []
+    for key in keys:
+        value = first_nonnegative_cost(*containers, keys=(key,))
+        if value is None:
+            continue
+        total += value
+        observed.append(key)
+    return total, observed
+
+
+def parse_feature_overrides(raw_features: list[str] | None) -> dict[str, bool]:
+    out: dict[str, bool] = {}
+    for raw in raw_features or []:
+        if "=" in raw:
+            key, raw_value = raw.split("=", 1)
+        elif ":" in raw:
+            key, raw_value = raw.split(":", 1)
+        else:
+            key, raw_value = raw, "true"
+        normalized_key = ROUTE_FEATURE_ALIASES.get(key.strip().lower().replace("_", "-"))
+        display_key = advisory_label(key, default="redacted-route-feature")
+        if normalized_key is None:
+            fail(f"unknown route feature {display_key!r}; expected one of {', '.join(ROUTE_FEATURE_KEYS)}")
+        parsed = route_bool(raw_value)
+        if parsed is None:
+            fail(f"route feature {display_key!r} must be true or false")
+        out[normalized_key] = parsed
+    return out
+
+
+def provider_features_for_workload(workload: dict[str, Any], args: argparse.Namespace) -> dict[str, Any]:
+    raw_features = workload.get("provider_features")
+    workload_features = raw_features if isinstance(raw_features, dict) else {}
+    flag_features = parse_feature_overrides(getattr(args, "feature", None))
+    features: dict[str, dict[str, Any]] = {}
+    for key in ROUTE_FEATURE_KEYS:
+        supported: bool | None = None
+        source = "unknown"
+        aliases = {key, key.replace("_", "-")}
+        aliases.update(alias for alias, canonical in ROUTE_FEATURE_ALIASES.items() if canonical == key)
+        for alias in sorted(aliases):
+            if alias in workload_features:
+                parsed = route_bool(workload_features.get(alias))
+                if parsed is not None:
+                    supported = parsed
+                    source = "workload"
+                    break
+        if key in flag_features:
+            supported = flag_features[key]
+            source = "flag"
+        features[key] = {
+            "supported": supported,
+            "source": source,
+            "recheck_required": True,
+            "reason": "provider_features are caller-supplied or unknown; recheck current provider documentation before operational routing",
+        }
+    declared = sum(1 for item in features.values() if item["supported"] is not None)
+    return {
+        "features": features,
+        "declared_feature_count": declared,
+        "unknown_feature_count": len(features) - declared,
+        "caller_supplied": declared > 0,
+        "authoritative_provider_matrix": False,
+        "recheck_required": True,
+    }
+
+
+def route_usage_object(workload: dict[str, Any]) -> dict[str, Any]:
+    usage = workload.get("usage") or workload.get("provider_usage")
+    if isinstance(usage, dict):
+        return usage.get("usage") if isinstance(usage.get("usage"), dict) else usage
+    response = workload.get("response")
+    if isinstance(response, dict) and isinstance(response.get("usage"), dict):
+        return response["usage"]
+    telemetry = workload.get("telemetry")
+    if isinstance(telemetry, dict):
+        usage = telemetry.get("usage") or telemetry.get("provider_usage")
+        if isinstance(usage, dict):
+            return usage.get("usage") if isinstance(usage.get("usage"), dict) else usage
+    return {}
+
+
+def usage_has_measured_tokens(usage: dict[str, Any]) -> bool:
+    return any(
+        usage_int(usage, key) > 0
+        for key in (
+            "input_tokens",
+            "output_tokens",
+            "cache_creation_input_tokens",
+            "cache_creation_input_tokens_5m",
+            "cache_creation_input_tokens_1h",
+            "cache_read_input_tokens",
+        )
+    ) or bool(usage.get("cache_creation"))
+
+
+def cost_from_usage(usage: dict[str, Any], *, profile: dict[str, Any], model: str, exchange: float) -> dict[str, Any]:
+    input_rate, output_rate, model_rate_key = rates_for_model(profile, model)
+    write_mult, read_mult = pricing_multipliers(profile)
+    input_tokens = usage_int(usage, "input_tokens")
+    output_tokens = usage_int(usage, "output_tokens")
+    cache_creation_5m, cache_creation_1h = cache_creation_buckets(usage)
+    cache_read = usage_int(usage, "cache_read_input_tokens")
+    cost_usd = (
+        money(input_tokens, input_rate)
+        + money(output_tokens, output_rate)
+        + money(cache_creation_5m, input_rate, write_mult["5m"])
+        + money(cache_creation_1h, input_rate, write_mult["1h"])
+        + money(cache_read, input_rate, read_mult)
+    )
+    return {
+        "cost_usd": round(cost_usd, 8),
+        "cost_krw": round(krw(cost_usd, exchange), 2),
+        "model_rate_key": model_rate_key,
+        "usage": {
+            "input_tokens": input_tokens,
+            "output_tokens": output_tokens,
+            "cache_creation_input_tokens_5m": cache_creation_5m,
+            "cache_creation_input_tokens_1h": cache_creation_1h,
+            "cache_read_input_tokens": cache_read,
+        },
+    }
+
+
+def request_profile_for_route(workload: dict[str, Any]) -> dict[str, Any]:
+    request = workload.get("request")
+    if not isinstance(request, dict):
+        return {
+            "present": False,
+            "token_proxy": "unavailable",
+            "prompt_tokens_estimated": None,
+            "cache_breakpoint_count": 0,
+            "cacheable_tokens_estimated": 0,
+            "raw_request_emitted": False,
+        }
+    breakpoints, parse_meta = extract_cache_breakpoints(request)
+    fingerprints, redactions = build_fingerprints(breakpoints, b"\0" * 32)
+    cacheable_tokens = max((int(fp.get("tokens_estimated") or 0) for fp in fingerprints), default=0)
+    return {
+        "present": True,
+        "token_proxy": f"chars_div_{TOKEN_PROXY_CHARS_PER_TOKEN}",
+        "prompt_tokens_estimated": token_proxy_obj(strip_known_cache_controls(request)),
+        "cache_breakpoint_count": len(breakpoints),
+        "cacheable_tokens_estimated": cacheable_tokens,
+        "cache_control_markers": int(parse_meta.get("cache_control_markers") or 0),
+        "unsupported_cache_controls": int(parse_meta.get("unsupported_cache_controls") or 0),
+        "secret_like_values_detected": redactions,
+        "raw_request_emitted": False,
+    }
+
+
+def route_task_metadata(workload: dict[str, Any], args: argparse.Namespace) -> dict[str, Any]:
+    task = route_nested_dict(workload, "task", "task_metadata", "routing")
+    telemetry = route_nested_dict(workload, "telemetry")
+    latency = route_choice(
+        getattr(args, "latency_class", None)
+        or first_present_mapping_value(task, workload, keys=("latency_class", "latency", "mode")),
+        ROUTE_ALLOWED_LATENCY_CLASSES,
+    )
+    risk = route_choice(
+        getattr(args, "risk", None)
+        or first_present_mapping_value(task, workload, keys=("risk", "risk_level")),
+        ROUTE_ALLOWED_RISK_LEVELS,
+    )
+    quality_gate = route_choice(
+        getattr(args, "quality_gate", None)
+        or first_present_mapping_value(task, workload, telemetry, keys=("quality_gate", "quality")),
+        ROUTE_ALLOWED_QUALITY_GATES,
+    )
+    task_kind = advisory_label(
+        getattr(args, "task_kind", None)
+        or first_present_mapping_value(task, workload, keys=("task_kind", "kind", "type")),
+        default="unknown",
+        limit=48,
+    ).lower()
+    deadline_seconds = safe_int(first_present_mapping_value(task, workload, keys=("deadline_seconds", "max_latency_seconds")), 0)
+    return {
+        "latency_class": latency,
+        "risk": risk,
+        "quality_gate": quality_gate,
+        "task_kind": task_kind,
+        "deadline_seconds": deadline_seconds,
+        "requires_interaction": bool(route_bool(first_present_mapping_value(task, workload, keys=("requires_interaction", "interactive_required", "user_blocking")))),
+        "has_external_side_effects": bool(route_bool(first_present_mapping_value(task, workload, keys=("has_external_side_effects", "side_effects")))),
+        "order_sensitive": bool(route_bool(first_present_mapping_value(task, workload, keys=("order_sensitive", "requires_order")))),
+    }
+
+
+def total_cost_accounting_for_route(
+    workload: dict[str, Any],
+    *,
+    profile: dict[str, Any],
+    model: str,
+    exchange: float,
+) -> dict[str, Any]:
+    telemetry = route_nested_dict(workload, "telemetry")
+    shifted = route_nested_dict(workload, "shifted_costs", "shifted_cost", "auxiliary_costs")
+    usage = route_usage_object(workload)
+    usage_cost = cost_from_usage(usage, profile=profile, model=model, exchange=exchange) if usage_has_measured_tokens(usage) else None
+
+    primary_cost = first_nonnegative_cost(
+        telemetry,
+        workload,
+        keys=("primary_cost_usd", "provider_cost_usd", "observed_cost_usd", "cost_usd"),
+    )
+    primary_source = "explicit_telemetry" if primary_cost is not None else "unavailable"
+    if primary_cost is None and usage_cost is not None:
+        primary_cost = float(usage_cost["cost_usd"])
+        primary_source = "estimated_from_provider_usage_fields"
+    if primary_cost is None:
+        primary_cost = 0.0
+
+    external_cost_value = first_nonnegative_cost(telemetry, shifted, workload, keys=("external_cost_usd",))
+    external_component_sum, external_components = sum_nonnegative_costs_from(
+        telemetry,
+        shifted,
+        keys=("subagent_cost_usd", "embedding_cost_usd", "reranker_cost_usd", "tool_call_cost_usd", "retry_cost_usd", "auxiliary_provider_cost_usd"),
+    )
+    external_cost_from_aggregate = external_cost_value is not None
+    if external_cost_value is None:
+        external_cost = external_component_sum
+    else:
+        external_cost = external_cost_value
+
+    local_cost_value = first_nonnegative_cost(
+        telemetry,
+        shifted,
+        workload,
+        keys=("local_cost_usd", "self_hosted_cost_usd", "local_model_cost_usd"),
+    )
+    local_component_sum, local_components = sum_nonnegative_costs_from(
+        telemetry,
+        shifted,
+        keys=("local_server_cost_usd", "local_energy_cost_usd", "storage_cost_usd"),
+    )
+    local_cost_from_aggregate = local_cost_value is not None
+    if local_cost_value is None:
+        local_cost = local_component_sum
+    else:
+        local_cost = local_cost_value
+
+    provided_total = first_nonnegative_cost(
+        telemetry,
+        shifted,
+        workload,
+        keys=("total_cost_with_shift_usd", "total_shifted_cost_usd"),
+    )
+    computed_total = primary_cost + external_cost + local_cost
+    total = provided_total if provided_total is not None else computed_total
+    external_tokens = safe_int(first_present_mapping_value(telemetry, shifted, workload, keys=("external_tokens", "subagent_tokens", "embedding_tokens")), 0)
+    retry_count = safe_int(first_present_mapping_value(telemetry, workload, keys=("retry_count", "retries")), 0)
+    subagent_count = safe_int(first_present_mapping_value(telemetry, workload, keys=("subagent_count", "subagents")), 0)
+    tool_call_count = safe_int(first_present_mapping_value(telemetry, workload, keys=("tool_call_count", "tool_calls")), 0)
+    external_cost_supplied = external_cost_from_aggregate or bool(external_components)
+    local_cost_supplied = local_cost_from_aggregate or bool(local_components)
+    provided_total_supplied = provided_total is not None
+    missing_shifted_cost = bool(
+        (external_tokens or retry_count or subagent_count or tool_call_count)
+        and not (external_cost_supplied or local_cost_supplied or provided_total_supplied)
+    )
+    return {
+        "currency": "USD",
+        "primary_cost_usd": round(primary_cost, 8),
+        "primary_cost_source": primary_source,
+        "external_cost_usd": round(external_cost, 8),
+        "local_cost_usd": round(local_cost, 8),
+        "external_cost_supplied": external_cost_supplied,
+        "local_cost_supplied": local_cost_supplied,
+        "external_component_breakdown_usd": round(external_component_sum, 8),
+        "local_component_breakdown_usd": round(local_component_sum, 8),
+        "computed_total_cost_with_shift_usd": round(computed_total, 8),
+        "total_cost_with_shift_usd": round(total, 8),
+        "total_cost_with_shift_krw": round(krw(total, exchange), 2),
+        "provided_total_cost_with_shift_usd": round(provided_total, 8) if provided_total is not None else None,
+        "pricing": {
+            "profile": str(profile.get("name") or "custom"),
+            "release_recheck_required": bool(profile.get("release_recheck_required", True)),
+            "source_urls": profile.get("source_urls", [ANTHROPIC_DOCS_URL, ANTHROPIC_PRICING_URL]),
+            "usd_to_krw": exchange,
+        },
+        "usage_cost_estimate": usage_cost,
+        "components_observed": sorted(set(external_components + local_components)),
+        "run_counters": {
+            "external_tokens": external_tokens,
+            "retry_count": retry_count,
+            "subagent_count": subagent_count,
+            "tool_call_count": tool_call_count,
+        },
+        "measurement_availability": {
+            "provider_usage_tokens": usage_has_measured_tokens(usage),
+            "primary_cost": primary_source != "unavailable",
+            "external_cost": external_cost_supplied,
+            "local_cost": local_cost_supplied,
+            "shifted_cost": bool(external_cost_supplied or local_cost_supplied or provided_total_supplied),
+        },
+        "shifted_cost_accounting": {
+            "required": True,
+            "diagnostic_only": True,
+            "includes_external_or_local_components": bool(external_cost_supplied or local_cost_supplied),
+            "missing_shifted_cost_warning": missing_shifted_cost,
+            "claim_boundary": "total-cost routing is advisory; hosted savings claims require matched successful tasks with non-inferior quality and measured shifted costs",
+        },
+    }
+
+
+def batchability_for_route(task: dict[str, Any], provider_features: dict[str, Any]) -> dict[str, Any]:
+    feature = provider_features["features"]["batch_api"]
+    batch_supported = feature["supported"]
+    blockers: list[str] = []
+    reasons: list[str] = []
+    latency = str(task.get("latency_class") or "unknown")
+    deadline = int(task.get("deadline_seconds") or 0)
+    if latency == "interactive":
+        blockers.append("interactive_latency")
+    elif latency in {"async", "batch", "offline"}:
+        reasons.append(f"latency_class_{latency}")
+    elif deadline >= 3600:
+        reasons.append("deadline_allows_batch_window")
+    else:
+
reasons.append("latency_unknown") + if task.get("requires_interaction"): + blockers.append("requires_user_interaction") + if task.get("has_external_side_effects"): + blockers.append("external_side_effects_need_idempotency_review") + if task.get("order_sensitive"): + blockers.append("order_sensitive") + if task.get("risk") == "high": + blockers.append("high_risk_route") + if task.get("quality_gate") == "fail": + blockers.append("quality_gate_failed") + if batch_supported is False: + blockers.append("provider_batch_api_not_declared") + elif batch_supported is None: + reasons.append("provider_batch_api_unknown_recheck_required") + else: + reasons.append("provider_batch_api_declared") + if blockers: + level = "not_recommended" + eligible = False + elif batch_supported is True and (latency in {"async", "batch", "offline"} or deadline >= 3600): + level = "candidate" + eligible = True + else: + level = "conditional" + eligible = False + return { + "eligible": eligible, + "level": level, + "latency_class": latency, + "deadline_seconds": deadline, + "reasons": sorted(set(reasons)), + "blockers": sorted(set(blockers)), + "requires_current_provider_docs_check": batch_supported is None, + } + + +def recommendation( + rec_id: str, + *, + decision: str, + priority: str, + rationale: str, + prerequisites: list[str], +) -> dict[str, Any]: + return { + "id": rec_id, + "decision": decision, + "priority": priority, + "rationale": rationale, + "prerequisites": prerequisites, + "claim_boundary": "candidate routing advice only; validate on matched successful tasks before claiming token or cost savings", + } + + +def route_recommendations( + *, + task: dict[str, Any], + provider_features: dict[str, Any], + request_profile: dict[str, Any], + batchability: dict[str, Any], + total_cost: dict[str, Any], +) -> list[dict[str, Any]]: + recs: list[dict[str, Any]] = [ + recommendation( + "measure-before-claim", + decision="required", + priority="P0", + rationale="Route changes can shift work into 
retries, subagents, batch queues, local servers, or provider cache writes; measure total cost with quality gates before claims.", + prerequisites=["matched_successful_tasks", "non_inferior_quality", "shifted_cost_accounting"], + ) + ] + batch_decision = "candidate" if batchability.get("eligible") else str(batchability.get("level") or "conditional") + recs.append( + recommendation( + "use-batch-api-for-noninteractive-work", + decision=batch_decision, + priority="P1" if batch_decision == "candidate" else "P2", + rationale="Batch APIs can reduce cost for non-interactive work only when provider support, latency tolerance, idempotency, and quality gates are satisfied.", + prerequisites=["provider_batch_support_current", "async_or_offline_latency", "idempotency_review", "matched_replay"], + ) + ) + + prompt_cache_feature = provider_features["features"]["prompt_cache"]["supported"] + cache_breakpoints = int(request_profile.get("cache_breakpoint_count") or 0) + cacheable_tokens = int(request_profile.get("cacheable_tokens_estimated") or 0) + if prompt_cache_feature is False: + cache_decision = "not_recommended" + elif cache_breakpoints or cacheable_tokens: + cache_decision = "candidate" if prompt_cache_feature is True else "conditional" + else: + cache_decision = "needs_request_evidence" + recs.append( + recommendation( + "preserve-prompt-cache-prefix", + decision=cache_decision, + priority="P1" if cache_decision == "candidate" else "P2", + rationale="Stable-prefix prompt caching is useful only when current provider support and repeated cacheable request prefixes are verified.", + prerequisites=["stable_prefix_first", "volatile_tail", "provider_usage_cache_telemetry"], + ) + ) + + structured_feature = provider_features["features"]["structured_outputs"]["supported"] + task_kind = str(task.get("task_kind") or "unknown") + if structured_feature is False: + structured_decision = "not_recommended" + elif task_kind in ROUTE_STRUCTURED_TASK_KINDS: + structured_decision = 
"candidate" if structured_feature is True else "conditional" + else: + structured_decision = "needs_task_fit" + recs.append( + recommendation( + "use-structured-outputs-when-task-fits", + decision=structured_decision, + priority="P2", + rationale="Structured outputs can reduce retries and parsing repairs for extraction/classification style work, but they are not a token-savings proof.", + prerequisites=["schema_fit_review", "retry_rate_measurement", "quality_non_regression"], + ) + ) + + lower_cost_feature = provider_features["features"]["lower_cost_models"]["supported"] + risk = str(task.get("risk") or "unknown") + quality_gate = str(task.get("quality_gate") or "unknown") + if lower_cost_feature is False or risk == "high" or quality_gate == "fail": + cheaper_decision = "not_recommended" + elif risk == "low" and quality_gate in {"pass", "unknown"}: + cheaper_decision = "candidate" if lower_cost_feature is True else "conditional" + else: + cheaper_decision = "conditional" + recs.append( + recommendation( + "evaluate-cheaper-model-route", + decision=cheaper_decision, + priority="P2", + rationale="Lower-cost model routing is acceptable only for low-risk or well-gated work and must include corrections, retries, and shifted cost.", + prerequisites=["risk_tier_low_or_reviewed", "matched_replay", "corrections_guardrail", "retry_cost_accounting"], + ) + ) + + if total_cost["shifted_cost_accounting"].get("missing_shifted_cost_warning"): + recs.append( + recommendation( + "record-missing-shifted-costs", + decision="required", + priority="P1", + rationale="Telemetry indicates external tokens, retries, or subagents but no shifted external/local cost component was supplied.", + prerequisites=["external_cost_usd_or_local_cost_usd", "retry_or_subagent_cost_measurement"], + ) + ) + return recs + + +def route_advisor_command(args: argparse.Namespace) -> int: + workload_raw, _truncated = load_json_input(args.workload, max_bytes=args.max_bytes) + workload = 
require_json_object(workload_raw.get("workload") if isinstance(workload_raw, dict) and isinstance(workload_raw.get("workload"), dict) else workload_raw, "workload") + profile = load_pricing_profile(args.pricing_profile, max_bytes=args.max_bytes) + if args.usd_to_krw is not None: + profile["usd_to_krw"] = usd_to_krw(profile, args.usd_to_krw) + exchange = usd_to_krw(profile, None) + request = workload.get("request") if isinstance(workload.get("request"), dict) else {} + provider = advisory_label(getattr(args, "provider", None) or workload.get("provider") or (request.get("provider") if isinstance(request, dict) else None)) + model_raw = getattr(args, "model", None) or workload.get("model") or (request.get("model") if isinstance(request, dict) else None) + model = route_model_label(model_raw) + model_for_pricing = route_model_for_pricing(model_raw, model) + provider_features = provider_features_for_workload(workload, args) + task = route_task_metadata(workload, args) + request_profile = request_profile_for_route(workload) + total_cost = total_cost_accounting_for_route(workload, profile=profile, model=model_for_pricing, exchange=exchange) + batchability = batchability_for_route(task, provider_features) + recommendations = route_recommendations( + task=task, + provider_features=provider_features, + request_profile=request_profile, + batchability=batchability, + total_cost=total_cost, + ) + report = { + "schema_version": SCHEMA_VERSION, + "tool": TOOL_NAME, + "mode": "route_advisor", + "provider": { + "name": provider, + "model": model, + "feature_matrix_authoritative": False, + "feature_recheck_required": True, + }, + "provider_features": provider_features, + "task": task, + "request_profile": request_profile, + "total_cost_accounting": total_cost, + "batchability": batchability, + "route_recommendations": recommendations, + "routing_decision": { + "best_current_action": "measure_before_claim" if any(rec["decision"] == "required" for rec in recommendations) else 
"review_candidates", + "candidate_count": sum(1 for rec in recommendations if rec.get("decision") == "candidate"), + "conditional_count": sum(1 for rec in recommendations if rec.get("decision") == "conditional"), + "not_recommended_count": sum(1 for rec in recommendations if rec.get("decision") == "not_recommended"), + }, + "claim_boundary": { + "hosted_api_token_savings_claim_allowed": False, + "hosted_api_cost_savings_claim_allowed": False, + "requires_matched_successful_tasks": True, + "requires_non_inferior_quality": True, + "requires_shifted_cost_accounting": True, + "provider_features_are_caller_supplied_or_unknown": True, + }, + "privacy": { + "raw_prompt_emitted": False, + "raw_request_emitted": False, + "raw_paths_emitted": False, + "workload_stored": False, + "provider_call_performed": False, + "queue_started": False, + }, + } + emit(report, json_mode=args.json) + return 0 + + def usage_int(data: dict[str, Any], key: str) -> int: value = data.get(key, 0) try: @@ -2282,6 +3030,15 @@ def emit(data: dict[str, Any], *, json_mode: bool) -> None: elif mode == "compile": findings = data.get("findings", []) if isinstance(data.get("findings"), list) else [] print(f"{TOOL_NAME}: compile findings={len(findings)}") + elif mode == "route_advisor": + batchability = data.get("batchability", {}) if isinstance(data.get("batchability"), dict) else {} + routing = data.get("routing_decision", {}) if isinstance(data.get("routing_decision"), dict) else {} + total = data.get("total_cost_accounting", {}) if isinstance(data.get("total_cost_accounting"), dict) else {} + print( + f"{TOOL_NAME}: route-advisor batch={batchability.get('level', 'unknown')} " + f"candidates={routing.get('candidate_count', 0)} conditional={routing.get('conditional_count', 0)} " + f"total_with_shift=${total.get('total_cost_with_shift_usd', 0)}" + ) else: summary = data.get("summary", {}) if isinstance(data.get("summary"), dict) else {} print(f"{TOOL_NAME}: ledger entries={summary.get('entries', 0)}") @@ 
-2335,6 +3092,22 @@ def build_parser() -> argparse.ArgumentParser: compile_parser.add_argument("--json", action="store_true", help="emit machine-readable JSON") compile_parser.set_defaults(func=compile_command) + route = sub.add_parser( + "route-advisor", + help="advise on batchability, provider features, total cost, and route candidates", + description="advise on batchability, provider features, total cost, and route candidates without provider calls or queue runtime", + ) + route.add_argument("--workload", default="-", help="workload JSON path, or '-' for stdin") + route.add_argument("--provider", help="provider label override; advisory only") + route.add_argument("--model", help="model label override for pricing lookup; advisory only") + route.add_argument("--feature", action="append", default=[], help="provider feature override such as batch_api=true or structured_outputs=false") + route.add_argument("--latency-class", choices=sorted(ROUTE_ALLOWED_LATENCY_CLASSES), help="latency class override") + route.add_argument("--risk", choices=sorted(ROUTE_ALLOWED_RISK_LEVELS), help="risk tier override") + route.add_argument("--quality-gate", choices=sorted(ROUTE_ALLOWED_QUALITY_GATES), help="quality gate override") + route.add_argument("--task-kind", help="task kind label such as extract, summarize, code_edit, or unknown") + add_common_cost_args(route) + route.set_defaults(func=route_advisor_command) + return parser diff --git a/plugins/context-guard/lib/context_guard_commands.py b/plugins/context-guard/lib/context_guard_commands.py index c5de196..efbda3e 100644 --- a/plugins/context-guard/lib/context_guard_commands.py +++ b/plugins/context-guard/lib/context_guard_commands.py @@ -82,6 +82,8 @@ "tool-prune": ("context-guard-tool-prune",), "compress": ("context-guard-compress",), "cost": ("context-guard-cost",), + "route-advisor": ("context-guard-cost", "route-advisor"), + "route": ("context-guard-cost", "route-advisor"), "cache-score": ("context-guard-cache-score",), 
"bench": ("context-guard-bench",), "read-symbol": ("context-guard-read-symbol",), @@ -186,6 +188,7 @@ DISPATCHER_SMOKE_CASES: tuple[dict[str, Any], ...] = ( {"entrypoint": "context-guard", "args": ["experiments", "list", "--json"], "mode": "json"}, {"entrypoint": "context-guard", "args": ["cost", "--help"], "mode": "text"}, + {"entrypoint": "context-guard", "args": ["route-advisor", "--help"], "mode": "text"}, {"entrypoint": "context-guard", "args": ["cache-score", "--help"], "mode": "text"}, {"entrypoint": "context-guard-pack", "args": ["suggest", "--help"], "mode": "text"}, {"entrypoint": "context-guard-pack", "args": ["auto", "--help"], "mode": "text"}, diff --git a/tests/test_context_guard_kit.py b/tests/test_context_guard_kit.py index f283951..0a4141c 100644 --- a/tests/test_context_guard_kit.py +++ b/tests/test_context_guard_kit.py @@ -6394,6 +6394,8 @@ def write_target(target: str, path: Path) -> None: data = {"sections": [{"id": "stable", "ttl": "1h", "bytes": 10}]} elif target == "pricing": data = cost_guard_pricing() + elif target == "workload": + data = {"provider": "generic", "task": {"latency_class": "async"}, "provider_features": {"batch_api": True}} else: # pragma: no cover - defensive test helper guard. 
raise AssertionError(target) path.write_text(json.dumps(data), encoding="utf-8") @@ -6411,10 +6413,12 @@ def run_target(script: Path, target: str, path: Path, tmp: Path) -> subprocess.C ["preflight", "--pricing-profile", str(path), "--store-dir", str(tmp / "ledger"), "--json"], {"model": "claude-sonnet-4-5", "messages": [{"role": "user", "content": "hi"}]}, ) + if target == "workload": + return run_cost_guard(script, ["route-advisor", "--workload", str(path), "--json"]) raise AssertionError(target) # pragma: no cover for script in COST_GUARD_SCRIPTS: - for target in ("request", "usage", "manifest", "pricing"): + for target in ("request", "usage", "manifest", "pricing", "workload"): with self.subTest(script=script, target=target, shape="final-symlink"): with tempfile.TemporaryDirectory() as tmp_raw: tmp = Path(tmp_raw) @@ -6469,6 +6473,7 @@ def test_cost_guard_json_file_inputs_are_bounded_and_accept_normal_files(self): "usage": {"model": "claude-sonnet-4-5", "usage": {"input_tokens": 10, "output_tokens": 1}}, "manifest": {"sections": [{"id": "stable", "ttl": "1h", "bytes": 10}]}, "pricing": cost_guard_pricing(), + "workload": {"provider": "generic", "task": {"latency_class": "async"}, "provider_features": {"batch_api": True}}, } def run_regular(script: Path, target: str, path: Path, tmp: Path, extra: list[str] | None = None) -> subprocess.CompletedProcess[str]: @@ -6485,6 +6490,8 @@ def run_regular(script: Path, target: str, path: Path, tmp: Path, extra: list[st ["preflight", "--pricing-profile", str(path), "--store-dir", str(tmp / "ledger"), "--json", *extra], {"model": "x"}, ) + if target == "workload": + return run_cost_guard(script, ["route-advisor", "--workload", str(path), "--json", *extra]) raise AssertionError(target) # pragma: no cover for script in COST_GUARD_SCRIPTS: @@ -6901,6 +6908,258 @@ def test_cost_guard_observe_requires_provider_tokens_to_cover_breakpoint(self): self.assertEqual(preflight_payload["cache_risk"]["summary"]["predicted_miss"], 1) 
self.assertEqual(preflight_payload["cache_risk"]["summary"]["predicted_hit"], 0) + def test_cost_guard_route_advisor_accounts_for_shifted_costs_and_batch_candidates(self): + sentinel = "UNIQUE_ROUTE_RAW_PROMPT_SENTINEL" + private_path = "/Users/example/private/route-secret.txt" + workload = { + "provider": "openai", + "model": "claude-sonnet-4-5", + "provider_features": { + "batch_api": True, + "prompt_cache": True, + "structured_outputs": True, + "lower_cost_models": True, + }, + "task": { + "latency_class": "async", + "risk": "low", + "quality_gate": "pass", + "task_kind": "extract", + "requires_interaction": False, + }, + "request": cost_guard_request(cacheable_text=f"stable prefix {sentinel} {private_path} " + ("x" * 1000)), + "usage": { + "input_tokens": 1000, + "output_tokens": 200, + "cache_creation_input_tokens": 300, + "cache_read_input_tokens": 700, + }, + "telemetry": { + "external_cost_usd": 0.25, + "local_cost_usd": 0.05, + "external_tokens": 500, + "retry_count": 1, + "subagent_count": 2, + }, + } + for script in COST_GUARD_SCRIPTS: + with self.subTest(script=script): + proc = run_cost_guard( + script, + ["route-advisor", "--pricing-profile", json.dumps(cost_guard_pricing()), "--json"], + workload, + ) + self.assertEqual(proc.returncode, 0, proc.stderr) + for forbidden in (sentinel, private_path): + self.assertNotIn(forbidden, proc.stdout) + payload = json.loads(proc.stdout) + self.assertEqual(payload["mode"], "route_advisor") + self.assertEqual(payload["provider"]["name"], "openai") + self.assertFalse(payload["provider"]["feature_matrix_authoritative"]) + self.assertTrue(payload["provider"]["feature_recheck_required"]) + self.assertTrue(payload["provider_features"]["features"]["batch_api"]["supported"]) + self.assertEqual(payload["batchability"]["level"], "candidate") + self.assertTrue(payload["batchability"]["eligible"]) + total = payload["total_cost_accounting"] + self.assertEqual(total["external_cost_usd"], 0.25) + 
self.assertEqual(total["local_cost_usd"], 0.05) + self.assertTrue(total["measurement_availability"]["shifted_cost"]) + self.assertTrue(total["shifted_cost_accounting"]["required"]) + self.assertTrue(total["pricing"]["release_recheck_required"]) + self.assertEqual(total["pricing"]["profile"], "unit-test-pricing") + self.assertGreater(total["total_cost_with_shift_usd"], 0.30) + recs = {item["id"]: item for item in payload["route_recommendations"]} + self.assertEqual(recs["use-batch-api-for-noninteractive-work"]["decision"], "candidate") + self.assertEqual(recs["preserve-prompt-cache-prefix"]["decision"], "candidate") + self.assertEqual(recs["use-structured-outputs-when-task-fits"]["decision"], "candidate") + self.assertEqual(recs["evaluate-cheaper-model-route"]["decision"], "candidate") + self.assertFalse(payload["claim_boundary"]["hosted_api_token_savings_claim_allowed"]) + self.assertFalse(payload["privacy"]["provider_call_performed"]) + self.assertFalse(payload["privacy"]["queue_started"]) + + def test_cost_guard_route_advisor_is_conservative_for_interactive_unknown_features(self): + workload = { + "provider": "generic", + "task": { + "latency_class": "interactive", + "risk": "high", + "quality_gate": "fail", + "task_kind": "code_edit", + "requires_interaction": True, + "has_external_side_effects": True, + }, + "telemetry": { + "external_tokens": 2000, + "retry_count": 1, + "subagent_count": 1, + }, + } + proc = run_cost_guard(KIT_DIR / "cost_guard.py", ["route-advisor", "--json"], workload) + self.assertEqual(proc.returncode, 0, proc.stderr) + payload = json.loads(proc.stdout) + self.assertEqual(payload["batchability"]["level"], "not_recommended") + self.assertIn("interactive_latency", payload["batchability"]["blockers"]) + self.assertIn("requires_user_interaction", payload["batchability"]["blockers"]) + self.assertTrue(payload["provider_features"]["recheck_required"]) + self.assertEqual(payload["provider_features"]["features"]["batch_api"]["supported"], None) + 
total = payload["total_cost_accounting"] + self.assertTrue(total["shifted_cost_accounting"]["missing_shifted_cost_warning"]) + recs = {item["id"]: item for item in payload["route_recommendations"]} + self.assertEqual(recs["evaluate-cheaper-model-route"]["decision"], "not_recommended") + self.assertEqual(recs["record-missing-shifted-costs"]["decision"], "required") + self.assertNotIn("guaranteed", proc.stdout.lower()) + self.assertNotIn("ContextGuard-caused savings", proc.stdout) + + def test_cost_guard_route_advisor_does_not_double_count_aggregate_shifted_costs(self): + workload = { + "provider": "generic", + "task": {"latency_class": "async", "risk": "low", "quality_gate": "pass"}, + "provider_features": {"batch_api": True}, + "telemetry": { + "primary_cost_usd": 1.0, + "external_cost_usd": 2.0, + "subagent_cost_usd": 0.75, + "embedding_cost_usd": 0.25, + "local_cost_usd": 3.0, + "local_server_cost_usd": 0.5, + "local_energy_cost_usd": 0.25, + }, + } + proc = run_cost_guard(KIT_DIR / "cost_guard.py", ["route-advisor", "--json"], workload) + self.assertEqual(proc.returncode, 0, proc.stderr) + total = json.loads(proc.stdout)["total_cost_accounting"] + self.assertEqual(total["primary_cost_usd"], 1.0) + self.assertEqual(total["external_cost_usd"], 2.0) + self.assertEqual(total["external_component_breakdown_usd"], 1.0) + self.assertEqual(total["local_cost_usd"], 3.0) + self.assertEqual(total["local_component_breakdown_usd"], 0.75) + self.assertEqual(total["computed_total_cost_with_shift_usd"], 6.0) + self.assertEqual(total["total_cost_with_shift_usd"], 6.0) + + def test_cost_guard_route_advisor_falls_back_past_invalid_cost_fields_and_tracks_zero(self): + workload = { + "provider": "generic", + "task": {"latency_class": "async", "risk": "low"}, + "telemetry": { + "external_cost_usd": None, + "local_cost_usd": 0, + "external_tokens": 500, + "subagent_count": 1, + "subagent_cost_usd": "not-a-number", + }, + "shifted_costs": { + "external_cost_usd": 0.25, + 
"subagent_cost_usd": 0.10, + "local_server_cost_usd": 0.20, + }, + } + proc = run_cost_guard(KIT_DIR / "cost_guard.py", ["route-advisor", "--json"], workload) + self.assertEqual(proc.returncode, 0, proc.stderr) + total = json.loads(proc.stdout)["total_cost_accounting"] + self.assertEqual(total["external_cost_usd"], 0.25) + self.assertEqual(total["external_component_breakdown_usd"], 0.10) + self.assertEqual(total["local_cost_usd"], 0) + self.assertEqual(total["local_component_breakdown_usd"], 0.20) + self.assertTrue(total["external_cost_supplied"]) + self.assertTrue(total["local_cost_supplied"]) + self.assertTrue(total["measurement_availability"]["external_cost"]) + self.assertTrue(total["measurement_availability"]["local_cost"]) + self.assertTrue(total["measurement_availability"]["shifted_cost"]) + self.assertFalse(total["shifted_cost_accounting"]["missing_shifted_cost_warning"]) + + def test_cost_guard_route_advisor_flags_tool_calls_without_shifted_costs(self): + workload = { + "provider": "generic", + "task": {"latency_class": "async", "risk": "low"}, + "telemetry": { + "primary_cost_usd": 0.01, + "tool_call_count": 3, + }, + } + proc = run_cost_guard(KIT_DIR / "cost_guard.py", ["route-advisor", "--json"], workload) + self.assertEqual(proc.returncode, 0, proc.stderr) + payload = json.loads(proc.stdout) + total = payload["total_cost_accounting"] + self.assertEqual(total["run_counters"]["tool_call_count"], 3) + self.assertTrue(total["shifted_cost_accounting"]["missing_shifted_cost_warning"]) + self.assertIn("record-missing-shifted-costs", [rec["id"] for rec in payload["route_recommendations"]]) + + def test_cost_guard_route_advisor_invalid_feature_error_redacts_raw_key(self): + private_feature = "/Users/example/private/sk-ant-route-feature-secret" + proc = run_cost_guard( + KIT_DIR / "cost_guard.py", + ["route-advisor", "--feature", f"{private_feature}=true", "--json"], + {"provider": "generic"}, + ) + self.assertEqual(proc.returncode, 2) + combined = proc.stdout + 
proc.stderr + self.assertIn("unknown route feature", combined) + self.assertIn("redacted", combined) + self.assertNotIn(private_feature, combined) + self.assertNotIn("sk-ant-route-feature-secret", combined) + + def test_cost_guard_route_advisor_preserves_provider_qualified_model_for_pricing(self): + pricing = { + "name": "slash-model-pricing", + "default_input_usd_per_mtok": 99, + "default_output_usd_per_mtok": 99, + "models": { + "openai/gpt-4o": {"input_usd_per_mtok": 2, "output_usd_per_mtok": 10}, + }, + } + proc = run_cost_guard( + KIT_DIR / "cost_guard.py", + ["route-advisor", "--pricing-profile", json.dumps(pricing), "--usd-to-krw", "2", "--json"], + {"model": "openai/gpt-4o", "usage": {"input_tokens": 1_000_000, "output_tokens": 0}}, + ) + self.assertEqual(proc.returncode, 0, proc.stderr) + payload = json.loads(proc.stdout) + self.assertEqual(payload["provider"]["model"], "openai/gpt-4o") + estimate = payload["total_cost_accounting"]["usage_cost_estimate"] + self.assertEqual(estimate["model_rate_key"], "openai/gpt-4o") + self.assertEqual(estimate["cost_usd"], 2.0) + self.assertEqual(payload["total_cost_accounting"]["pricing"]["usd_to_krw"], 2.0) + self.assertEqual(payload["total_cost_accounting"]["total_cost_with_shift_krw"], 4.0) + + def test_cost_guard_route_advisor_redacts_path_like_model_label(self): + private_model_path = "/Users/example/private/model-name" + proc = run_cost_guard( + KIT_DIR / "cost_guard.py", + ["route-advisor", "--json"], + {"model": private_model_path, "usage": {"input_tokens": 1}}, + ) + self.assertEqual(proc.returncode, 0, proc.stderr) + combined = proc.stdout + proc.stderr + self.assertNotIn(private_model_path, combined) + payload = json.loads(proc.stdout) + self.assertEqual(payload["provider"]["model"], "path-redacted") + + def test_cost_guard_route_advisor_redacts_relative_model_file_path(self): + private_model_path = "weights/customer-model.gguf" + proc = run_cost_guard( + KIT_DIR / "cost_guard.py", + ["route-advisor", 
"--json"], + {"model": private_model_path, "usage": {"input_tokens": 1}}, + ) + self.assertEqual(proc.returncode, 0, proc.stderr) + combined = proc.stdout + proc.stderr + self.assertNotIn(private_model_path, combined) + payload = json.loads(proc.stdout) + self.assertEqual(payload["provider"]["model"], "path-redacted") + + def test_context_guard_route_advisor_dispatcher_help_routes_to_cost_helper(self): + for dispatcher in (KIT_DIR / "context_guard_cli.py", PLUGIN_BIN / "context-guard"): + with self.subTest(dispatcher=dispatcher): + proc = subprocess.run( + [sys.executable, str(dispatcher), "route-advisor", "--help"], + text=True, + capture_output=True, + check=True, + cwd=ROOT, + ) + self.assertIn("advise on batchability", proc.stdout) + self.assertIn("--workload", proc.stdout) + def test_cost_guard_model_pricing_prefers_specific_matches(self): module = load_module_from_path(KIT_DIR / "cost_guard.py", "cost_guard_rate_resolution_test") profile = { @@ -7953,6 +8212,7 @@ def test_cost_guard_release_gate_parity_surfaces_include_cost_helper(self): smoke = load_module_from_path(ROOT / "scripts" / "release_smoke.py", "release_smoke_cost_test") self.assertEqual(smoke.ENTRYPOINT_SMOKE_COMMANDS["context-guard-cost"]["args"], ["--help"]) self.assertIn({"entrypoint": "context-guard", "args": ["cost", "--help"], "mode": "text"}, smoke.DISPATCHER_SMOKE_COMMANDS) + self.assertIn({"entrypoint": "context-guard", "args": ["route-advisor", "--help"], "mode": "text"}, smoke.DISPATCHER_SMOKE_COMMANDS) self.assertIn({"entrypoint": "context-guard-pack", "args": ["suggest", "--help"], "mode": "text"}, smoke.DISPATCHER_SMOKE_COMMANDS) self.assertEqual(smoke.npm_dispatcher_smoke_plan(), smoke.DISPATCHER_SMOKE_COMMANDS)