diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f85c734..8307066a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,57 @@ ## Unreleased +- **v0.20 — third-party adapter entry-point discovery (E4 from round-3 review).** + Opens the same extension surface for adapters (input loaders) that M5 + already opened for check plugins. Discovery is gated by the existing + `AGENTS_SHIPGATE_ENABLE_PLUGINS=1` env var and `--no-plugins` CLI flag. + - New entry-point group: `agents_shipgate.adapters`. A third-party + package declares an adapter class (or instance) in its + `pyproject.toml` under + `[project.entry-points."agents_shipgate.adapters"]`; the class must + satisfy the `ToolSourceAdapter` Protocol — `source_type` ClassVar, + `scope` ClassVar (`per_source` or `per_scan`), `artifact_class` + ClassVar, and a `load(source, base_dir, manifest)` method. + - New module `src/agents_shipgate/inputs/adapter_validation.py` with + four load-time gates: `load_failed`, `bad_protocol`, `bad_scope`, + and **`source_type_collision`** — the load-bearing trust rule + rejecting any third-party adapter whose `source_type` shadows a + built-in or another already-registered third-party adapter. + - New top-level `discover_third_party_adapters(registry, *, + plugins_enabled, loaded_adapters)` in `inputs/protocol.py` walks + `entry_points("agents_shipgate.adapters")`, validates each entry, + and registers the valid ones onto the supplied registry. Both + valid and invalid records surface in + `report.loaded_adapters[]` so reviewers can see what was skipped. + - New report field `loaded_adapters: list[dict[str, Any]]` parallel + to `loaded_plugins[]`. Items carry `name`, `value`, `distribution`, + `version`, `source_type`, `validation_status`, + `validation_errors[]`, `runtime_errors[]`. Required + present on + every emitted scan (empty list when `--no-plugins` or no + third-party adapters are installed). The schema generator marks + each item's eight fields as required. + - `--strict-plugins` (v0.17+) extended to cover adapter failures. + Any non-`valid` `loaded_adapters[]` row OR non-empty + `loaded_adapters[].runtime_errors` now elevates the scan to exit + code 4 alongside the existing plugin failures. + - `--no-plugins` flag help text updated to mention third-party + adapter discovery is also disabled. + - `run_validated_adapter` (in `adapter_validation.py`) provides a + runtime safety wrapper for callers that want to capture + exceptions into `loaded_adapters[].runtime_errors` instead of + propagating them. The dispatcher's existing `_absorb` artifact- + class check already fires `TypeError` for artifact smuggling; + runtime wrapping is opt-in for future adapter-execution paths. + - 21 new tests in `tests/test_adapter_entry_point_discovery.py`: + each of the four gates + valid-class + valid-instance + env-var + gating + `--no-plugins` overrides + collision-with-each-builtin + parametrize + collision-between-third-parties + `--strict-plugins` + end-to-end + runtime safety net (exception capture, wrong return + type, artifact smuggling). + - STABILITY.md gains a new "Third-party adapter discovery (v0.20+)" + subsection under "Trust-model invariants" documenting the four + gates + the `source_type_collision` load-bearing rule. + - **v0.20 — top-level `reviewer_summary` block.** Adds a deterministic projection of the reviewer lens surfaces (`tool_surface_diff`, capability/intent diff, `action_surface_diff`, evidence matrix) and diff --git a/STABILITY.md b/STABILITY.md index cd891dd1..92c04965 100644 --- a/STABILITY.md +++ b/STABILITY.md @@ -361,7 +361,7 @@ If a contributor introduces a real need for one of the forbidden surfaces, update this section in the same PR. The intent is not "we tried to forbid X" — it is that X is *structurally absent* from the scanner's parsing path. -Plugins are off by default. `AGENTS_SHIPGATE_ENABLE_PLUGINS=1` enables loading; `--no-plugins` overrides at the CLI level. When loaded, every plugin is enumerated in `report.loaded_plugins`. +Plugins are off by default. `AGENTS_SHIPGATE_ENABLE_PLUGINS=1` enables loading; `--no-plugins` overrides at the CLI level. When loaded, every plugin is enumerated in `report.loaded_plugins`, and every third-party adapter (v0.20+) is enumerated in `report.loaded_adapters`. Plugin validation (v0.17+ / M5). Every entry point is checked against five load-time gates before it can run: @@ -373,6 +373,33 @@ Plugin validation (v0.17+ / M5). Every entry point is checked against five load- Plugins that pass every gate run with the same trust as built-ins. Runtime validation additionally drops findings whose `Finding.check_id` does not match the plugin's declared `id`/`check_id`, drops non-`Finding` items, and captures any exception raised during the plugin call into `loaded_plugins[].runtime_errors`. The scan continues regardless; `--strict-plugins` elevates any non-`valid` plugin or non-empty `runtime_errors` to exit code 4. +#### Third-party adapter discovery (v0.20+) + +Third-party adapters register through the `agents_shipgate.adapters` Python entry-point group and provide a class (or instance) satisfying the `ToolSourceAdapter` Protocol — a `source_type: str` ClassVar, a `scope: Literal["per_source", "per_scan"]` ClassVar, an `artifact_class: type | None` ClassVar, and a `load(source, base_dir, manifest)` method returning `LoadedAdapterResult`. Discovery is gated by the same `AGENTS_SHIPGATE_ENABLE_PLUGINS=1` env var as plugin checks; `--no-plugins` forces it off. + +Every discovered entry point is checked against four load-time gates before it can register on the scan's adapter registry: + +1. **load** — `entry_point.load()` must not raise. Captured as `validation_status="load_failed"`. +2. **bad_protocol** — the loaded value (a class is instantiated with no args; an instance is used directly) must have all three ClassVars (`source_type` non-empty string, `scope`, `artifact_class`) and a callable `load` method that accepts the three positional arguments `(source, base_dir, manifest)`: at least three positional slots (or `*args`), no more than three required positional parameters, and no required keyword-only parameters. Captured as `validation_status="bad_protocol"`. +3. **bad_scope** — `scope` must be exactly `"per_source"` or `"per_scan"`. Out-of-range values would be silently skipped by the dispatcher. Captured as `validation_status="bad_scope"`. +4. **source_type_collision** — the adapter's `source_type` must not shadow a built-in (`mcp`, `openapi`, `langchain`, etc.) or another third-party adapter discovered earlier in the same scan. **This is the load-bearing trust rule** — without it, a malicious plugin could displace a built-in adapter and intercept every scan targeting that source type. Captured as `validation_status="source_type_collision"`. + +**Per-scan registry contract.** Adapters that pass every gate register on a **per-scan clone** of the global `REGISTRY` (built at the start of each `run_scan` / `inspect_sources` via `AdapterRegistry.clone()`), NOT on the global itself. The global stays builtin-only across the lifetime of the process. This guarantees two trust invariants: + +- **`--no-plugins` is per-scan honest.** A later in-process scan with `plugins_enabled=False` sees a fresh builtin-only clone — no third-party adapters carried over from a prior enabled scan. +- **Collision detection is per-scan honest.** The collision set is the clone's builtins-only state, so two consecutive scans of the same valid third-party adapter both classify as `validation_status="valid"`, never as `source_type_collision` against the adapter's own previous registration. + +The dispatcher walks the per-scan registry in the same pass-1 (per-source, in `tool_sources[]` declared order) / pass-2 (per-scan, in canonical registry order) loops it uses for built-ins. Two trust mechanisms protect the dispatch path: + +- **Artifact-class smuggling prevention.** The dispatcher's `_absorb` step fires `TypeError` if any adapter (built-in or third-party) declares one `artifact_class` but returns an artifact of another type. This is the structural counterpart to the `Finding.check_id` smuggling rule for plugin checks. +- **Runtime-error capture for third-party adapters.** Third-party adapters that raise at runtime do NOT abort the scan. The dispatcher routes their `load()` call through `run_validated_adapter` (from `inputs/adapter_validation.py`), which catches every exception, captures it into `loaded_adapters[].runtime_errors` on the matching row, and signals the dispatcher to skip absorbing the (None) result. Built-in adapters keep the direct call shape — a built-in raising means the scanner itself is broken and must abort loudly. + +`doctor` (`inspect_sources`) uses the same per-scan registry clone + discovery + dispatcher path as `scan`, so manifests referencing third-party `tool_sources[].type` values are introspectable. The doctor payload surfaces `loaded_adapters[]` alongside the existing `policy_packs` field. + +`--strict-plugins` (v0.17+) covers BOTH plugin and adapter failures from v0.20+ — any non-`valid` `loaded_plugins[]` row, any non-empty `loaded_plugins[].runtime_errors`, any non-`valid` `loaded_adapters[]` row, OR any non-empty `loaded_adapters[].runtime_errors` elevates the scan to exit code 4. Default behavior remains lenient — failures are recorded in the respective provenance arrays and the scan proceeds. + +**Manifest `tool_sources[].type`.** The field is `str` (relaxed from a closed `Literal` in v0.20) so manifests can reference third-party per-source adapters by name. Built-in source types are enumerated in `BUILTIN_TOOL_SOURCE_TYPES` for documentation and tooling; per-scan-only built-ins (`n8n`, `openai_api`, `anthropic_api`, `validation`) are still rejected at manifest-load time with a routable error pointing the user to the dedicated top-level manifest section. Unknown source types — both genuine third-party names with no registered adapter and typos of built-in names — fail with `ConfigError` (exit 2) when the dispatcher's `AdapterRegistry.require` cannot resolve them. The exit-2 contract is unchanged from prior releases; the failure layer (manifest-load vs dispatch) may differ. + ### Manifest Schema The manifest schema version (`version: "0.1"`) is independent of the CLI diff --git a/docs/diagnostics.md b/docs/diagnostics.md index c8d9f754..18e3b727 100644 --- a/docs/diagnostics.md +++ b/docs/diagnostics.md @@ -79,6 +79,7 @@ diagnostics where no command should run. | `SHIP-DIAG-MISSING-SOURCE-FILE` | block | A required `tool_sources[].path` does not resolve under the manifest directory. (`doctor` no longer raises `InputParseError(3)` for this — see below.) | | `SHIP-DIAG-CHANGE-ME-PLACEHOLDERS` | warn | Manifest text still contains `CHANGE_ME` markers. | | `SHIP-DIAG-NO-PRODUCTION-PERMISSIONS` | warn | `environment.target: production` AND no permissions / scopes / policies declared. | +| `SHIP-DIAG-UNKNOWN-ADAPTER-SOURCE-TYPE` | block | Manifest references a `tool_sources[].type` that no registered adapter handles. Rank-1 action depends on plugin state: enable plugin discovery (`AGENTS_SHIPGATE_ENABLE_PLUGINS=1`) and install the third-party adapter package, or fix a typo. v0.20+. | ## Negative-control precedence diff --git a/docs/manifest-v0.1.json b/docs/manifest-v0.1.json index cb9192d5..5d5eb301 100644 --- a/docs/manifest-v0.1.json +++ b/docs/manifest-v0.1.json @@ -1502,15 +1502,6 @@ "title": "Trust" }, "type": { - "enum": [ - "mcp", - "openapi", - "openai_agents_sdk", - "google_adk", - "langchain", - "crewai", - "codex_plugin" - ], "title": "Type", "type": "string" } diff --git a/docs/report-schema.v0.20.json b/docs/report-schema.v0.20.json index 7c971c5c..a63ead95 100644 --- a/docs/report-schema.v0.20.json +++ b/docs/report-schema.v0.20.json @@ -4251,6 +4251,24 @@ "title": "Generated Reports", "type": "object" }, + "loaded_adapters": { + "items": { + "additionalProperties": true, + "required": [ + "distribution", + "name", + "runtime_errors", + "source_type", + "validation_errors", + "validation_status", + "value", + "version" + ], + "type": "object" + }, + "title": "Loaded Adapters", + "type": "array" + }, "loaded_plugins": { "items": { "additionalProperties": true, @@ -4386,6 +4404,7 @@ "findings", "frameworks", "generated_reports", + "loaded_adapters", "loaded_plugins", "loaded_policy_packs", "misalignments", diff --git a/docs/report-sensitive-fields.json b/docs/report-sensitive-fields.json index 8691fe05..553e9c94 100644 --- a/docs/report-sensitive-fields.json +++ b/docs/report-sensitive-fields.json @@ -40,6 +40,7 @@ {"surface": "report", "path": "generated_reports", "classification": "path_metadata"}, {"surface": "report", "path": "loaded_policy_packs", "classification": "path_metadata"}, {"surface": "report", "path": "loaded_plugins", "classification": "path_metadata"}, + {"surface": "report", "path": "loaded_adapters", "classification": "path_metadata"}, {"surface": "report", "path": "tool_inventory", "classification": "credential_metadata"}, {"surface": "report", "path": "source_warnings", "classification": "free_text"}, {"surface": "report", "path": "agent_summary", "classification": "free_text"}, diff --git a/scripts/generate_schemas.py b/scripts/generate_schemas.py index 98f68600..96994775 100644 --- a/scripts/generate_schemas.py +++ b/scripts/generate_schemas.py @@ -221,6 +221,13 @@ def build_report_schema() -> tuple[Path, str]: "generated_reports", "loaded_policy_packs", "loaded_plugins", + # v0.20: third-party adapter provenance (parallels + # loaded_plugins[]). Optional in Python via + # ``Field(default_factory=list)`` for test-helper minimal + # reports; emitted scans always populate it (empty list + # when --no-plugins is set or no third-party adapters are + # installed). Required + non-nullable on the wire. + "loaded_adapters", "tool_inventory", "source_warnings", # v0.12: agent_summary is the deterministic top-level @@ -817,6 +824,29 @@ def build_report_schema() -> tuple[Path, str]: ] ), } + # v0.20: adapter validation provenance — parallel shape to + # loaded_plugins[] but the ID key is ``source_type`` (the dispatcher + # key) rather than ``check_id``. ``validation_status`` is one of + # ``valid | load_failed | bad_protocol | bad_scope | + # source_type_collision``; the two error lists are always present + # (empty for clean adapters). + if "loaded_adapters" in properties and properties["loaded_adapters"].get("type") == "array": + properties["loaded_adapters"]["items"] = { + "type": "object", + "additionalProperties": True, + "required": sorted( + [ + "name", + "value", + "distribution", + "version", + "source_type", + "validation_status", + "validation_errors", + "runtime_errors", + ] + ), + } # frameworks.{google_adk,langchain,crewai} surface counts. These are # also list[dict[str, Any]]-shaped at the model level; v0.5 enumerated diff --git a/src/agents_shipgate/cli/_helpers.py b/src/agents_shipgate/cli/_helpers.py index 76e2d34b..b89dd9e2 100644 --- a/src/agents_shipgate/cli/_helpers.py +++ b/src/agents_shipgate/cli/_helpers.py @@ -44,12 +44,31 @@ def _apply_strict_plugins( if not strict_plugins: return exit_code - messages = strict_failure_messages(report.loaded_plugins) + # v0.20: --strict-plugins covers BOTH check-plugin failures AND + # third-party adapter failures. Both surface through the same + # extension entry-point trust model (M5 for checks, v0.20 for + # adapters) and a CI step that opts into strictness wants to fail + # on either. Adapter messages carry an ``adapter`` prefix in the + # human-readable line; plugin messages carry a ``plugin`` prefix. + from agents_shipgate.inputs.adapter_validation import ( + strict_adapter_failure_messages, + ) + + plugin_messages = strict_failure_messages(report.loaded_plugins) + adapter_messages = strict_adapter_failure_messages( + getattr(report, "loaded_adapters", []) or [] + ) + messages = plugin_messages + adapter_messages if not messages: return exit_code prefix = f"{label}: " if label else "" + issue_kind = ( + "plugin/adapter issue(s)" + if plugin_messages and adapter_messages + else ("adapter issue(s)" if adapter_messages else "plugin issue(s)") + ) typer.echo( - f"{prefix}--strict-plugins: {len(messages)} plugin issue(s) detected; " + f"{prefix}--strict-plugins: {len(messages)} {issue_kind} detected; " "scan failed under strict-plugins policy.", err=True, ) @@ -175,21 +194,38 @@ def _candidate_manifest_paths( def _diagnose_config_error( - *, config: str, workspace: Path | None, exc: ConfigError + *, + config: str, + workspace: Path | None, + exc: ConfigError, + plugins_enabled: bool | None = None, ) -> list: """Pick the right diagnostic for a ``ConfigError``. - ``ConfigError`` covers two distinct failure shapes: + ``ConfigError`` covers three distinct failure shapes: + - the manifest file does not exist (``MISSING-MANIFEST``) - - one or more candidate manifest files exist but the loader rejected - them — invalid YAML, schema validation failure, unsupported - version (``INVALID-MANIFEST``) + - one or more candidate manifest files exist but the loader + rejected them — invalid YAML, schema validation failure, + unsupported version (``INVALID-MANIFEST``) + - the manifest is well-formed and references a + ``tool_sources[].type`` that resolves to no registered adapter + (``UNKNOWN-ADAPTER-SOURCE-TYPE``, v0.20 PR #111 review + follow-up #5) Disambiguate by walking every candidate path the CLI invocation points at (direct ``-c ``, ``--workspace`` discovery, or a glob pattern). If any candidate is a real file, the loader is - choking on it — emit ``INVALID-MANIFEST`` for that file. + choking on it — but first check whether the error is an + unknown-adapter error (which means the manifest itself is fine + and the right rank-1 action is to install/enable the adapter, + not edit the YAML). """ + unknown_adapter_diag = _maybe_diagnose_unknown_adapter( + config=config, workspace=workspace, exc=exc, plugins_enabled=plugins_enabled + ) + if unknown_adapter_diag is not None: + return unknown_adapter_diag for candidate in _candidate_manifest_paths( config=config, workspace=workspace ): @@ -200,6 +236,67 @@ def _diagnose_config_error( ) +def _maybe_diagnose_unknown_adapter( + *, + config: str, + workspace: Path | None, + exc: ConfigError, + plugins_enabled: bool | None = None, +) -> list | None: + """v0.20 (PR #111 review follow-up #5): detect the + ``AdapterRegistry.require`` unknown-source-type error and route + it to ``DIAG_UNKNOWN_ADAPTER_SOURCE_TYPE`` with install / enable + / typo next_actions, instead of the legacy ``INVALID-MANIFEST`` + "edit shipgate.yaml" path. + + Returns ``None`` if the error is anything else, so the caller + falls through to the existing manifest-missing / manifest-invalid + diagnostics. + + Pattern-matches on the ``"No adapter registered for source type "`` + prefix produced by ``AdapterRegistry.require``. Brittle if the + error text changes — there's a contract test asserting the prefix + stays stable. + """ + + import re + + message = str(exc) + match = re.match( + r"No adapter registered for source type '([^']+)'\.", message + ) + if match is None: + return None + source_type = match.group(1) + # Use the same plugins-enabled logic the dispatcher does so the + # diagnostic's next_action set matches the failure mode. + from agents_shipgate.inputs.protocol import _adapter_plugins_enabled + + discovery_enabled = _adapter_plugins_enabled(plugins_enabled) + # Locate a candidate manifest path to thread into the diagnostic's + # `edit` action; fall back to the user-supplied config string if + # nothing exists on disk (defensive — discovery shouldn't fire if + # the manifest were missing). + candidate_path = Path(config) + for candidate in _candidate_manifest_paths( + config=config, workspace=workspace + ): + if candidate.is_file(): + candidate_path = candidate + break + + from agents_shipgate.cli.diagnostics import ( + diagnose_unknown_adapter_source_type, + ) + + return diagnose_unknown_adapter_source_type( + candidate_path, + source_type=source_type, + plugins_enabled=discovery_enabled, + message=message, + ) + + def _run_multi_scan( *, config_paths: list[Path], diff --git a/src/agents_shipgate/cli/_register_doctor.py b/src/agents_shipgate/cli/_register_doctor.py index e2f5ef8f..8a32ca0d 100644 --- a/src/agents_shipgate/cli/_register_doctor.py +++ b/src/agents_shipgate/cli/_register_doctor.py @@ -9,7 +9,6 @@ from agents_shipgate.cli.agent_mode import emit_agent_mode_error as _emit_agent_mode_error from agents_shipgate.cli.diagnostics import ( diagnose_doctor, - diagnose_invalid_manifest, top_next_actions, ) from agents_shipgate.cli.discovery.placeholders import collect_placeholders @@ -51,23 +50,15 @@ def doctor( try: payloads.append(inspect_sources(config_path=path, verbose=verbose)) except ConfigError as exc: - # A specific discovered manifest failed to load. If the - # file exists, route the agent to edit it directly - # (INVALID-MANIFEST) — `init` refuses to overwrite, so - # MISSING-MANIFEST's detect/init hint would loop. If - # the file is genuinely absent (only possible in the - # bare ``-c missing.yaml`` path, since discovery and - # globbing only yield existing files), fall through to - # the missing-manifest dispatch. + # A specific discovered manifest failed to load. Route + # through the same ConfigError classifier as scan so + # unknown third-party adapter source types produce the + # install/enable diagnostic instead of the generic + # INVALID-MANIFEST "edit shipgate.yaml" recovery. typer.echo(f"Config error: {exc}", err=True) - if path.is_file(): - diagnostics = diagnose_invalid_manifest( - path, message=str(exc) - ) - else: - diagnostics = _diagnose_config_error( - config=str(path), workspace=None, exc=exc - ) + diagnostics = _diagnose_config_error( + config=str(path), workspace=None, exc=exc + ) flattened = top_next_actions(diagnostics) _emit_agent_mode_error( "config_error", diff --git a/src/agents_shipgate/cli/_register_scan.py b/src/agents_shipgate/cli/_register_scan.py index 043f3f40..0bc1f9aa 100644 --- a/src/agents_shipgate/cli/_register_scan.py +++ b/src/agents_shipgate/cli/_register_scan.py @@ -88,15 +88,21 @@ def scan( no_plugins: bool = typer.Option( False, "--no-plugins", - help="Do not load third-party check plugins even when AGENTS_SHIPGATE_ENABLE_PLUGINS is set.", + help=( + "Do not load third-party check plugins OR third-party adapters " + "(v0.20+: agents_shipgate.adapters entry-point group), even " + "when AGENTS_SHIPGATE_ENABLE_PLUGINS is set." + ), ), strict_plugins: bool = typer.Option( False, "--strict-plugins", help=( - "Exit non-zero (code 4) if any loaded plugin failed validation or " - "produced runtime errors. Default lenient mode records the failure " - "in report.loaded_plugins but proceeds with the scan." + "Exit non-zero (code 4) if any loaded plugin OR third-party " + "adapter (v0.20+) failed validation or produced runtime " + "errors. Default lenient mode records the failure in " + "report.loaded_plugins / report.loaded_adapters but proceeds " + "with the scan." ), ), suggest_patches: bool = typer.Option( @@ -208,7 +214,10 @@ def scan( except ConfigError as exc: typer.echo(f"Config error: {exc}", err=True) diagnostics = _diagnose_config_error( - config=config, workspace=workspace, exc=exc + config=config, + workspace=workspace, + exc=exc, + plugins_enabled=False if no_plugins else None, ) flattened = top_next_actions(diagnostics) _emit_agent_mode_error( diff --git a/src/agents_shipgate/cli/diagnostics.py b/src/agents_shipgate/cli/diagnostics.py index f0c52820..86a351ac 100644 --- a/src/agents_shipgate/cli/diagnostics.py +++ b/src/agents_shipgate/cli/diagnostics.py @@ -34,6 +34,7 @@ DIAG_NO_PRODUCTION_PERMISSIONS, DIAG_NON_AGENT_LIBRARY, DIAG_PURE_PROMPT_EXPERIMENT, + DIAG_UNKNOWN_ADAPTER_SOURCE_TYPE, DIAG_ZERO_TOOLS, Diagnostic, NextAction, @@ -94,6 +95,130 @@ def diagnose_missing_manifest(workspace: Path) -> list[Diagnostic]: ] +def diagnose_unknown_adapter_source_type( + manifest_path: Path, + *, + source_type: str, + plugins_enabled: bool, + message: str, +) -> list[Diagnostic]: + """v0.20 (PR #111 review follow-up #5): ``shipgate.yaml`` references + a ``tool_sources[].type`` value that no registered adapter handles. + + Distinct from ``SHIP-DIAG-INVALID-MANIFEST``: the manifest passes + Pydantic validation (``type`` is open ``str`` for third-party + adapter support) but the dispatcher can't resolve the source type. + The right rank-1 action depends on whether plugin discovery is + currently enabled: + + - **plugins disabled**: install the third-party adapter package + AND enable discovery via ``AGENTS_SHIPGATE_ENABLE_PLUGINS=1`` + (or remove ``--no-plugins``). + - **plugins enabled**: install the adapter package, or fix a typo + against a built-in name. + + The "edit shipgate.yaml" path that ``diagnose_invalid_manifest`` + emits would be misleading here — the manifest itself is valid; + the user just needs to install/enable the matching adapter. + """ + + if plugins_enabled: + next_actions = [ + NextAction( + kind="command", + command="pip install ", + why=( + f"Install the third-party package that ships an " + f"adapter for {source_type!r} via the " + f"`agents_shipgate.adapters` entry-point group. " + f"Re-run the scan to confirm it appears in " + f"`report.loaded_adapters[]` with " + f"`validation_status=\"valid\"`." + ), + expects=( + f"`agents-shipgate doctor -c {_quote_path(manifest_path)} " + f"--json` lists {source_type!r} under sources[] " + f"with no warning." + ), + ), + NextAction( + kind="edit", + path=str(manifest_path), + why=( + f"If {source_type!r} is a typo, fix it. Built-in " + "source types: mcp, openapi, openai_agents_sdk, " + "google_adk, langchain, crewai, codex_plugin." + ), + expects=( + "Manifest references only built-in source types or " + "source types registered by installed third-party " + "adapter packages." + ), + ), + ] + else: + next_actions = [ + NextAction( + kind="command", + command=( + "AGENTS_SHIPGATE_ENABLE_PLUGINS=1 agents-shipgate " + f"scan -c {_quote_path(manifest_path)}" + ), + why=( + "Enable third-party adapter discovery and re-run " + "the scan. If the adapter package is also " + "installed, this resolves the source_type." + ), + expects=( + f"Scan completes; `report.loaded_adapters[]` " + f"contains an entry for {source_type!r} with " + f"`validation_status=\"valid\"`." + ), + ), + NextAction( + kind="command", + command="pip install ", + why=( + f"If discovery is already enabled and " + f"{source_type!r} still doesn't resolve, the " + "adapter package isn't installed. Install it, " + "then re-run." + ), + expects=( + f"`pip show ` succeeds; `agents-shipgate " + f"doctor -c {_quote_path(manifest_path)} --json` " + f"lists {source_type!r} under sources[]." + ), + ), + NextAction( + kind="edit", + path=str(manifest_path), + why=( + f"If {source_type!r} is a typo, fix it. Built-in " + "source types: mcp, openapi, openai_agents_sdk, " + "google_adk, langchain, crewai, codex_plugin." + ), + expects=( + "Manifest references only built-in source types or " + "source types registered by installed third-party " + "adapter packages." + ), + ), + ] + + return [ + Diagnostic( + id=DIAG_UNKNOWN_ADAPTER_SOURCE_TYPE, + title=( + f"Unknown adapter source_type {source_type!r} " + "(install/enable the adapter, or fix a typo)" + ), + severity="block", + next_actions=next_actions, + ) + ] + + def diagnose_invalid_manifest( manifest_path: Path, *, message: str ) -> list[Diagnostic]: diff --git a/src/agents_shipgate/cli/scan.py b/src/agents_shipgate/cli/scan.py index 5cd21f3c..c55d2faf 100644 --- a/src/agents_shipgate/cli/scan.py +++ b/src/agents_shipgate/cli/scan.py @@ -183,6 +183,12 @@ class _LoadedInputs: artifact_warnings_list: list[str] # from _artifact_warnings(artifact_bag) placeholder_warnings: list[str] # from _manifest_placeholder_warnings policy_pack_warnings: list[str] # from policy_packs.warnings + # v0.20: third-party adapter provenance from + # ``discover_third_party_adapters``. Both valid and invalid records + # appear here; ``loaded_adapters[].validation_status == "valid"`` + # distinguishes them. Empty list when --no-plugins is set or no + # third-party adapters are installed. + loaded_adapters: list[dict[str, Any]] adk: GoogleAdkArtifacts | None langchain: LangChainArtifacts | None crewai: CrewAiArtifacts | None @@ -273,6 +279,7 @@ class _SanitizedSurfaces: policy_audit: PolicyAudit loaded_policy_packs: list[Any] loaded_plugins: Any + loaded_adapters: Any # v0.20: list[dict[str, Any]]; sanitized via redact_data diff_reference: ToolSurfaceDiffReference | None action_surface_facts: ActionSurfaceFacts action_surface_diff: Any @@ -343,13 +350,59 @@ def _load_inputs( config_path: Path, policy_pack_paths: list[Path] | None, verbose: bool, + plugins_enabled: bool | None = None, ) -> _LoadedInputs: """Phase 2: dispatch every adapter through ``REGISTRY``, extract typed artifacts from the ``ArtifactBag``, aggregate source warnings (including CHANGE_ME placeholder warnings from the manifest text), load policy packs. + + v0.20: also discovers third-party adapters from the + ``agents_shipgate.adapters`` entry-point group BEFORE + ``_load_sources`` runs, so the dispatcher resolves any + user-installed plugin source_types alongside built-ins. Discovery + is gated by ``plugins_enabled`` (mirroring the plugin-check flow + in ``checks/registry.py``). """ - loaded_sources, artifact_bag = _load_sources(manifest, base_dir, verbose=verbose) + from agents_shipgate.inputs.protocol import discover_third_party_adapters + + # v0.20 (PR #111 review fix P1 #1+#2): build a per-scan registry + # clone so third-party discovery NEVER mutates the global + # ``REGISTRY``. Without this, a later ``--no-plugins`` scan would + # still see adapters registered by an earlier scan, and the + # collision check on scan-two would misclassify stable third- + # party adapters as ``source_type_collision`` (the global already + # has them from scan-one). The clone captures any monkeypatch + # state at this exact moment, so existing tests that + # ``monkeypatch.setitem(REGISTRY._adapters, …)`` still work. + scan_registry = REGISTRY.clone() + loaded_adapters: list[dict[str, Any]] = [] + discovery_records = discover_third_party_adapters( + scan_registry, + plugins_enabled=plugins_enabled, + loaded_adapters=loaded_adapters, + ) + # v0.20 (PR #111 review follow-up #2): map of source_type → valid + # LoadedAdapter record. Used by ``_load_sources`` to route + # third-party adapter ``load()`` calls through + # ``run_validated_adapter`` so runtime exceptions land in + # ``loaded_adapters[].runtime_errors`` instead of crashing the + # scan. Invalid records (validation_status != "valid") are + # excluded: they never registered on ``scan_registry`` and so the + # dispatcher will never reach them. + third_party_records: dict[str, Any] = { + record.adapter.source_type: record + for record in discovery_records + if record.adapter is not None + } + loaded_sources, artifact_bag = _load_sources( + manifest, + base_dir, + verbose=verbose, + registry=scan_registry, + third_party_records=third_party_records, + plugins_enabled=plugins_enabled, + ) logger.debug( "loaded sources", extra={ @@ -388,6 +441,7 @@ def _load_inputs( artifact_warnings_list=artifact_warnings_list, placeholder_warnings=placeholder_warnings, policy_pack_warnings=policy_pack_warnings, + loaded_adapters=loaded_adapters, adk=artifact_bag.get("google_adk", GoogleAdkArtifacts), langchain=artifact_bag.get("langchain", LangChainArtifacts), crewai=artifact_bag.get("crewai", CrewAiArtifacts), @@ -791,6 +845,16 @@ def _sanitize_for_output( stats=privacy_stats, path="loaded_plugins[]", ) + # v0.20: third-party adapter provenance. Same redaction shape as + # loaded_plugins[] — entry-point ``value`` strings and distribution + # metadata are first-party and don't carry secrets, but the audit + # envelope flows through redact_data for forward-compat with future + # adapter-emitted fields. + public_loaded_adapters = redact_data( + inputs.loaded_adapters, + stats=privacy_stats, + path="loaded_adapters[]", + ) public_diff_reference = _sanitize_diff_reference( diffs.diff_reference, @@ -970,6 +1034,7 @@ def _sanitize_for_output( policy_audit=public_policy_audit, loaded_policy_packs=public_loaded_policy_packs, loaded_plugins=public_loaded_plugins, + loaded_adapters=public_loaded_adapters, diff_reference=public_diff_reference, action_surface_facts=public_action_surface_facts, action_surface_diff=public_action_surface_diff, @@ -1021,6 +1086,7 @@ def _build_final_report( new_findings_only=sanitized.baseline_summary is not None, loaded_policy_packs=sanitized.loaded_policy_packs, loaded_plugins=sanitized.loaded_plugins, + loaded_adapters=sanitized.loaded_adapters, source_warnings=sanitized.source_warnings, api_surface=sanitized.api_surface, anthropic_surface=sanitized.anthropic_surface, @@ -1141,6 +1207,7 @@ def run_scan( config_path=config_path, policy_pack_paths=policy_pack_paths, verbose=verbose, + plugins_enabled=plugins_enabled, ) tools_and_agent = _build_tools_and_agent( manifest=resolved.manifest, @@ -1197,7 +1264,25 @@ def run_scan( -def inspect_sources(*, config_path: Path, verbose: bool = False) -> dict[str, object]: +def inspect_sources( + *, + config_path: Path, + verbose: bool = False, + plugins_enabled: bool | None = None, +) -> dict[str, object]: + """``doctor``'s manifest-introspection entry point. + + v0.20 (PR #111 review fix follow-up #3): mirrors ``_load_inputs``'s + per-scan registry clone + adapter discovery so ``doctor`` can + inspect manifests that reference third-party source types. Before + this fix, the global ``REGISTRY`` was builtin-only (by design, + after the per-scan-registry refactor), so a manifest with + ``tool_sources[].type: demo_source`` scanned fine but ``doctor`` + raised ``ConfigError: No adapter registered``. + """ + + from agents_shipgate.inputs.protocol import discover_third_party_adapters + manifest = load_manifest(config_path) base_dir = config_path.resolve().parent unresolved_sources = _resolve_source_paths(manifest, base_dir, config_path) @@ -1215,7 +1300,31 @@ def inspect_sources(*, config_path: Path, verbose: bool = False) -> dict[str, ob ] } ) - loaded_sources, artifact_bag = _load_sources(manifest, base_dir, verbose=verbose) + # v0.20 (PR #111 review follow-up #3): build a per-scan registry + # for ``doctor`` so it sees the same adapter set as ``scan``. The + # global ``REGISTRY`` is builtin-only by design after the + # per-scan-clone refactor; without this discovery step, + # third-party source types would be unresolvable here. + scan_registry = REGISTRY.clone() + loaded_adapters: list[dict[str, Any]] = [] + discovery_records = discover_third_party_adapters( + scan_registry, + plugins_enabled=plugins_enabled, + loaded_adapters=loaded_adapters, + ) + third_party_records: dict[str, Any] = { + record.adapter.source_type: record + for record in discovery_records + if record.adapter is not None + } + loaded_sources, artifact_bag = _load_sources( + manifest, + base_dir, + verbose=verbose, + registry=scan_registry, + third_party_records=third_party_records, + plugins_enabled=plugins_enabled, + ) adk_artifacts = artifact_bag.get("google_adk", GoogleAdkArtifacts) langchain_artifacts = artifact_bag.get("langchain", LangChainArtifacts) crewai_artifacts = artifact_bag.get("crewai", CrewAiArtifacts) @@ -1263,6 +1372,11 @@ def inspect_sources(*, config_path: Path, verbose: bool = False) -> dict[str, ob else None ), "policy_packs": [pack.model_dump(mode="json") for pack in policy_packs.loaded], + # v0.20 (PR #111 review follow-up #3): surface third-party + # adapter discovery results in the doctor payload so the + # operator can confirm which extensions were loaded (or why + # they were skipped) without running a full scan. + "loaded_adapters": loaded_adapters, "baseline": _default_baseline_status(base_dir), "warnings": warnings, "unresolved_sources": unresolved_sources, @@ -1349,8 +1463,11 @@ def _load_sources( base_dir: Path, *, verbose: bool, + registry: Any = None, + third_party_records: dict[str, Any] | None = None, + plugins_enabled: bool | None = None, ) -> tuple[list[LoadedToolSource], ArtifactBag]: - """Dispatch every adapter through ``REGISTRY``. + """Dispatch every adapter through the supplied ``registry``. Returns ``(loaded_sources, artifact_bag)``. ``artifact_bag`` is a typed ``ArtifactBag`` with per-scan adapter artifacts keyed by @@ -1360,7 +1477,7 @@ def _load_sources( Ordering is deterministic and matches the legacy run_scan order: 1. per-source loaders in tool_sources declared order - 2. per-scan adapters in REGISTRY iteration order: + 2. per-scan adapters in registry iteration order: google_adk → langchain → crewai → n8n → openai_api → anthropic_api → codex_plugin → validation @@ -1372,33 +1489,87 @@ def _load_sources( Per-scan source types appearing in tool_sources are ignored by pass 1 — they would be redundant; framework loaders already iterate every matching entry internally via the manifest. + + v0.20 (PR #111 review fix): ``registry`` is the per-scan registry + built by ``_load_inputs`` (``REGISTRY.clone()`` plus any + third-party adapters validated in this scan). Defaults to the + module-global ``REGISTRY`` only for callers that bypass + ``_load_inputs`` (notably the legacy tests in + ``tests/test_adapter_registry.py``). New code should always pass + a per-scan registry. + + v0.20 (PR #111 review fix follow-up #2): ``third_party_records`` + maps each validated third-party ``source_type`` to its + ``LoadedAdapter`` record (from ``discover_third_party_adapters``). + When set, the dispatcher routes those adapters through + ``run_validated_adapter`` so any exception during their + ``load()`` call is captured into + ``loaded_adapters[].runtime_errors`` and the scan continues in + lenient mode (or trips ``--strict-plugins`` exit 4 in strict + mode). Built-in adapters keep the direct call shape — a built-in + raising means the scanner itself is broken and must abort loudly. + + ``plugins_enabled`` is forwarded into ``AdapterRegistry.require`` so + unknown third-party source-type errors reflect explicit CLI + overrides such as ``--no-plugins`` instead of only inspecting the + environment. """ + if registry is None: + registry = REGISTRY + if third_party_records is None: + third_party_records = {} per_source_loaded: list[LoadedToolSource] = [] per_scan_loaded: list[LoadedToolSource] = [] bag = ArtifactBag() # Pass 1 — per-source adapters only, in tool_sources declared # order. Per-scan source types (langchain, crewai, etc.) are - # skipped here; pass 2 invokes them in canonical REGISTRY order + # skipped here; pass 2 invokes them in canonical registry order # regardless of where they appear in tool_sources. This protects # the dedup tie-break in _flatten_and_deduplicate_tools from # changing based on user-facing tool_sources ordering. for source in manifest.tool_sources: - adapter = REGISTRY.require(source.type) + adapter = registry.require(source.type, plugins_enabled=plugins_enabled) if adapter.scope != "per_source": continue + third_party_record = third_party_records.get(source.type) result = _invoke_per_source_adapter( - adapter, source, base_dir, manifest, verbose=verbose + adapter, + source, + base_dir, + manifest, + verbose=verbose, + third_party_record=third_party_record, ) + if result is None: + # Third-party adapter raised at runtime; the wrapper + # captured the failure into runtime_errors and we skip + # absorbing the (None) result. + continue _absorb(result, source.type, per_source_loaded, bag, adapter) - # Pass 2 — every per-scan adapter fires once, in REGISTRY order. + # Pass 2 — every per-scan adapter fires once, in registry order. # Covers framework adapters (always check their manifest section # internally and may emit zero LoadedToolSource entries when not # configured) and manifest-only adapters (openai_api, # anthropic_api, n8n). - for adapter in REGISTRY.per_scan_adapters(): - result = adapter.load(None, base_dir, manifest) + for adapter in registry.per_scan_adapters(): + third_party_record = third_party_records.get(adapter.source_type) + if third_party_record is not None: + from agents_shipgate.inputs.adapter_validation import ( + run_validated_adapter, + ) + + result = run_validated_adapter( + third_party_record, + source=None, + base_dir=base_dir, + manifest=manifest, + ) + if result is None: + continue + else: + result = adapter.load(None, base_dir, manifest) _absorb(result, adapter.source_type, per_scan_loaded, bag, adapter) return per_source_loaded + per_scan_loaded, bag @@ -1492,7 +1663,35 @@ def _invoke_per_source_adapter( manifest: AgentsShipgateManifest, *, verbose: bool, -) -> LoadedAdapterResult: + third_party_record: Any = None, +) -> LoadedAdapterResult | None: + """Invoke a per_source adapter and return its result. + + For **built-in** adapters: catch ``InputParseError`` only when the + source is marked ``optional`` (returning a warning-only stub); + any other exception propagates. A built-in raising means the + scanner is broken and must abort loudly. + + For **third-party** adapters (``third_party_record`` is the + matching ``LoadedAdapter``): route through + ``run_validated_adapter``, which captures ALL exceptions into the + record's ``runtime_errors`` list and returns ``None``. Returning + ``None`` signals the caller to skip ``_absorb`` for this source — + the scan continues in lenient mode and ``--strict-plugins`` sees + the runtime error on exit. + """ + + if third_party_record is not None: + from agents_shipgate.inputs.adapter_validation import ( + run_validated_adapter, + ) + + return run_validated_adapter( + third_party_record, + source=source, + base_dir=base_dir, + manifest=manifest, + ) try: return adapter.load(source, base_dir, manifest) except InputParseError: diff --git a/src/agents_shipgate/core/findings.py b/src/agents_shipgate/core/findings.py index d4165924..0eab2dcc 100644 --- a/src/agents_shipgate/core/findings.py +++ b/src/agents_shipgate/core/findings.py @@ -1294,6 +1294,7 @@ def build_report( new_findings_only: bool = False, loaded_policy_packs: list[LoadedPolicyPack] | None = None, loaded_plugins: list[dict[str, object]] | None = None, + loaded_adapters: list[dict[str, object]] | None = None, source_warnings: list[str] | None = None, api_surface: dict[str, object] | None = None, anthropic_surface: dict[str, object] | None = None, @@ -1330,6 +1331,7 @@ def build_report( generated_reports=generated_reports, loaded_policy_packs=loaded_policy_packs or [], loaded_plugins=loaded_plugins or [], + loaded_adapters=loaded_adapters or [], tool_inventory=tool_inventory(tools), source_warnings=source_warnings or [], # v0.17 (M1): policy audit envelope. Always present on emitted diff --git a/src/agents_shipgate/inputs/adapter_validation.py b/src/agents_shipgate/inputs/adapter_validation.py new file mode 100644 index 00000000..31199ee5 --- /dev/null +++ b/src/agents_shipgate/inputs/adapter_validation.py @@ -0,0 +1,424 @@ +"""Adapter validation gates for the ``agents_shipgate.adapters`` entry-point group. + +v0.20: third-party adapter discovery follow-up to v0.18's M5 plugin +validation. Plugins (check functions) and adapters (input loaders) are +the two extension surfaces; this module gives adapters the same +structural trust enforcement plugins already have. + +Four load-time gates, each producing a single ``ValidationFailure`` row +that lands in ``loaded_adapters[].validation_errors``: + +1. **load** — ``entry_point.load()`` raises (broken module import, + syntax error, missing dependency). Exception is captured; the + adapter is recorded with ``validation_status="load_failed"`` and + skipped. +2. **bad_protocol** — the loaded value cannot produce a + ``ToolSourceAdapter`` instance. Either it's not callable (cannot + instantiate) or the resulting instance lacks the required ClassVar + attributes (``source_type``, ``scope``, ``artifact_class``) or the + ``load`` method. +3. **bad_scope** — ``scope`` is not one of ``"per_source"`` / + ``"per_scan"``. The dispatcher's pass-1/pass-2 design depends on + this enum; an out-of-range value would silently skip the adapter. +4. **source_type_collision** — the adapter's ``source_type`` shadows + a built-in (``mcp`` / ``openapi`` / ``langchain`` / etc.) or one + already registered by an earlier third-party adapter in the same + scan. This is the load-bearing trust rule — without it, a malicious + plugin could displace a built-in adapter and intercept all scans + targeting that source type. + +Runtime validation (``run_validated_adapter``) is a separate concern +from the load-time gates. It wraps the actual ``adapter.load()`` call +and ensures: + +* Exceptions during the call are captured (the scan continues), but + the adapter's record gains a ``runtime_errors`` entry. +* The returned object must be a ``LoadedAdapterResult``; otherwise we + drop the result. +* When the adapter declares a non-None ``artifact_class``, the + returned ``artifact`` (if non-None) must be an instance of that + class. This is the load-bearing artifact-smuggling prevention rule. + +An adapter landing here in ``valid`` state with empty +``runtime_errors`` behaves exactly like a built-in adapter. Everything +else surfaces in ``report.loaded_adapters[]`` so the operator can see +what was skipped and why without reading scanner logs. +""" + +from __future__ import annotations + +import inspect +from dataclasses import dataclass, field +from typing import Any + +# Stable enum surfaced to consumers via +# ``loaded_adapters[].validation_status``. Ordering is informational; +# consumers should compare to literal strings. +ValidationStatus = str +VALID = "valid" +LOAD_FAILED = "load_failed" +BAD_PROTOCOL = "bad_protocol" +BAD_SCOPE = "bad_scope" +SOURCE_TYPE_COLLISION = "source_type_collision" + +_VALID_SCOPES: frozenset[str] = frozenset({"per_source", "per_scan"}) + + +@dataclass +class LoadedAdapter: + """One row in the adapter-load result table. + + ``adapter`` is ``None`` when validation failed; the discovery + function skips invalid adapters but still surfaces them under + ``loaded_adapters[]`` (parallel to ``loaded_plugins[]``). + + ``info`` is the dict shape persisted into the JSON report (see + ``ReadinessReport.loaded_adapters``). The dict is intentionally + mutable so the runtime wrapper can append ``runtime_errors`` after + the adapter has been invoked. + """ + + adapter: Any | None # ToolSourceAdapter; typed Any here to avoid the import cycle + info: dict[str, Any] + runtime_errors: list[str] = field(default_factory=list) + + @property + def valid(self) -> bool: + return self.info.get("validation_status") == VALID + + +def validate_adapter_entry_point( + entry_point: Any, + *, + builtin_source_types: set[str], + already_registered_source_types: set[str], +) -> LoadedAdapter: + """Run the four load-time gates against a single entry point. + + Returns a ``LoadedAdapter`` whose ``info`` dict carries the + decision (``validation_status``, ``validation_errors``, plus the + ``name`` / ``value`` / ``distribution`` / ``version`` / + ``source_type`` fields for ``report.loaded_adapters[]``). + """ + + info = _base_info(entry_point) + + # Gate 1: load + try: + loaded = entry_point.load() + except Exception as exc: # noqa: BLE001 — capture is the point + return _fail(info, LOAD_FAILED, f"entry_point.load() raised: {exc!r}") + + # Gate 2: bad_protocol — instantiate (if class) and check Protocol shape. + try: + adapter = loaded() if isinstance(loaded, type) else loaded + except Exception as exc: # noqa: BLE001 — capture is the point + return _fail( + info, + BAD_PROTOCOL, + f"adapter class could not be instantiated with no args: {exc!r}", + ) + + proto_error = _protocol_error(adapter) + if proto_error is not None: + return _fail(info, BAD_PROTOCOL, proto_error) + + source_type = adapter.source_type + scope = adapter.scope + info["source_type"] = source_type + + # Gate 3: bad_scope — must be exactly one of the two literals. + if scope not in _VALID_SCOPES: + return _fail( + info, + BAD_SCOPE, + f"adapter scope must be 'per_source' or 'per_scan'; got {scope!r}", + ) + + # Gate 4: source_type_collision — load-bearing trust rule. + if source_type in builtin_source_types: + return _fail( + info, + SOURCE_TYPE_COLLISION, + f"source_type {source_type!r} is reserved by a built-in adapter", + ) + if source_type in already_registered_source_types: + return _fail( + info, + SOURCE_TYPE_COLLISION, + f"source_type {source_type!r} is already registered by another " + "third-party adapter", + ) + + # All gates pass. + info["validation_status"] = VALID + info["validation_errors"] = [] + return LoadedAdapter(adapter=adapter, info=info) + + +def run_validated_adapter( + loaded: LoadedAdapter, + *, + source: Any, + base_dir: Any, + manifest: Any, +) -> Any | None: + """Invoke a validated adapter's ``load()`` with runtime safety nets. + + Returns the ``LoadedAdapterResult`` if the call succeeds AND the + returned artifact (if any) is an instance of the adapter's declared + ``artifact_class``. Returns ``None`` on any runtime failure; + failures are captured in ``loaded.runtime_errors`` (and mirrored + into ``loaded.info['runtime_errors']``). + + Callers should treat ``None`` as "adapter produced no usable + result" and skip it. This mirrors ``run_validated_plugin`` in + ``checks/plugin_validation.py``. + """ + + if loaded.adapter is None: # defensive + raise RuntimeError("run_validated_adapter called on an invalid adapter record") + + runtime_errors: list[str] = [] + + try: + result = loaded.adapter.load(source, base_dir, manifest) + except Exception as exc: # noqa: BLE001 — capture is the point + runtime_errors.append(f"adapter.load() raised: {exc!r}") + _set_runtime_errors(loaded, runtime_errors) + return None + + # Lazy import to avoid the inputs.protocol ↔ inputs.adapter_validation + # cycle. ``LoadedAdapterResult`` is defined in inputs/protocol.py. + from agents_shipgate.inputs.protocol import LoadedAdapterResult + + if not isinstance(result, LoadedAdapterResult): + runtime_errors.append( + "adapter.load() must return LoadedAdapterResult; got " + f"{type(result).__name__!r}" + ) + _set_runtime_errors(loaded, runtime_errors) + return None + + # Smuggling prevention: when the adapter declares a non-None + # artifact_class, the returned artifact must be an instance of it. + # The dispatcher (cli/scan.py:_absorb) also enforces this for + # built-ins; recording it as a runtime error here gives external + # consumers (loaded_adapters[].runtime_errors) visibility before + # the dispatcher's TypeError fires. + declared_cls = getattr(loaded.adapter, "artifact_class", None) + if ( + declared_cls is not None + and result.artifact is not None + and not isinstance(result.artifact, declared_cls) + ): + runtime_errors.append( + f"adapter declared artifact_class={declared_cls.__name__} but " + f"returned {type(result.artifact).__name__}; dropping artifact" + ) + _set_runtime_errors(loaded, runtime_errors) + return None + + _set_runtime_errors(loaded, runtime_errors) + return result + + +def strict_adapter_failure_messages( + loaded_adapters: list[dict[str, Any]], +) -> list[str]: + """Collect human-readable reasons strict-mode should fail on. + + Returns an empty list if every adapter is ``valid`` with empty + ``runtime_errors``. ``--strict-plugins`` callers should turn a + non-empty list into a non-zero exit (alongside + ``strict_failure_messages`` from ``plugin_validation``). + """ + + messages: list[str] = [] + for record in loaded_adapters: + status = record.get("validation_status") + if status and status != VALID: + errors = record.get("validation_errors") or [status] + for err in errors: + messages.append( + f"adapter {record.get('value') or record.get('name')!r}: {err}" + ) + runtime_errors = record.get("runtime_errors") or [] + for err in runtime_errors: + messages.append( + f"adapter {record.get('value') or record.get('name')!r}: {err}" + ) + return messages + + +# --- internals ------------------------------------------------------------- + + +_REQUIRED_PROTOCOL_ATTRS = ("source_type", "scope", "artifact_class") + + +def _base_info(entry_point: Any) -> dict[str, Any]: + dist = getattr(entry_point, "dist", None) + return { + "name": _maybe_str(getattr(entry_point, "name", None)), + "value": _maybe_str(getattr(entry_point, "value", None)), + "distribution": _distribution_name(dist), + "version": _distribution_version(dist), + "source_type": None, + "validation_status": VALID, + "validation_errors": [], + "runtime_errors": [], + } + + +def _fail(info: dict[str, Any], status: ValidationStatus, message: str) -> LoadedAdapter: + info["validation_status"] = status + info["validation_errors"] = [message] + return LoadedAdapter(adapter=None, info=info) + + +def _protocol_error(adapter: Any) -> str | None: + """Return a human-readable error if ``adapter`` does not satisfy + the ``ToolSourceAdapter`` Protocol; ``None`` if it does. + + The Protocol requires three ClassVar attributes and a ``load`` + method. ``isinstance`` against ``ToolSourceAdapter`` (which is + ``@runtime_checkable``) would catch most of this, but it can't + distinguish between a missing attribute and a wrongly-typed one, + and the error message it produces is opaque. We do an explicit + check here. + """ + + for attr in _REQUIRED_PROTOCOL_ATTRS: + if not hasattr(adapter, attr): + return ( + f"adapter is missing required ClassVar {attr!r} " + "(ToolSourceAdapter Protocol)" + ) + + source_type = getattr(adapter, "source_type", None) + if not isinstance(source_type, str) or not source_type: + return ( + "adapter source_type must be a non-empty string; got " + f"{source_type!r}" + ) + + load_method = getattr(adapter, "load", None) + if not callable(load_method): + return "adapter.load must be a callable method" + + try: + sig = inspect.signature(load_method) + except (TypeError, ValueError): + # Builtins / C extensions; accept — runtime wrapper will catch. + return None + # ToolSourceAdapter.load is invoked as + # ``adapter.load(source, base_dir, manifest)`` — three positional + # arguments, never any keyword arguments. The validator must accept + # any signature compatible with that exact call shape and reject + # everything else. Three failure modes the original v0.20 (PR #111 + # review fix P2 #4) implementation missed: + # + # - Too FEW positional slots, e.g. ``load(self, source)`` — the + # bound method has only 1 positional slot; the dispatcher's + # 3-arg call would crash with ``TypeError: too many positional + # arguments``. + # - Required keyword-only parameters, e.g. + # ``load(self, source, base_dir, manifest, *, must_set)`` — the + # dispatcher never passes kwargs; the call would crash with + # ``TypeError: missing 1 required keyword-only argument``. + # - Too MANY required positional params (preserved from v0.20). + positional = [ + p + for p in sig.parameters.values() + if p.kind + in ( + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + inspect.Parameter.VAR_POSITIONAL, + ) + ] + if not positional: + return ( + "adapter.load must accept positional arguments " + "(source, base_dir, manifest)" + ) + required = [ + p + for p in positional + if p.kind != inspect.Parameter.VAR_POSITIONAL + and p.default is inspect.Parameter.empty + ] + if len(required) > 3: + return ( + "adapter.load must accept at most 3 required positional " + f"parameters (source, base_dir, manifest); got {len(required)}" + ) + # Count how many positional slots can absorb arguments. Required + + # optional contribute one each; ``*args`` absorbs any remaining. + accepting_slots = sum( + 1 + for p in positional + if p.kind != inspect.Parameter.VAR_POSITIONAL + ) + has_var_positional = any( + p.kind == inspect.Parameter.VAR_POSITIONAL for p in positional + ) + if accepting_slots < 3 and not has_var_positional: + return ( + "adapter.load must accept at least 3 positional arguments " + f"(source, base_dir, manifest); got {accepting_slots} " + "positional slot(s) and no *args" + ) + # Required keyword-only parameters are not satisfiable by the + # ``plugin(source, base_dir, manifest)`` call shape. Optional + # kw-only (with defaults) and ``**kwargs`` are fine. Mirrors the + # plugin-validation signature gate. + required_kw_only = [ + p + for p in sig.parameters.values() + if p.kind == inspect.Parameter.KEYWORD_ONLY + and p.default is inspect.Parameter.empty + ] + if required_kw_only: + names = ", ".join(p.name for p in required_kw_only) + return ( + "adapter.load has required keyword-only parameter(s) " + f"({names}); the dispatcher calls adapter.load(source, " + "base_dir, manifest) with no keyword arguments" + ) + return None + + +def _set_runtime_errors(loaded: LoadedAdapter, errors: list[str]) -> None: + loaded.runtime_errors = errors + loaded.info["runtime_errors"] = errors + + +def _maybe_str(value: Any) -> str | None: + if value is None: + return None + text = str(value) + return text or None + + +def _distribution_name(dist: Any) -> str | None: + if dist is None: + return None + metadata = getattr(dist, "metadata", None) + if metadata is not None: + try: + name = metadata.get("Name") + except AttributeError: + name = None + if isinstance(name, str) and name: + return name + name = getattr(dist, "name", None) + return str(name) if name else None + + +def _distribution_version(dist: Any) -> str | None: + if dist is None: + return None + version = getattr(dist, "version", None) + return str(version) if version else None diff --git a/src/agents_shipgate/inputs/protocol.py b/src/agents_shipgate/inputs/protocol.py index f6bb6881..de407845 100644 --- a/src/agents_shipgate/inputs/protocol.py +++ b/src/agents_shipgate/inputs/protocol.py @@ -6,13 +6,25 @@ ``ToolSourceAdapter``. The CLI's ``_load_sources`` walks ``REGISTRY`` to dispatch. -Adding a new builtin adapter in v0.12+ is a two-file change: +Adding a new built-in adapter in v0.12+ is a two-file change: 1. Drop a class in ``inputs/.py`` matching the Protocol. 2. Add it to the tuple in ``_register_builtin_adapters()`` below in canonical order (see the docstring there). -Entry-point discovery for third-party plugins is a v0.12 follow-up. +**Third-party adapter discovery (v0.20+).** Third-party adapters +register through the ``agents_shipgate.adapters`` entry-point group. +``discover_third_party_adapters()`` validates each entry point through +the four gates in ``inputs/adapter_validation.py`` (load, +bad_protocol, bad_scope, source_type_collision) and registers the +valid ones onto the supplied registry. Discovery is opt-in: it runs +only when ``AGENTS_SHIPGATE_ENABLE_PLUGINS=1`` is set (or an explicit +``plugins_enabled=True`` override is passed), and ``--no-plugins`` +forces it off. Both valid and invalid records surface in +``ReadinessReport.loaded_adapters[]`` so reviewers can see what was +skipped without reading scanner logs. Runtime errors and +artifact-class smuggling attempts are also captured into +``loaded_adapters[].runtime_errors`` via ``run_validated_adapter``. Manifest-only adapters (``openai_api``, ``anthropic_api``, ``n8n``, ``validation``) are registered under string keys that are NOT in @@ -23,10 +35,12 @@ from __future__ import annotations +import os from collections.abc import Iterator from dataclasses import dataclass, field +from importlib.metadata import entry_points from pathlib import Path -from typing import ClassVar, Literal, Protocol, runtime_checkable +from typing import Any, ClassVar, Literal, Protocol, runtime_checkable # Re-export for backward compatibility; protocol.py was ArtifactBag's # original home in v0.11 R1. @@ -138,20 +152,82 @@ def register(self, adapter: ToolSourceAdapter) -> None: ) self._adapters[adapter.source_type] = adapter + def clone(self) -> AdapterRegistry: + """v0.20 (PR #111 review follow-up): return a shallow-copy + registry snapshot for use as a *per-scan* adapter registry. + + The clone shares no mutable state with the original — its + ``_adapters`` dict is a fresh copy. Subsequent ``register()`` / + third-party additions on the clone do NOT propagate to the + global ``REGISTRY``. Closes two P1 review findings: + + - ``--no-plugins`` on a later in-process scan now actually + disables third-party adapters discovered by an earlier scan + (which previously persisted in the global registry). + - Collision detection on the second scan sees only the + builtins (clone-time state), not adapters carried over from + scan one, so a stable third-party adapter no longer + mis-reports as ``source_type_collision``. + + Tests that monkeypatch ``REGISTRY._adapters`` before + ``_load_inputs`` runs are preserved: the clone captures the + global's state at that moment, including monkeypatch + modifications. + """ + + clone = AdapterRegistry(autopopulate=False) + # Lazy-populate ourselves first so the clone inherits the + # canonical builtin set even when this is the global REGISTRY's + # first read. + self._ensure_populated() + clone._adapters = dict(self._adapters) + clone._populated = True + return clone + def get(self, source_type: str) -> ToolSourceAdapter | None: self._ensure_populated() return self._adapters.get(source_type) - def require(self, source_type: str) -> ToolSourceAdapter: + def require( + self, source_type: str, *, plugins_enabled: bool | None = None + ) -> ToolSourceAdapter: self._ensure_populated() adapter = self._adapters.get(source_type) if adapter is None: from agents_shipgate.core.errors import ConfigError + # v0.20 (PR #111 review follow-up #5): the original error + # message ("Add the adapter to _register_builtin_adapters") + # was contributor-facing and obsolete after the + # ``tool_sources[].type: str`` relaxation. Users hitting + # this in production have three real remediation paths; + # surface them in the message so ``str(exc)`` is + # actionable on its own. The agent-mode next_actions + # built by ``diagnose_unknown_adapter_source_type`` cover + # the same paths in structured form. + discovery_enabled = _adapter_plugins_enabled(plugins_enabled) + if discovery_enabled: + remediation = ( + "Either (a) install the third-party adapter " + "package that registers this source_type, or " + "(b) fix a typo of a built-in name " + "(`agents-shipgate list-checks --json` exposes " + "the catalog and `agents-shipgate doctor --json` " + "lists discovered adapters)." + ) + else: + remediation = ( + "Either (a) enable third-party adapter discovery " + "by setting `AGENTS_SHIPGATE_ENABLE_PLUGINS=1` " + "(or removing `--no-plugins`) and ensure the " + "adapter package is installed, or (b) fix a typo " + "of a built-in name (built-ins are: mcp, openapi, " + "openai_agents_sdk, google_adk, langchain, crewai, " + "codex_plugin)." + ) raise ConfigError( - f"No adapter registered for source type {source_type!r}. " - "Add the adapter to " - "agents_shipgate.inputs.protocol._register_builtin_adapters()." + f"No adapter registered for source type " + f"{source_type!r}. {remediation}" ) return adapter @@ -223,3 +299,143 @@ def _register_builtin_adapters(registry: AdapterRegistry) -> None: ValidationAdapter(), ): registry.register(adapter) + + +# v0.20: Third-party adapter discovery via the +# ``agents_shipgate.adapters`` entry-point group. Parallels the +# ``agents_shipgate.checks`` plugin discovery in ``checks/registry.py`` +# but is gated by the same env var / CLI flag so a single +# ``--no-plugins`` blocks both. See ``inputs/adapter_validation.py`` +# for the four-gate validation pipeline. + +ADAPTER_ENTRY_POINT_GROUP = "agents_shipgate.adapters" + + +def discover_third_party_adapters( + registry: AdapterRegistry, + *, + plugins_enabled: bool | None = None, + loaded_adapters: list[dict[str, Any]] | None = None, +) -> list[Any]: + """Walk ``entry_points('agents_shipgate.adapters')``, validate each + third-party adapter, and register the valid ones onto ``registry``. + + **Per-scan registry contract (v0.20 review fix).** ``registry`` + MUST be a per-scan instance (typically ``REGISTRY.clone()`` from + ``_load_inputs``), NOT the global ``REGISTRY`` itself. Mutating + the global broke two trust invariants: + + 1. ``--no-plugins`` on a later scan didn't disable adapters + registered by an earlier scan, because the dispatcher still + resolved them through the global registry. + 2. Collision detection on the second scan misclassified + previously-registered third-party adapters as + ``source_type_collision`` against themselves. + + Callers pass a clone so each scan starts from a known-good + builtins-only snapshot. + + Returns the list of ``LoadedAdapter`` records (both valid and + invalid). When ``loaded_adapters`` is provided, the per-row + ``info`` dicts are appended to it for downstream + ``ReadinessReport.loaded_adapters[]`` surfacing. + + Discovery is opt-in: a no-op when + ``AGENTS_SHIPGATE_ENABLE_PLUGINS`` is unset and + ``plugins_enabled`` is not explicitly ``True``. ``--no-plugins`` + on the CLI translates to ``plugins_enabled=False`` and forces this + off even when the env var is set. + """ + + from agents_shipgate.inputs.adapter_validation import ( + LoadedAdapter, + validate_adapter_entry_point, + ) + + if not _adapter_plugins_enabled(plugins_enabled): + return [] + + # Ensure builtins are populated before we measure the collision + # set — otherwise a third-party adapter declaring source_type="mcp" + # would be erroneously accepted on a freshly-instantiated registry. + registry._ensure_populated() + builtin_source_types = set(registry._adapters.keys()) + already_registered: set[str] = set() + + records: list[LoadedAdapter] = [] + + for entry_point in entry_points(group=ADAPTER_ENTRY_POINT_GROUP): + if _is_builtin_adapter_entry_point(entry_point): + continue + record = validate_adapter_entry_point( + entry_point, + builtin_source_types=builtin_source_types, + already_registered_source_types=already_registered, + ) + records.append(record) + if loaded_adapters is not None: + loaded_adapters.append(record.info) + if record.adapter is not None: + # Register on the live registry so subsequent + # ``REGISTRY.require(source_type)`` calls from the + # dispatcher resolve the third-party adapter. Track the + # source_type for the collision gate in this same loop. + registry.register(record.adapter) + already_registered.add(record.adapter.source_type) + + return records + + +def _adapter_plugins_enabled(override: bool | None = None) -> bool: + """Mirror ``checks/registry.py:_plugins_enabled`` so a single + ``AGENTS_SHIPGATE_ENABLE_PLUGINS`` env var / ``--no-plugins`` + flag governs BOTH plugin checks AND third-party adapters. + """ + + if override is not None: + return override + value = os.environ.get("AGENTS_SHIPGATE_ENABLE_PLUGINS", "") + return value.lower() in {"1", "true", "yes", "on"} + + +def _is_builtin_adapter_entry_point(entry_point: Any) -> bool: + """True for entry points that came from the agents-shipgate + distribution itself. + + The built-in adapters do NOT register through entry points today + (they are registered explicitly via + ``_register_builtin_adapters``), so this is defensive: it future- + proofs the discovery against a hypothetical future where a built- + in adapter migrates to entry-point discovery. Without this guard, + that migration would cause every built-in adapter to be flagged + as a source_type collision against itself. + """ + + dist = getattr(entry_point, "dist", None) + distribution_name = _adapter_distribution_name(dist) + if _adapter_normalize_distribution_name(distribution_name) == "agents-shipgate": + return True + if dist is None: + return str(getattr(entry_point, "value", "")).startswith( + "agents_shipgate.inputs." + ) + return False + + +def _adapter_distribution_name(dist: Any) -> str | None: + if dist is None: + return None + metadata = getattr(dist, "metadata", None) + if metadata is not None: + try: + name = metadata.get("Name") + except AttributeError: + name = None + if isinstance(name, str) and name: + return name + name = getattr(dist, "name", None) + return str(name) if name else None + + +def _adapter_normalize_distribution_name(value: str | None) -> str: + return (value or "").replace("_", "-").lower() diff --git a/src/agents_shipgate/report/markdown.py b/src/agents_shipgate/report/markdown.py index 82bca38b..14dd4525 100644 --- a/src/agents_shipgate/report/markdown.py +++ b/src/agents_shipgate/report/markdown.py @@ -104,6 +104,7 @@ def render_markdown_report(report: ReadinessReport) -> str: _append_source_warnings(lines, report) _append_loaded_policy_packs(lines, report) _append_loaded_plugins(lines, report) + _append_loaded_adapters(lines, report) _append_tool_surface(lines, report) _append_action_surface_diff(lines, report) _append_tool_surface_diff(lines, report) @@ -461,6 +462,44 @@ def _append_loaded_policy_packs(lines: list[str], report: ReadinessReport) -> No lines.append("") +def _append_loaded_adapters(lines: list[str], report: ReadinessReport) -> None: + """v0.20 (PR #111 review follow-up #4): render third-party adapter + provenance in the human-readable report. + + Before this section landed, ``load_failed`` and other invalid + adapters appeared in ``report.json`` only — the default human + report was blind to skipped third-party extensions. Show the + validation status prominently so the reviewer sees what was + skipped (and why) without opening the JSON. + """ + + adapters = getattr(report, "loaded_adapters", None) or [] + if not adapters: + return + lines.extend(["## Loaded Adapters", ""]) + for adapter in adapters: + distribution = adapter.get("distribution") or "unknown distribution" + version = adapter.get("version") + source_type = adapter.get("source_type") or "unknown source_type" + status = adapter.get("validation_status") or "unknown" + suffix = f" {version}" if version else "" + head = ( + f"- {_safe_markdown_text(distribution)}" + f"{_safe_markdown_text(suffix)}: " + f"{_safe_markdown_text(source_type)} — " + f"`{_safe_markdown_text(status)}`" + ) + lines.append(head) + # Surface every validation_error / runtime_error on its own + # indented bullet so a reviewer scanning the report cannot + # miss an adapter that failed to load or crashed at runtime. + for err in adapter.get("validation_errors") or []: + lines.append(f" - validation_error: {_safe_markdown_text(err)}") + for err in adapter.get("runtime_errors") or []: + lines.append(f" - runtime_error: {_safe_markdown_text(err)}") + lines.append("") + + def _append_tool_surface(lines: list[str], report: ReadinessReport) -> None: surface = report.tool_surface lines.extend( diff --git a/src/agents_shipgate/schemas/diagnostics.py b/src/agents_shipgate/schemas/diagnostics.py index debbc0de..30ebd189 100644 --- a/src/agents_shipgate/schemas/diagnostics.py +++ b/src/agents_shipgate/schemas/diagnostics.py @@ -73,6 +73,16 @@ class Diagnostic(BaseModel): DIAG_MISSING_SOURCE_FILE = "SHIP-DIAG-MISSING-SOURCE-FILE" DIAG_CHANGE_ME_PLACEHOLDERS = "SHIP-DIAG-CHANGE-ME-PLACEHOLDERS" DIAG_NO_PRODUCTION_PERMISSIONS = "SHIP-DIAG-NO-PRODUCTION-PERMISSIONS" +# v0.20 (PR #111 review follow-up): manifest references a +# ``tool_sources[].type`` that resolves to no registered adapter. Two +# common causes: (a) plugin discovery is disabled (env unset or +# ``--no-plugins``) and the source type belongs to a third-party +# adapter; (b) a typo of a built-in name. The diagnostic next_actions +# route the agent to the right remediation depending on which case it +# is — installing the third-party package, enabling plugin discovery, +# or fixing the typo — instead of the legacy "edit shipgate.yaml" +# advice that ``diagnose_invalid_manifest`` would otherwise emit. +DIAG_UNKNOWN_ADAPTER_SOURCE_TYPE = "SHIP-DIAG-UNKNOWN-ADAPTER-SOURCE-TYPE" ALL_DIAGNOSTIC_IDS: tuple[str, ...] = ( DIAG_MISSING_MANIFEST, @@ -87,4 +97,5 @@ class Diagnostic(BaseModel): DIAG_MISSING_SOURCE_FILE, DIAG_CHANGE_ME_PLACEHOLDERS, DIAG_NO_PRODUCTION_PERMISSIONS, + DIAG_UNKNOWN_ADAPTER_SOURCE_TYPE, ) diff --git a/src/agents_shipgate/schemas/manifest.py b/src/agents_shipgate/schemas/manifest.py index 81fd2d16..963fefba 100644 --- a/src/agents_shipgate/schemas/manifest.py +++ b/src/agents_shipgate/schemas/manifest.py @@ -47,24 +47,80 @@ class EnvironmentConfig(BaseModel): promotion_to: str | None = None +#: v0.20 (PR #111 review fix P1 #3): the curated set of built-in +#: source types that may legitimately appear in ``tool_sources[].type``. +#: Used for documentation only — ``ToolSourceConfig.type`` is open +#: (``str``) so third-party adapters from the ``agents_shipgate.adapters`` +#: entry-point group can declare custom source types. Unknown source +#: types fail at dispatcher time via ``AdapterRegistry.require``. +BUILTIN_TOOL_SOURCE_TYPES: tuple[str, ...] = ( + "mcp", + "openapi", + "openai_agents_sdk", + "google_adk", + "langchain", + "crewai", + "codex_plugin", +) + +#: Built-in adapters that are intentionally NOT permitted in +#: ``tool_sources[]`` because they are per_scan-only and ingest +#: configuration from their dedicated top-level manifest section. +#: Placing one of these in ``tool_sources`` is a user mistake; we +#: reject it at manifest-load time with a clear error rather than +#: silently no-op'ing it in pass 1 of the dispatcher. +BUILTIN_PER_SCAN_ONLY_TOOL_SOURCE_TYPES: frozenset[str] = frozenset( + {"n8n", "openai_api", "anthropic_api", "validation"} +) + + class ToolSourceConfig(BaseModel): model_config = STRICT_MODEL_CONFIG id: str - type: Literal[ - "mcp", - "openapi", - "openai_agents_sdk", - "google_adk", - "langchain", - "crewai", - "codex_plugin", - ] + # v0.20 (PR #111 review fix P1 #3): opened from a closed Literal to + # ``str`` so manifests can reference third-party per_source adapters + # registered via the ``agents_shipgate.adapters`` entry-point group. + # Without this relaxation, ``ToolSourceConfig.model_validate`` + # rejected ``type: my_custom_source`` at manifest-load time — + # before adapter discovery had a chance to register the loader — + # making the v0.20 third-party adapter surface unusable for its + # main advertised use case. + # + # Built-in source types are enumerated in + # ``BUILTIN_TOOL_SOURCE_TYPES`` above (documentation only). Unknown + # source types are still rejected with a routable ``ConfigError`` + # (exit 2) at dispatch time via ``AdapterRegistry.require``. Typos + # in built-in names therefore fail loudly with the same exit code + # as before — just from a different code path. + # + # Per_scan-only built-ins (``n8n``, ``openai_api``, ``anthropic_api``, + # ``validation``) remain explicitly REJECTED here (model validator + # below) because they ingest config from their dedicated top-level + # manifest section, not from ``tool_sources[]``. + type: str path: str | None = None trust: str | None = None mode: str | None = None optional: bool = False + @model_validator(mode="after") + def reject_per_scan_only_builtins(self) -> ToolSourceConfig: + # Each per_scan-only built-in has its own dedicated manifest + # section (``n8n.workflows``, ``openai_api.tools``, …); putting + # it in ``tool_sources`` is always a user mistake. Third-party + # per_scan adapters (also discovered via entry points) do NOT + # require ``tool_sources`` entries either; the dispatcher + # iterates them in pass-2 regardless. + if self.type in BUILTIN_PER_SCAN_ONLY_TOOL_SOURCE_TYPES: + raise ValueError( + f"tool_sources entry {self.id!r} declares type " + f"{self.type!r}, which is a per_scan-only built-in. " + f"Move this configuration to the top-level " + f"``{self.type}:`` manifest section." + ) + return self + @model_validator(mode="after") def require_path_when_needed(self) -> ToolSourceConfig: if ( diff --git a/src/agents_shipgate/schemas/report.py b/src/agents_shipgate/schemas/report.py index 0dfae9f5..d886609f 100644 --- a/src/agents_shipgate/schemas/report.py +++ b/src/agents_shipgate/schemas/report.py @@ -624,6 +624,13 @@ class ReadinessReport(BaseModel): generated_reports: dict[str, str] = Field(default_factory=dict) loaded_policy_packs: list[LoadedPolicyPack] = Field(default_factory=list) loaded_plugins: list[dict[str, Any]] = Field(default_factory=list) + # v0.20: third-party adapter provenance. Mirrors loaded_plugins[] but + # for the agents_shipgate.adapters entry-point group. Always present + # on emitted scans (empty list when no third-party adapters discovered + # or when --no-plugins is set). Required + non-nullable on the wire; + # Python-Optional via default_factory so older test helpers + # constructing minimal reports keep working. + loaded_adapters: list[dict[str, Any]] = Field(default_factory=list) tool_inventory: list[dict[str, Any]] = Field(default_factory=list) source_warnings: list[str] = Field(default_factory=list) # v0.12: top-level agent summary. Deterministic projection of diff --git a/tests/harness/test_overlay_renderer.py b/tests/harness/test_overlay_renderer.py index 2faf6550..1f230118 100644 --- a/tests/harness/test_overlay_renderer.py +++ b/tests/harness/test_overlay_renderer.py @@ -45,15 +45,21 @@ def test_40_shipgate_yaml_renders_clean_for_every_archetype(tmp_path: Path) -> N def test_every_archetype_uses_a_valid_tool_source_type() -> None: - """Each archetype's ``ToolSource.type`` must be in the v0.1 manifest enum. - - Catches regressions like the original ``openai_api`` typo for - clean-read-only that would otherwise only surface when an operator runs + """Each archetype's ``ToolSource.type`` must be a known built-in + source type (third-party extensions are intentionally NOT used in + canonical archetypes — the harness ships a fixed set). + + v0.20 PR #111 review fix: ``ToolSourceConfig.type`` relaxed from + Literal to ``str`` to support third-party adapters; this test now + pins the archetype set against the explicit + ``BUILTIN_TOOL_SOURCE_TYPES`` constant. Catches regressions like + the original ``openai_api`` typo for clean-read-only that would + otherwise only surface when an operator runs ``agents-shipgate doctor`` on a rendered 40-shipgate-yaml manifest. """ - from agents_shipgate.schemas.manifest import ToolSourceConfig + from agents_shipgate.schemas.manifest import BUILTIN_TOOL_SOURCE_TYPES - allowed = set(ToolSourceConfig.model_fields["type"].annotation.__args__) + allowed = set(BUILTIN_TOOL_SOURCE_TYPES) for archetype, ctx in ctx_mod.ARCHETYPE_CONTEXTS.items(): for ts in ctx.tool_sources: assert ts.type in allowed, ( diff --git a/tests/test_adapter_entry_point_discovery.py b/tests/test_adapter_entry_point_discovery.py new file mode 100644 index 00000000..5ffbfcc8 --- /dev/null +++ b/tests/test_adapter_entry_point_discovery.py @@ -0,0 +1,1447 @@ +"""Tests for v0.20 third-party adapter discovery + validation. + +Mirrors ``tests/test_plugin_validation.py`` shape: each load-time gate +gets a dedicated test that synthesizes an entry-point and asserts the +resulting ``loaded_adapters[]`` row. Plus end-to-end tests covering: + +- gating by ``AGENTS_SHIPGATE_ENABLE_PLUGINS`` env / ``plugins_enabled`` override +- gating by ``--no-plugins`` (override=False forces off) +- valid third-party adapter end-to-end: registered, invoked, surfaced + in ``loaded_adapters[]`` with status ``valid`` +- ``--strict-plugins`` exits non-zero when an adapter fails validation +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any, ClassVar, Literal + +import pytest + +from agents_shipgate.cli.scan import run_scan +from agents_shipgate.inputs import protocol as protocol_module +from agents_shipgate.inputs.adapter_validation import ( + BAD_PROTOCOL, + BAD_SCOPE, + LOAD_FAILED, + SOURCE_TYPE_COLLISION, + VALID, + LoadedAdapter, + run_validated_adapter, + strict_adapter_failure_messages, + validate_adapter_entry_point, +) +from agents_shipgate.inputs.protocol import ( + REGISTRY, + AdapterRegistry, + LoadedAdapterResult, + discover_third_party_adapters, +) +from agents_shipgate.schemas.manifest import ( + AgentsShipgateManifest, + ToolSourceConfig, +) + +CLEAN_FIXTURE = Path("samples/clean_read_only_agent/shipgate.yaml") + + +def _extract_agent_mode_error(output: str) -> dict | None: + """Pull the agent-mode JSON error line out of mixed stdout/stderr. + + Agent-mode emits one JSON object per error to stderr. The + Typer ``CliRunner`` merges streams, so we scan line-by-line for + a JSON object whose top-level ``error`` field is set. + """ + + for line in output.splitlines(): + line = line.strip() + if not line.startswith("{"): + continue + try: + payload = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(payload, dict) and "error" in payload: + return payload + return None + + +def _write_unknown_adapter_manifest(tmp_path: Path) -> Path: + workspace = tmp_path / "workspace" + workspace.mkdir() + (workspace / "tools.json").write_text('{"tools": []}', encoding="utf-8") + manifest_path = workspace / "shipgate.yaml" + manifest_path.write_text( + """version: "0.1" +project: + name: unknown-adapter-demo +agent: + name: demo + declared_purpose: ["test unknown adapter routing"] +environment: + target: local +tool_sources: + - id: src + type: not_a_real_adapter_type + path: tools.json +""", + encoding="utf-8", + ) + return manifest_path + + +# --- entry-point synthesis ------------------------------------------------- + + +class _Dist: + metadata = {"Name": "test-adapter-dist"} + version = "1.2.3" + + +def _entry_point( + load_fn: Any, + *, + name: str = "third-party-adapter", + value: str = "third_party_pkg.module:Adapter", + dist: Any = None, +) -> Any: + """Synthesize an entry-point object suitable for monkeypatching + ``importlib.metadata.entry_points``. + + Matches the shape we use in ``test_plugin_validation.py``: a + plain object with name/value/dist/load attributes. + """ + + class EP: + pass + + EP.name = name + EP.value = value + EP.dist = dist if dist is not None else _Dist() + EP.load = staticmethod(load_fn) + return EP() + + +def _patch_adapter_entries(monkeypatch, entries: list[Any]) -> None: + """Monkeypatch the adapter entry-points lookup + enable plugins.""" + + monkeypatch.setenv("AGENTS_SHIPGATE_ENABLE_PLUGINS", "1") + + def fake_entry_points(*, group: str) -> list[Any]: + if group == protocol_module.ADAPTER_ENTRY_POINT_GROUP: + return entries + return [] + + monkeypatch.setattr(protocol_module, "entry_points", fake_entry_points) + + +@pytest.fixture(autouse=True) +def _populate_registry(monkeypatch): + """v0.20 PR #111 review fix: discovery no longer mutates the + global ``REGISTRY``. The autouse fixture only triggers lazy + population so subsequent tests see a stable builtin set; no + cleanup is needed because ``_load_inputs`` operates on a per-scan + ``REGISTRY.clone()`` and third-party additions land there, never + on the global. + """ + + REGISTRY._ensure_populated() + yield + + +# --- valid adapter shapes (positive controls) ------------------------------ + + +class _ThirdPartyAdapter: + """A valid third-party adapter that doesn't collide with builtins.""" + + source_type: ClassVar[str] = "third_party_demo" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = None + + def load( + self, + source: ToolSourceConfig | None, + base_dir: Path, + manifest: AgentsShipgateManifest, + ) -> LoadedAdapterResult: + return LoadedAdapterResult() + + +def test_valid_third_party_adapter_lands_in_loaded_adapters(monkeypatch, tmp_path): + """End-to-end: a valid third-party adapter (class or instance) is + discovered, registered, and shows up in ``report.loaded_adapters[]`` + with ``validation_status == "valid"``. + """ + + _patch_adapter_entries( + monkeypatch, [_entry_point(lambda: _ThirdPartyAdapter)] + ) + + report, exit_code = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + + assert exit_code == 0 + assert len(report.loaded_adapters) == 1 + record = report.loaded_adapters[0] + assert record["validation_status"] == VALID + assert record["source_type"] == "third_party_demo" + assert record["validation_errors"] == [] + assert record["runtime_errors"] == [] + assert record["distribution"] == "test-adapter-dist" + assert record["version"] == "1.2.3" + # v0.20 PR #111 review fix: the per-scan registry holds the + # third-party adapter for the duration of this scan, but the + # global REGISTRY stays builtin-only across the process. A later + # ``--no-plugins`` scan sees a clean slate. + assert "third_party_demo" not in REGISTRY._adapters, ( + "global REGISTRY must stay builtin-only — discovery should " + "mutate a per-scan clone, not the global" + ) + + +def test_adapter_instance_value_is_accepted(monkeypatch, tmp_path): + """``entry_point.load()`` may return either an instance or a class. + The validator handles both shapes. + """ + + _patch_adapter_entries( + monkeypatch, [_entry_point(lambda: _ThirdPartyAdapter())] + ) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + record = report.loaded_adapters[0] + assert record["validation_status"] == VALID + + +# --- gate 1: load_failed --------------------------------------------------- + + +def test_gate_load_failure(monkeypatch, tmp_path): + def boom(): + raise ImportError("synthetic broken adapter module") + + _patch_adapter_entries(monkeypatch, [_entry_point(boom)]) + + report, exit_code = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + + assert exit_code == 0, "lenient default must continue on adapter failures" + assert len(report.loaded_adapters) == 1 + record = report.loaded_adapters[0] + assert record["validation_status"] == LOAD_FAILED + assert any("synthetic broken adapter module" in err for err in record["validation_errors"]) + assert record["source_type"] is None + + +# --- gate 2: bad_protocol -------------------------------------------------- + + +def test_gate_bad_protocol_missing_source_type(monkeypatch, tmp_path): + class _NoSourceType: + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_source" + artifact_class: ClassVar[type | None] = None + + def load(self, source, base_dir, manifest): + return LoadedAdapterResult() + + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _NoSourceType)]) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + record = report.loaded_adapters[0] + assert record["validation_status"] == BAD_PROTOCOL + assert any("source_type" in err for err in record["validation_errors"]) + + +def test_gate_bad_protocol_empty_source_type(monkeypatch, tmp_path): + class _EmptySourceType: + source_type: ClassVar[str] = "" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_source" + artifact_class: ClassVar[type | None] = None + + def load(self, source, base_dir, manifest): + return LoadedAdapterResult() + + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _EmptySourceType)]) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + record = report.loaded_adapters[0] + assert record["validation_status"] == BAD_PROTOCOL + + +def test_gate_bad_protocol_load_not_callable(monkeypatch, tmp_path): + class _NotCallableLoad: + source_type: ClassVar[str] = "bad_load" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_source" + artifact_class: ClassVar[type | None] = None + load = "not a method" # type: ignore[assignment] + + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _NotCallableLoad)]) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + record = report.loaded_adapters[0] + assert record["validation_status"] == BAD_PROTOCOL + assert any("load" in err for err in record["validation_errors"]) + + +# --- gate 3: bad_scope ----------------------------------------------------- + + +def test_gate_bad_scope(monkeypatch, tmp_path): + class _BadScope: + source_type: ClassVar[str] = "bad_scope_demo" + scope: ClassVar[Any] = "whenever_i_feel_like_it" # not in {per_source, per_scan} + artifact_class: ClassVar[type | None] = None + + def load(self, source, base_dir, manifest): + return LoadedAdapterResult() + + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _BadScope)]) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + record = report.loaded_adapters[0] + assert record["validation_status"] == BAD_SCOPE + assert any("scope" in err for err in record["validation_errors"]) + + +# --- gate 4: source_type_collision (the load-bearing trust rule) ----------- + + +@pytest.mark.parametrize("builtin_source_type", ["mcp", "openapi", "langchain", "validation"]) +def test_gate_source_type_collision_with_builtin( + monkeypatch, tmp_path, builtin_source_type +): + """A third-party adapter MUST NOT shadow a built-in source_type. + + This is the load-bearing trust rule — without it, a malicious + plugin could displace ``mcp`` (or any other) and intercept every + scan that loads an MCP source. + """ + + @dataclass + class _Collider: + source_type: ClassVar[str] = builtin_source_type + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_source" + artifact_class: ClassVar[type | None] = None + + def load(self, source, base_dir, manifest): + return LoadedAdapterResult() + + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _Collider)]) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + record = report.loaded_adapters[0] + assert record["validation_status"] == SOURCE_TYPE_COLLISION + assert any(builtin_source_type in err for err in record["validation_errors"]) + assert any("reserved by a built-in" in err for err in record["validation_errors"]) + + +def test_gate_source_type_collision_between_third_parties(monkeypatch, tmp_path): + """Two third-party adapters cannot claim the same source_type.""" + + class _Twin1: + source_type: ClassVar[str] = "twin" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = None + + def load(self, source, base_dir, manifest): + return LoadedAdapterResult() + + class _Twin2(_Twin1): + pass + + _patch_adapter_entries( + monkeypatch, + [ + _entry_point(lambda: _Twin1, name="first", value="pkg:T1"), + _entry_point(lambda: _Twin2, name="second", value="pkg:T2"), + ], + ) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + assert len(report.loaded_adapters) == 2 + statuses = sorted(rec["validation_status"] for rec in report.loaded_adapters) + assert statuses == [SOURCE_TYPE_COLLISION, VALID] + collider = next( + rec for rec in report.loaded_adapters + if rec["validation_status"] == SOURCE_TYPE_COLLISION + ) + assert any( + "already registered by another third-party adapter" in err + for err in collider["validation_errors"] + ) + + +# --- gating: AGENTS_SHIPGATE_ENABLE_PLUGINS + --no-plugins ---------------- + + +def test_discovery_disabled_by_default(monkeypatch, tmp_path): + """Without the env var set, discovery is a no-op.""" + + monkeypatch.delenv("AGENTS_SHIPGATE_ENABLE_PLUGINS", raising=False) + + def fake_entry_points(*, group: str) -> list[Any]: + if group == protocol_module.ADAPTER_ENTRY_POINT_GROUP: + return [_entry_point(lambda: _ThirdPartyAdapter)] + return [] + + monkeypatch.setattr(protocol_module, "entry_points", fake_entry_points) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + assert report.loaded_adapters == [], ( + "discovery must be opt-in via AGENTS_SHIPGATE_ENABLE_PLUGINS" + ) + assert "third_party_demo" not in REGISTRY._adapters + + +def test_no_plugins_flag_overrides_env(monkeypatch, tmp_path): + """``--no-plugins`` (plugins_enabled=False) forces discovery off + even when the env var is set. + """ + + _patch_adapter_entries( + monkeypatch, [_entry_point(lambda: _ThirdPartyAdapter)] + ) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + plugins_enabled=False, + ) + assert report.loaded_adapters == [] + assert "third_party_demo" not in REGISTRY._adapters + + +# --- review-fix regression tests (PR #111 P1 #1, P1 #2, P1 #3) ------------ + + +class _RecordingAdapter: + """A per_scan third-party adapter that records every load() call, + so tests can prove the adapter ran (or didn't) across multiple + scans within the same process. + """ + + source_type: ClassVar[str] = "recording_demo" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = None + + invocations: ClassVar[int] = 0 + + @classmethod + def reset(cls) -> None: + cls.invocations = 0 + + def load( + self, + source: ToolSourceConfig | None, + base_dir: Path, + manifest: AgentsShipgateManifest, + ) -> LoadedAdapterResult: + type(self).invocations += 1 + return LoadedAdapterResult() + + +def test_no_plugins_disables_third_party_after_prior_enabled_scan( + monkeypatch, tmp_path +): + """**Regression for PR #111 P1 #1.** A second in-process scan + with ``plugins_enabled=False`` must NOT execute a third-party + adapter discovered by the first scan. + + Pre-fix: the first scan registered the adapter into the global + ``REGISTRY``. The second scan skipped discovery but the + dispatcher still resolved the adapter from the polluted + global. We assert both behaviors are now correct: + + - second-scan ``loaded_adapters == []`` (already worked pre-fix) + - second-scan adapter ``invocations`` count does NOT increase + (the actual regression — pre-fix this WAS incremented) + """ + + _RecordingAdapter.reset() + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _RecordingAdapter)]) + + # Scan 1: plugins enabled → adapter discovered, registered (per-scan), + # and invoked once via pass-2 per_scan loop. + report1, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path / "scan1", + formats=["json"], + ci_mode="advisory", + ) + assert len(report1.loaded_adapters) == 1 + assert report1.loaded_adapters[0]["validation_status"] == VALID + assert _RecordingAdapter.invocations == 1 + + # Scan 2: same process, plugins disabled. Discovery is a no-op + # AND the dispatcher must not see the adapter from scan 1. + report2, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path / "scan2", + formats=["json"], + ci_mode="advisory", + plugins_enabled=False, + ) + assert report2.loaded_adapters == [], ( + "scan 2 with plugins_enabled=False must report an empty " + "loaded_adapters[]" + ) + assert _RecordingAdapter.invocations == 1, ( + "scan 2 must NOT execute the third-party adapter registered " + "by scan 1; global REGISTRY pollution would have re-run it" + ) + assert "recording_demo" not in REGISTRY._adapters + + +def test_second_scan_does_not_misclassify_third_party_as_collision( + monkeypatch, tmp_path +): + """**Regression for PR #111 P1 #2.** A stable third-party adapter + discovered across two consecutive scans must produce + ``validation_status == "valid"`` on BOTH scans, not collide with + its own registration from scan one. + + Pre-fix: scan 2's collision set was ``REGISTRY._adapters.keys()`` + which already contained the scan-1 adapter, so scan 2 reported + ``source_type_collision`` while the dispatcher still executed the + stale registered adapter. Both report truthfulness and + ``--strict-plugins`` were broken. + """ + + _RecordingAdapter.reset() + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _RecordingAdapter)]) + + report1, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path / "scan1", + formats=["json"], + ci_mode="advisory", + ) + report2, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path / "scan2", + formats=["json"], + ci_mode="advisory", + ) + + assert report1.loaded_adapters[0]["validation_status"] == VALID + assert report2.loaded_adapters[0]["validation_status"] == VALID, ( + "second scan must classify the same adapter as valid, not " + "as source_type_collision against itself" + ) + assert _RecordingAdapter.invocations == 2, ( + "adapter should have run exactly once per scan" + ) + + +class _PerSourceThirdPartyAdapter: + """A per_source third-party adapter — proves manifests can + reference custom source types after PR #111 P1 #3. + """ + + source_type: ClassVar[str] = "demo_source" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_source" + artifact_class: ClassVar[type | None] = None + + invocations: ClassVar[int] = 0 + + @classmethod + def reset(cls) -> None: + cls.invocations = 0 + + def load( + self, + source: ToolSourceConfig | None, + base_dir: Path, + manifest: AgentsShipgateManifest, + ) -> LoadedAdapterResult: + type(self).invocations += 1 + # Minimal valid LoadedAdapterResult; the per_source contract + # returns at least one LoadedToolSource so the dispatcher can + # absorb it without error. + from agents_shipgate.core.domain import LoadedToolSource + + return LoadedAdapterResult( + tool_sources=[ + LoadedToolSource( + source_id=source.id if source else "demo", + source_type="demo_source", + tools=[], + warnings=[], + ) + ], + ) + + +def test_per_source_third_party_adapter_referenced_from_manifest( + monkeypatch, tmp_path +): + """**Regression for PR #111 P1 #3.** A manifest referencing a + third-party per_source adapter via ``tool_sources[].type`` must + load successfully. Pre-fix ``ToolSourceConfig.type`` was a closed + ``Literal`` of built-in source types, so manifest validation + rejected the custom type before discovery ran. + """ + + _PerSourceThirdPartyAdapter.reset() + + # Build a minimal manifest referencing the third-party source type. + workspace = tmp_path / "workspace" + workspace.mkdir() + (workspace / "tools.json").write_text('{"tools": []}', encoding="utf-8") + manifest_path = workspace / "shipgate.yaml" + manifest_path.write_text( + """version: "0.1" +project: + name: third-party-demo +agent: + name: demo + declared_purpose: ["test third-party per_source adapter"] +environment: + target: local +tool_sources: + - id: demo + type: demo_source + path: tools.json +""", + encoding="utf-8", + ) + + _patch_adapter_entries( + monkeypatch, [_entry_point(lambda: _PerSourceThirdPartyAdapter)] + ) + + report, exit_code = run_scan( + config_path=manifest_path, + output_dir=tmp_path / "out", + formats=["json"], + ci_mode="advisory", + ) + + assert exit_code == 0, ( + f"scan should succeed with a third-party per_source adapter; " + f"got exit={exit_code}" + ) + assert _PerSourceThirdPartyAdapter.invocations == 1 + assert len(report.loaded_adapters) == 1 + assert report.loaded_adapters[0]["validation_status"] == VALID + assert report.loaded_adapters[0]["source_type"] == "demo_source" + + +# --- review-fix: tighter signature validation (PR #111 P2 #4) ------------- + + +def test_gate_bad_protocol_load_too_few_positional(monkeypatch, tmp_path): + """**Regression for PR #111 P2 #4.** A ``load`` method that + accepts fewer than 3 positional arguments must fail at the gate, + not crash the dispatcher at runtime. + """ + + class _OnlyOnePositional: + source_type: ClassVar[str] = "too_few_args" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = None + + def load(self, source): # type: ignore[override] + return LoadedAdapterResult() + + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _OnlyOnePositional)]) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + record = report.loaded_adapters[0] + assert record["validation_status"] == BAD_PROTOCOL + assert any( + "at least 3 positional" in err for err in record["validation_errors"] + ), record["validation_errors"] + + +def test_gate_bad_protocol_load_required_keyword_only(monkeypatch, tmp_path): + """**Regression for PR #111 P2 #4.** Required keyword-only + parameters must fail at the gate. The dispatcher calls + ``load(source, base_dir, manifest)`` with no kwargs. + """ + + class _RequiredKwOnly: + source_type: ClassVar[str] = "kw_only_required" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = None + + def load(self, source, base_dir, manifest, *, must_set): # type: ignore[override] + return LoadedAdapterResult() + + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _RequiredKwOnly)]) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + record = report.loaded_adapters[0] + assert record["validation_status"] == BAD_PROTOCOL + assert any( + "keyword-only" in err and "must_set" in err + for err in record["validation_errors"] + ), record["validation_errors"] + + +def test_gate_bad_protocol_accepts_var_positional(monkeypatch, tmp_path): + """An adapter using ``*args`` to absorb the three positional + arguments is valid (it CAN bind 3 args, even though no required + positional slots are declared). + """ + + class _VarPositional: + source_type: ClassVar[str] = "var_args_demo" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = None + + def load(self, *args): # type: ignore[override] + return LoadedAdapterResult() + + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _VarPositional)]) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + record = report.loaded_adapters[0] + assert record["validation_status"] == VALID, record["validation_errors"] + + +def test_gate_bad_protocol_accepts_optional_keyword_only(monkeypatch, tmp_path): + """Optional keyword-only parameters (with defaults) don't break + the dispatcher's call shape; the gate accepts them. + """ + + class _OptionalKwOnly: + source_type: ClassVar[str] = "optional_kw_demo" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = None + + def load(self, source, base_dir, manifest, *, opt=None): # type: ignore[override] + return LoadedAdapterResult() + + _patch_adapter_entries(monkeypatch, [_entry_point(lambda: _OptionalKwOnly)]) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + record = report.loaded_adapters[0] + assert record["validation_status"] == VALID, record["validation_errors"] + + +# --- isolated unit tests on the validator (no scan needed) ---------------- + + +def test_validate_entry_point_rejects_uninstantiable_class(): + """A class that can't be instantiated with zero args fails the + bad_protocol gate at instantiation time (before checking attrs). + """ + + class _Picky: + def __init__(self, required_arg): + pass + + record = validate_adapter_entry_point( + _entry_point(lambda: _Picky), + builtin_source_types=set(), + already_registered_source_types=set(), + ) + assert record.info["validation_status"] == BAD_PROTOCOL + assert any( + "could not be instantiated" in err + for err in record.info["validation_errors"] + ) + + +def test_discover_writes_to_loaded_adapters_list(monkeypatch): + """The ``loaded_adapters`` out-parameter list is appended to in + discovery order, matching the entry-points iteration order. + """ + + monkeypatch.setenv("AGENTS_SHIPGATE_ENABLE_PLUGINS", "1") + monkeypatch.setattr( + protocol_module, + "entry_points", + lambda *, group: ( + [ + _entry_point(lambda: _ThirdPartyAdapter, name="alpha", value="pkg:A"), + ] + if group == protocol_module.ADAPTER_ENTRY_POINT_GROUP + else [] + ), + ) + + registry = AdapterRegistry(autopopulate=False) + loaded: list[dict[str, Any]] = [] + records = discover_third_party_adapters( + registry, + plugins_enabled=True, + loaded_adapters=loaded, + ) + assert len(records) == 1 + assert loaded[0]["name"] == "alpha" + assert loaded[0]["source_type"] == "third_party_demo" + assert loaded[0]["validation_status"] == VALID + + +# --- strict_adapter_failure_messages + --strict-plugins ------------------- + + +def test_strict_failure_messages_collects_validation_errors(): + rows = [ + { + "name": "n", + "value": "v", + "validation_status": LOAD_FAILED, + "validation_errors": ["entry_point.load() raised: ImportError(...)"], + "runtime_errors": [], + }, + { + "name": "good", + "value": "good:pkg", + "validation_status": VALID, + "validation_errors": [], + "runtime_errors": [], + }, + ] + messages = strict_adapter_failure_messages(rows) + assert len(messages) == 1 + assert "adapter 'v'" in messages[0] + assert "ImportError" in messages[0] + + +def test_strict_plugins_exits_nonzero_on_adapter_failure(monkeypatch, tmp_path): + """v0.20: ``--strict-plugins`` extends to adapter validation + failures via ``strict_adapter_failure_messages``. A failing + third-party adapter elevates the exit code to 4 even when the + scan itself would have exited 0. + """ + + # Drive through the public CLI to exercise the _apply_strict_plugins + # path; only that path actually surfaces adapter messages. + from typer.testing import CliRunner + + from agents_shipgate.cli.main import app + + def boom(): + raise ImportError("synthetic broken third-party adapter") + + monkeypatch.setenv("AGENTS_SHIPGATE_ENABLE_PLUGINS", "1") + monkeypatch.setattr( + protocol_module, + "entry_points", + lambda *, group: ( + [_entry_point(boom)] + if group == protocol_module.ADAPTER_ENTRY_POINT_GROUP + else [] + ), + ) + + # Newer Typer CliRunner doesn't accept ``mix_stderr``; output is the + # combined stdout+stderr stream when the underlying click runner + # merges them. That's fine for substring assertion. + runner = CliRunner() + result = runner.invoke( + app, + [ + "scan", + "--config", + str(CLEAN_FIXTURE), + "--out", + str(tmp_path), + "--ci-mode", + "advisory", + "--strict-plugins", + ], + ) + assert result.exit_code == 4, ( + f"expected --strict-plugins exit 4, got {result.exit_code}; " + f"output={result.output!r}" + ) + assert "adapter" in result.output + assert "synthetic broken third-party adapter" in result.output + + +# --- run_validated_adapter (runtime safety net) --------------------------- + + +def test_run_validated_adapter_captures_exceptions(): + class _Crashy: + source_type: ClassVar[str] = "crashy" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = None + + def load(self, source, base_dir, manifest): + raise RuntimeError("simulated crash inside adapter.load()") + + loaded = LoadedAdapter( + adapter=_Crashy(), + info={ + "name": "crashy", + "value": "pkg:Crashy", + "validation_status": VALID, + "validation_errors": [], + "runtime_errors": [], + "source_type": "crashy", + }, + ) + result = run_validated_adapter( + loaded, source=None, base_dir=Path("."), manifest=None + ) + assert result is None + assert any( + "simulated crash inside adapter.load()" in err + for err in loaded.runtime_errors + ) + assert loaded.info["runtime_errors"] == loaded.runtime_errors + + +def test_run_validated_adapter_rejects_wrong_return_type(): + class _BadReturn: + source_type: ClassVar[str] = "badreturn" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = None + + def load(self, source, base_dir, manifest): + return "not a LoadedAdapterResult" + + loaded = LoadedAdapter( + adapter=_BadReturn(), + info={ + "name": "badreturn", + "value": "pkg:BadReturn", + "validation_status": VALID, + "validation_errors": [], + "runtime_errors": [], + "source_type": "badreturn", + }, + ) + result = run_validated_adapter( + loaded, source=None, base_dir=Path("."), manifest=None + ) + assert result is None + assert any( + "LoadedAdapterResult" in err for err in loaded.runtime_errors + ) + + +def test_run_validated_adapter_rejects_smuggled_artifact(): + """Load-bearing rule: an adapter declaring ``artifact_class=X`` + must not return an artifact of any other type. This is the + artifact-smuggling-prevention rule that mirrors the + Finding.check_id smuggling rule in plugin_validation. + """ + + class _ExpectedArtifact: + pass + + class _OtherArtifact: + pass + + class _Smuggler: + source_type: ClassVar[str] = "smuggler" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = _ExpectedArtifact + + def load(self, source, base_dir, manifest): + return LoadedAdapterResult(artifact=_OtherArtifact()) + + loaded = LoadedAdapter( + adapter=_Smuggler(), + info={ + "name": "smuggler", + "value": "pkg:Smuggler", + "validation_status": VALID, + "validation_errors": [], + "runtime_errors": [], + "source_type": "smuggler", + }, + ) + result = run_validated_adapter( + loaded, source=None, base_dir=Path("."), manifest=None + ) + assert result is None + assert any( + "_OtherArtifact" in err and "_ExpectedArtifact" in err + for err in loaded.runtime_errors + ) + + +# --- PR #111 follow-up review (round 3) ---------------------------------- + + +class _RuntimeCrashingAdapter: + """A third-party per_scan adapter that raises at runtime. + + Pre-fix: ``adapter.load()`` was called directly in the dispatcher, + so the exception propagated, ``run_scan`` aborted with the raw + ``RuntimeError``, and ``--strict-plugins`` never got a chance to + inspect ``loaded_adapters[].runtime_errors``. + """ + + source_type: ClassVar[str] = "runtime_crash_demo" + scope: ClassVar[Literal["per_source", "per_scan"]] = "per_scan" + artifact_class: ClassVar[type | None] = None + + def load( + self, + source: ToolSourceConfig | None, + base_dir: Path, + manifest: AgentsShipgateManifest, + ) -> LoadedAdapterResult: + raise RuntimeError("simulated runtime crash inside adapter.load()") + + +def test_runtime_error_in_third_party_adapter_captured_not_propagated( + monkeypatch, tmp_path +): + """**Regression for PR #111 review follow-up P1 #1.** A + third-party adapter that raises at runtime must NOT abort + ``run_scan``. The dispatcher routes its ``load()`` through + ``run_validated_adapter``, which captures the exception into + ``loaded_adapters[].runtime_errors``. The scan completes; the + report is emitted; ``--strict-plugins`` (when set) exits 4. + """ + + _patch_adapter_entries( + monkeypatch, [_entry_point(lambda: _RuntimeCrashingAdapter)] + ) + + report, exit_code = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + ) + + # Scan must succeed; report must be emitted. + assert exit_code == 0, ( + f"adapter runtime crash must not abort the scan in lenient " + f"mode; got exit_code={exit_code}" + ) + assert len(report.loaded_adapters) == 1 + record = report.loaded_adapters[0] + # Adapter passed all four load-time gates. + assert record["validation_status"] == VALID + # Runtime error captured, not propagated. + assert any( + "simulated runtime crash inside adapter.load()" in err + for err in record["runtime_errors"] + ), f"runtime_errors not captured: {record['runtime_errors']!r}" + + +def test_strict_plugins_exits_on_third_party_adapter_runtime_error( + monkeypatch, tmp_path +): + """End-to-end: ``--strict-plugins`` elevates exit code 4 when a + third-party adapter raises at runtime — not just when it fails + validation. Closes the loop on the previously-bypassed contract. + """ + + from typer.testing import CliRunner + + from agents_shipgate.cli.main import app + + monkeypatch.setenv("AGENTS_SHIPGATE_ENABLE_PLUGINS", "1") + monkeypatch.setattr( + protocol_module, + "entry_points", + lambda *, group: ( + [_entry_point(lambda: _RuntimeCrashingAdapter)] + if group == protocol_module.ADAPTER_ENTRY_POINT_GROUP + else [] + ), + ) + + runner = CliRunner() + result = runner.invoke( + app, + [ + "scan", + "--config", + str(CLEAN_FIXTURE), + "--out", + str(tmp_path), + "--ci-mode", + "advisory", + "--strict-plugins", + ], + ) + assert result.exit_code == 4, ( + f"expected --strict-plugins exit 4 on adapter runtime error, " + f"got {result.exit_code}; output={result.output!r}" + ) + assert "adapter" in result.output + assert "simulated runtime crash inside adapter.load()" in result.output + + +def test_doctor_resolves_third_party_source_types(monkeypatch, tmp_path): + """**Regression for PR #111 review follow-up P1 #2.** ``doctor`` + (``inspect_sources``) must run third-party adapter discovery so a + manifest referencing ``tool_sources[].type: demo_source`` can be + introspected, not crash with ``ConfigError: No adapter + registered``. + """ + + from agents_shipgate.cli.scan import inspect_sources + from agents_shipgate.core.domain import LoadedToolSource as _LTS + + _patch_adapter_entries( + monkeypatch, [_entry_point(lambda: _PerSourceThirdPartyAdapter)] + ) + _PerSourceThirdPartyAdapter.reset() + + workspace = tmp_path / "doctor_workspace" + workspace.mkdir() + (workspace / "tools.json").write_text('{"tools": []}', encoding="utf-8") + manifest_path = workspace / "shipgate.yaml" + manifest_path.write_text( + """version: "0.1" +project: + name: doctor-third-party-demo +agent: + name: demo + declared_purpose: ["doctor inspects third-party per_source adapter"] +environment: + target: local +tool_sources: + - id: demo + type: demo_source + path: tools.json +""", + encoding="utf-8", + ) + + payload = inspect_sources(config_path=manifest_path) + + # Doctor returns a payload — no ConfigError raised. + assert payload["project"] == "doctor-third-party-demo" + # The third-party adapter was invoked and the source ID surfaces + # in the sources list. + sources = payload["sources"] + assert any(s["id"] == "demo" for s in sources), ( + f"doctor's sources list missing the third-party demo source: " + f"{sources!r}" + ) + # Adapter discovery results are surfaced in the payload so an + # operator can see what was loaded without running a full scan. + loaded = payload["loaded_adapters"] + assert len(loaded) == 1 + assert loaded[0]["source_type"] == "demo_source" + assert loaded[0]["validation_status"] == VALID + # And the adapter actually ran (per-source load is called from + # _load_sources pass 1). + assert _PerSourceThirdPartyAdapter.invocations == 1 + # Pin the no-warnings invariant — discovery must NOT have emitted + # warnings for a clean third-party load. ``_LTS`` import is kept + # solely to avoid an "unused import" lint failure under + # ``from __future__ import annotations``. + assert _LTS is not None + assert payload["warnings"] == [], ( + f"unexpected warnings for clean third-party load: " + f"{payload['warnings']!r}" + ) + + +def test_unknown_adapter_source_type_routes_to_install_enable_diagnostic( + monkeypatch, tmp_path +): + """**Regression for PR #111 review follow-up P2 #5.** A manifest + referencing an unknown ``tool_sources[].type`` must route the + agent to install / enable / fix-typo next_actions, NOT the legacy + "edit shipgate.yaml" advice from + ``diagnose_invalid_manifest``. + + Pre-fix: ``AdapterRegistry.require`` raised ``ConfigError`` with + the contributor-facing message "Add the adapter to + _register_builtin_adapters()", and ``_diagnose_config_error`` + fell through to ``diagnose_invalid_manifest`` → emit + SHIP-DIAG-INVALID-MANIFEST → next_action "edit shipgate.yaml". + Wrong for the v0.20 public extension flow. + + Post-fix: ``_maybe_diagnose_unknown_adapter`` pattern-matches the + require() error prefix and emits + SHIP-DIAG-UNKNOWN-ADAPTER-SOURCE-TYPE with three next_actions + (enable plugins, install adapter, fix typo). + """ + + from typer.testing import CliRunner + + from agents_shipgate.cli.main import app + + # Plugins disabled: should route to the "enable + install + typo" + # next_action set. + monkeypatch.delenv("AGENTS_SHIPGATE_ENABLE_PLUGINS", raising=False) + # Agent-mode JSON output is gated by env var, not a CLI flag. + monkeypatch.setenv("AGENTS_SHIPGATE_AGENT_MODE", "1") + + manifest_path = _write_unknown_adapter_manifest(tmp_path) + + runner = CliRunner() + result = runner.invoke( + app, + [ + "scan", + "--config", + str(manifest_path), + "--out", + str(tmp_path / "out"), + "--ci-mode", + "advisory", + ], + ) + # Exit 2 = ConfigError, as before. + assert result.exit_code == 2, ( + f"expected ConfigError exit 2, got {result.exit_code}; " + f"output={result.output!r}" + ) + output = result.output + # The agent-mode JSON's ``next_action`` (legacy single-string) and + # ``next_actions[0]`` (structured rank-1) must point at the + # third-party adapter remediation path, NOT the legacy + # "edit shipgate.yaml" advice that ``diagnose_invalid_manifest`` + # produces. Specifically: a command-kind action that enables + # plugin discovery is the right rank-1 when plugins are disabled. + payload = _extract_agent_mode_error(output) + assert payload is not None, ( + "agent-mode error JSON must be emitted; raw output:\n" + output + ) + assert payload["error"] == "config_error" + assert "not_a_real_adapter_type" in payload["message"], ( + "error message must echo the offending source_type" + ) + assert "AGENTS_SHIPGATE_ENABLE_PLUGINS" in payload["message"], ( + "error message must mention enabling plugin discovery" + ) + rank1 = payload["next_actions"][0] + assert rank1["kind"] == "command", ( + f"rank-1 next_action must be a command (enable plugins), " + f"not 'edit shipgate.yaml'; got kind={rank1['kind']!r}, " + f"command={rank1.get('command')!r}" + ) + assert ( + "AGENTS_SHIPGATE_ENABLE_PLUGINS=1" in rank1["command"] + ), ( + "rank-1 next_action.command must enable plugin discovery; " + f"got {rank1.get('command')!r}" + ) + # And the rationale clearly mentions the third-party adapter path, + # not "edit YAML". + assert "third-party adapter discovery" in rank1["why"], ( + f"rank-1 next_action.why must explain the third-party " + f"discovery path; got {rank1.get('why')!r}" + ) + + +def test_unknown_adapter_scan_no_plugins_override_routes_to_enable( + monkeypatch, tmp_path +): + """Explicit ``--no-plugins`` must control unknown-source diagnostics + even when the environment enables plugin discovery. + + Pre-fix: the scan correctly disabled discovery via + ``plugins_enabled=False``, but ``AdapterRegistry.require`` and the + diagnostic classifier recomputed state from the env only. With + ``AGENTS_SHIPGATE_ENABLE_PLUGINS=1`` plus ``--no-plugins``, rank-1 + incorrectly became ``pip install `` + instead of telling the user to re-enable discovery. + """ + + from typer.testing import CliRunner + + from agents_shipgate.cli.main import app + + monkeypatch.setenv("AGENTS_SHIPGATE_ENABLE_PLUGINS", "1") + monkeypatch.setenv("AGENTS_SHIPGATE_AGENT_MODE", "1") + manifest_path = _write_unknown_adapter_manifest(tmp_path) + + result = CliRunner().invoke( + app, + [ + "scan", + "--config", + str(manifest_path), + "--out", + str(tmp_path / "out"), + "--ci-mode", + "advisory", + "--no-plugins", + ], + ) + + assert result.exit_code == 2, result.output + payload = _extract_agent_mode_error(result.output) + assert payload is not None, result.output + assert "AGENTS_SHIPGATE_ENABLE_PLUGINS" in payload["message"] + assert "removing `--no-plugins`" in payload["message"] + rank1 = payload["next_actions"][0] + assert rank1["kind"] == "command", rank1 + assert "AGENTS_SHIPGATE_ENABLE_PLUGINS=1" in rank1["command"] + + +def test_unknown_adapter_doctor_uses_install_enable_diagnostic( + monkeypatch, tmp_path +): + """``doctor`` must use the same unknown-adapter diagnostic as scan.""" + + from typer.testing import CliRunner + + from agents_shipgate.cli.main import app + + monkeypatch.delenv("AGENTS_SHIPGATE_ENABLE_PLUGINS", raising=False) + monkeypatch.setenv("AGENTS_SHIPGATE_AGENT_MODE", "1") + manifest_path = _write_unknown_adapter_manifest(tmp_path) + + result = CliRunner().invoke( + app, + ["doctor", "--config", str(manifest_path), "--json"], + ) + + assert result.exit_code == 2, result.output + payload = _extract_agent_mode_error(result.output) + assert payload is not None, result.output + rank1 = payload["next_actions"][0] + assert rank1["kind"] == "command", ( + f"doctor must not route unknown third-party adapter types to " + f"edit shipgate.yaml; got {rank1!r}" + ) + assert "AGENTS_SHIPGATE_ENABLE_PLUGINS=1" in rank1["command"] + + +def test_require_error_message_is_user_facing(monkeypatch): + """**Regression for PR #111 review follow-up P2 #5.** The raw + ``ConfigError`` message from ``AdapterRegistry.require`` must be + actionable for a user — not the previous "add to + ``_register_builtin_adapters``" contributor-facing text. + """ + + from agents_shipgate.core.errors import ConfigError + from agents_shipgate.inputs.protocol import REGISTRY + + monkeypatch.delenv("AGENTS_SHIPGATE_ENABLE_PLUGINS", raising=False) + REGISTRY._ensure_populated() + + with pytest.raises(ConfigError) as excinfo: + REGISTRY.require("not_a_real_adapter_type") + + message = str(excinfo.value) + assert "not_a_real_adapter_type" in message + # Must NOT mention the contributor-facing helper anymore. + assert "_register_builtin_adapters" not in message, ( + "require() error must be user-facing in v0.20; the " + "_register_builtin_adapters hint is contributor-only" + ) + # Must mention the three real remediation paths. + assert "AGENTS_SHIPGATE_ENABLE_PLUGINS" in message + assert "typo" in message.lower() + + +def test_markdown_report_renders_loaded_adapters_section( + monkeypatch, tmp_path +): + """**Regression for PR #111 review follow-up P2 #3.** The + Markdown report (``report.md``) must include a ``Loaded + Adapters`` section listing each third-party adapter and its + validation status. A ``load_failed`` adapter is shown alongside + its validation_errors so reviewers don't have to open + ``report.json`` to see it. + """ + + def boom(): + raise ImportError("simulated broken adapter for markdown test") + + _patch_adapter_entries(monkeypatch, [_entry_point(boom)]) + + report, _ = run_scan( + config_path=CLEAN_FIXTURE, + output_dir=tmp_path, + formats=["json", "markdown"], + ci_mode="advisory", + ) + + # The JSON contains the load_failed row (already proven by an + # earlier test); now confirm the markdown does too. Note: the + # ``_safe_markdown_text`` helper escapes underscores so + # ``load_failed`` renders as ``load\_failed`` in the raw bytes + # (and as ``load_failed`` in any markdown viewer). The assertions + # below accept both forms so a future change to the escaping + # rules doesn't make the test brittle. + md_path = tmp_path / "report.md" + assert md_path.exists(), "report.md was not emitted" + md = md_path.read_text(encoding="utf-8") + assert "## Loaded Adapters" in md, ( + "Markdown report must include a Loaded Adapters section; " + "got:\n" + md[:2000] + ) + assert "load_failed" in md or "load\\_failed" in md, ( + "Markdown report must show validation_status of failed " + f"adapters; got:\n{md[:2000]}" + ) + assert "simulated broken adapter for markdown test" in md, ( + "Markdown report must show the validation_error so the " + "reviewer can act on it without opening report.json" + ) diff --git a/tests/test_adapter_registry.py b/tests/test_adapter_registry.py index 15d93954..9eeb0d95 100644 --- a/tests/test_adapter_registry.py +++ b/tests/test_adapter_registry.py @@ -11,7 +11,7 @@ from dataclasses import dataclass from pathlib import Path -from typing import ClassVar, Literal, get_args +from typing import ClassVar, Literal import pytest @@ -92,13 +92,20 @@ def test_stub_adapter_satisfies_protocol(): def test_adapters_registered_for_every_tool_source_type(): - """Every value in ``ToolSourceConfig.type``'s Literal must have a - matching adapter in ``REGISTRY``. Catches drift if a new source - type lands without registration.""" + """Every built-in source type allowed in ``tool_sources[].type`` + must have a matching adapter in ``REGISTRY``. Catches drift if a + new source type lands without registration. + + v0.20 PR #111 review fix: ``ToolSourceConfig.type`` is now ``str`` + (relaxed from Literal so third-party adapters can register custom + types). The curated set of built-ins lives in + ``BUILTIN_TOOL_SOURCE_TYPES`` — this test iterates that. + """ + + from agents_shipgate.schemas.manifest import BUILTIN_TOOL_SOURCE_TYPES - source_types = get_args(ToolSourceConfig.model_fields["type"].annotation) - assert source_types, "expected ToolSourceConfig.type to be a Literal" - for source_type in source_types: + assert BUILTIN_TOOL_SOURCE_TYPES, "expected at least one built-in source type" + for source_type in BUILTIN_TOOL_SOURCE_TYPES: assert source_type in REGISTRY, ( f"no adapter registered for tool source type {source_type!r}" ) diff --git a/tests/test_n8n.py b/tests/test_n8n.py index 90b05835..5c1fdcff 100644 --- a/tests/test_n8n.py +++ b/tests/test_n8n.py @@ -83,8 +83,18 @@ def test_n8n_top_level_config_is_accepted_and_tool_source_type_is_rejected(tmp_p """, encoding="utf-8", ) + # v0.20 PR #111 review fix P1 #3: ``ToolSourceConfig.type`` is open + # ``str`` so third-party adapters can declare custom source types. + # ``load_manifest`` no longer rejects ``type: n8N`` (a typo of the + # built-in ``n8n``); the dispatcher's ``REGISTRY.require`` raises + # ConfigError when the unknown type fails to resolve at scan time + # instead. The exit-2 contract is unchanged. with pytest.raises(ConfigError): - load_manifest(project / "invalid-case.yaml") + run_scan( + config_path=project / "invalid-case.yaml", + output_dir=tmp_path / "invalid-case-out", + ci_mode="advisory", + ) def test_n8n_malformed_optional_placement_is_rejected(tmp_path): diff --git a/tests/test_schema_boundaries.py b/tests/test_schema_boundaries.py index f9f557f8..0f070340 100644 --- a/tests/test_schema_boundaries.py +++ b/tests/test_schema_boundaries.py @@ -150,6 +150,7 @@ def test_representative_schema_payloads_keep_wire_fields() -> None: "generated_reports", "loaded_policy_packs", "loaded_plugins", + "loaded_adapters", "tool_inventory", "source_warnings", "agent_summary",