diff --git a/CHANGELOG.md b/CHANGELOG.md index 95fd9c3..d2d5f7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,74 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **BigQuery-table bundle mirror** in + `bigquery_agent_analytics.extractor_compilation.bq_bundle_mirror` + and + [`docs/extractor_compilation_bq_bundle_mirror.md`](docs/extractor_compilation_bq_bundle_mirror.md). + Issue [#75](https://github.com/GoogleCloudPlatform/BigQuery-Agent-Analytics-SDK/issues/75) + PR C2.c.3 — publishes compiled bundles to a BigQuery + table and syncs them back into a local directory for + C2.a's existing loader. Runtime path stays + ``sync_bundles_from_bq → discover_bundles → + from_bundles_root``; the mirror is a utility, not a + runtime loader. Public surface: + ``publish_bundles_to_bq(bundle_root, store, + bundle_fingerprint_allowlist=None)`` and + ``sync_bundles_from_bq(store, dest_dir, + bundle_fingerprint_allowlist=None)``. Both call + :func:`load_bundle` as a gate — publish refuses bundles + that wouldn't load at the runtime; sync refuses bundles + whose reconstruction the loader rejects, scrubbing any + partial directory it wrote. Sync writes each + fingerprint to a side-by-side **staging directory** and + runs ``load_bundle`` on the staged copy before performing + a **staged replace** of the target (the rmtree+move pair + is not strictly atomic — a crash between the two leaves + the bundle absent on disk, recoverable by re-sync — but + the load-bundle-failure direction *is* atomic, so a bad + mirror row never destroys a previously-good local + bundle). + Strict bundle-shape check: the table stores exactly two + rows per fingerprint (``manifest.json`` + the manifest's + ``module_filename``); ``unexpected_file`` codes reject + anything else. The manifest's own ``module_filename`` is + shape-checked at sync (bare filename — no separators, no + ``..``, no NUL); a path-separator value surfaces as + ``manifest_row_unreadable`` instead of raising + ``FileNotFoundError`` at the write step. + ``invalid_bundle_path`` rejects traversal / absolute / + backslash / NUL paths before writing to disk. + ``duplicate_row`` rejects two rows sharing the same + ``(fingerprint, bundle_path)`` (BigQuery has no unique + constraint; the mirror enforces uniqueness at sync). + ``duplicate_fingerprint`` rejects publish-side cases + where two subdirs of ``bundle_root`` claim the same + manifest fingerprint — neither is published, so the + table can't end up with logical duplicates. + ``malformed_row`` rejects rows with wrong field types. + Idempotent republish via DELETE+INSERT in + ``BigQueryBundleStore.publish_rows`` — + re-publishing the same fingerprint replaces the prior + rows rather than accumulating duplicates. The DELETE + + ``insert_rows_json`` are NOT a single atomic + transaction; a transient INSERT failure leaves rows + missing until the caller re-runs publish (recoverable; + documented in the class docstring). + ``publish_rows`` also raises ``ValueError`` on duplicate + ``(fingerprint, bundle_path)`` input pairs as defense in + depth. + ``BundleStore`` is a Protocol so tests can pass in-memory + fakes; ``BigQueryBundleStore`` is the concrete + implementation wrapping ``google.cloud.bigquery``. + ``BUNDLE_MIRROR_TABLE_SCHEMA`` is exported for callers + who need to create the table themselves (or + ``BigQueryBundleStore.ensure_table()`` does it + idempotently). Failure codes are stable strings; + per-bundle problems land in ``failures`` instead of + raising. 
Store exceptions (BQ-side: network, auth, table + missing) propagate. Out of scope: GCS-backed signed-URL + fetch, caching / TTL, garbage collection, multi-region + replication. - **Revalidation harness for compiled structured extractors** in `bigquery_agent_analytics.extractor_compilation.revalidation` diff --git a/docs/README.md b/docs/README.md index 40e3029..7887a04 100644 --- a/docs/README.md +++ b/docs/README.md @@ -51,6 +51,7 @@ architecture, rationale, and implementation plans behind key SDK features. | [extractor_compilation_runtime_fallback.md](extractor_compilation_runtime_fallback.md) | Runtime fallback wiring for compiled structured extractors (issue #75 PR C2.b): `run_with_fallback(...)` returning `FallbackOutcome` (`decision` is one of `compiled_unchanged` / `compiled_filtered` / `fallback_for_event`). Validates compiled output via #76; on per-element failures drops just the offending nodes / edges (with orphan cleanup) AND downgrades the event's span from `fully_handled` to `partially_handled` so the AI transcript still sees the source span. EVENT-scope, exception, wrong-type, and unpinpointable failures all trigger whole-event fallback. Does not validate fallback output; fallback exceptions propagate. Orchestrator call-site swap is C2.c. | | [extractor_compilation_runtime_registry.md](extractor_compilation_runtime_registry.md) | Runtime extractor-registry adapter (issue #75 PR C2.c.1): `build_runtime_extractor_registry(...)` glues C2.a's `discover_bundles` + C2.b's `run_with_fallback` into one call, returning a `WrappedRegistry` with an `extractors` dict ready for `run_structured_extractors` plus `bundles_without_fallback` (compiled-only, skipped) and `fallbacks_without_bundle` (no usable compiled registry entry — "never built" *and* "rejected by discovery"; cross-reference `discovery.failures` for the reason). Compiled-only event_types are skipped and recorded (fail-closed); fallback-only event_types pass through unchanged. Non-callable fallbacks are rejected at build time with `TypeError` naming the event_type. The `on_outcome(event_type, outcome)` callback fires on every wrapped invocation (denominator metric); callback exceptions propagate. Out of scope: actual orchestrator call-site swap (C2.c.2), BQ mirror (C2.c.3), revalidation (C2.d). | | [extractor_compilation_orchestrator_swap.md](extractor_compilation_orchestrator_swap.md) | Orchestrator call-site swap (issue #75 PR C2.c.2): `OntologyGraphManager.from_bundles_root(...)` classmethod that builds the runtime registry internally and constructs a manager whose `extractors` dict is the wrapped registry, so existing `run_structured_extractors` calls inside `extract_graph` pick up compiled-with-fallback behavior with no other code changes. Adds `manager.runtime_registry: WrappedRegistry | None` audit handle (non-None when bundle-wired). Mirrors `from_ontology_binding` arg shape; existing `__init__` and `from_ontology_binding` paths are unchanged. Compiled-only event_types without a matching fallback are NOT registered (fail-closed). Out of scope: BQ mirror (C2.c.3), revalidation (C2.d). | +| [extractor_compilation_bq_bundle_mirror.md](extractor_compilation_bq_bundle_mirror.md) | BigQuery-table bundle mirror (issue #75 PR C2.c.3): `publish_bundles_to_bq(bundle_root, store, ...)` + `sync_bundles_from_bq(store, dest_dir, ...)`. Mirror is a publish/sync utility, NOT a runtime loader — the runtime path stays `sync_bundles_from_bq → discover_bundles → from_bundles_root`. 
Both functions call `load_bundle` as a gate: publish refuses bundles that wouldn't load at the runtime; sync writes to a side-by-side **staging directory** and `load_bundle`-validates the staged copy before performing a **staged replace** of the target (the rmtree+move pair is not strictly atomic — a crash between the two leaves the bundle absent on disk and is recoverable by re-sync — but the load-bundle-failure direction *is* atomic, so a bad mirror row never destroys a previously-good local bundle). Strict bundle-shape (exactly `manifest.json` + the manifest's `module_filename`) plus shape-check on the manifest's `module_filename` (bare filename only — no separators, no `..`, no NUL; otherwise `manifest_row_unreadable`). Path-safety rejects traversal / absolute / backslash / NUL. `duplicate_fingerprint` rejects publish-side cases where two subdirs claim the same fingerprint (neither published). `duplicate_row` rejects two rows sharing the same `(fingerprint, bundle_path)` at sync. `malformed_row` shape check. Idempotent republish via DELETE+INSERT in `BigQueryBundleStore.publish_rows` (NOT a single atomic transaction; a transient INSERT failure is recoverable by re-running publish). `publish_rows` raises `ValueError` on duplicate input pairs as defense in depth. `BundleStore` Protocol for testability; `BigQueryBundleStore` is the concrete impl. Stable `MirrorFailure` codes; per-bundle problems accumulate, store exceptions propagate. Out of scope: GCS signed URLs, caching, garbage collection, multi-region. | | [extractor_compilation_revalidation.md](extractor_compilation_revalidation.md) | Revalidation harness (issue #75 PR C2.d): `revalidate_compiled_extractors(events, compiled_extractors, reference_extractors, resolved_graph, ...)` drives `run_with_fallback` (with a no-op fallback) over a batch of events AND calls the reference extractor directly, aggregating outcomes into a `RevalidationReport` with **two orthogonal dimensions**: runtime decision (`compiled_unchanged` / `compiled_filtered` / `fallback_for_event`, plus `compiled_path_faults` split out so bundle bugs are distinguishable from ontology drift) and agreement against reference (`parity_match` / `parity_divergence` / `parity_not_checked`). Parity uses three comparators: `_compare_nodes` and `_compare_span_handling` from `measurement.py` plus `_compare_edges` in `revalidation.py` (same edge_id set with matching relationship_name / endpoints / property-set per shared edge; duplicate edge_ids on either side reported as a divergence rather than silently collapsed by dict keying). The parity dimension catches **schema-valid but semantically wrong** outputs the schema-only check would miss. **Every failure mode on the reference side becomes a parity divergence, never a batch abort**: exceptions, non-`StructuredExtractionResult` returns (including `None`), and comparator crashes all funnel into the divergence channel with a descriptive string. `check_thresholds(report, RevalidationThresholds(...))` evaluates policy gates; threshold rates are validated to `[0, 1]` at construction so a typo like `=5` (intended as 5%) fails loud. JSON-serializable for persistence; deterministic. Out of scope: scheduled orchestration, BQ persistence, CLI, sampling strategy. 
| ## Deployment Surfaces diff --git a/docs/extractor_compilation_bq_bundle_mirror.md b/docs/extractor_compilation_bq_bundle_mirror.md new file mode 100644 index 0000000..f3f58c0 --- /dev/null +++ b/docs/extractor_compilation_bq_bundle_mirror.md @@ -0,0 +1,177 @@ +# Compiled Structured Extractors — BigQuery Bundle Mirror (PR C2.c.3) + +**Status:** Implemented (PR C2.c.3 of issue #75 Phase C / Milestone C2) +**Parent epic:** [issue #75](https://github.com/GoogleCloudPlatform/BigQuery-Agent-Analytics-SDK/issues/75) +**Builds on:** [`extractor_compilation_bundle_loader.md`](extractor_compilation_bundle_loader.md) (PR C2.a), [`extractor_compilation_orchestrator_swap.md`](extractor_compilation_orchestrator_swap.md) (PR C2.c.2) +**Working plan:** issue #96, Milestone C2 / PR C2.c.3 + +--- + +## What this is + +Compiled bundles live on the filesystem and are loaded by `load_bundle` / `discover_bundles` (C2.a). This module adds a **publish/sync utility** so bundles can flow between processes via a BigQuery table — useful for Cloud Run, Cloud Functions, ephemeral CI workers, or any environment where the filesystem isn't shared. + +**The mirror is a utility, not a runtime loader.** The runtime path stays unchanged: + +``` +sync_bundles_from_bq → discover_bundles → from_bundles_root +``` + +Sync writes verified files to a local directory and lets C2.a's existing loader do the actual import. There is no "fetch-direct-from-BQ" loader — that would double the trust surface and diverge from the loader's audit fields. + +## Public API + +```python +import pathlib + +from bigquery_agent_analytics.extractor_compilation import ( + publish_bundles_to_bq, + sync_bundles_from_bq, + BigQueryBundleStore, + BUNDLE_MIRROR_TABLE_SCHEMA, + PublishResult, + SyncResult, + MirrorFailure, + BundleRow, + BundleStore, +) +from google.cloud import bigquery + +# 1. Stand up the store (creates the table if missing). +client = bigquery.Client(project="my-project", location="US") +store = BigQueryBundleStore( + bq_client=client, + table_id="my-project.my_dataset.compiled_bundles", +) +store.ensure_table() + +# 2. Publish local bundles to BigQuery. +publish: PublishResult = publish_bundles_to_bq( + bundle_root=pathlib.Path("/var/bqaa/bundles"), + store=store, + bundle_fingerprint_allowlist=None, # or a list of fingerprints +) + +# 3. Elsewhere (different process / VM / Cloud Run instance): +sync: SyncResult = sync_bundles_from_bq( + store=store, + dest_dir=pathlib.Path("/tmp/synced-bundles"), + bundle_fingerprint_allowlist=None, +) + +# 4. Wire the synced dir into the runtime via C2.a. (`ontology`, +# `binding`, `fingerprint`, and `fallback_extractors` come from the +# caller's existing setup.) +from bigquery_agent_analytics.ontology_graph import OntologyGraphManager +manager = OntologyGraphManager.from_bundles_root( + project_id="my-project", + dataset_id="my_dataset", + ontology=ontology, + binding=binding, + bundles_root=sync.dest_dir, + expected_fingerprint=fingerprint, + fallback_extractors=fallback_extractors, +) +``` + +## Per-bundle flow + +**Publish:** + +1. Walk `bundle_root`; for each subdirectory, read `manifest.json` and parse via `Manifest.from_json`. +2. Skip if `bundle_fingerprint_allowlist` is set and the manifest's fingerprint isn't in it (lands in `skipped_fingerprints`). +3. Run `load_bundle(child, expected_fingerprint=manifest.fingerprint)` as a **pre-publish validation gate**. Bundles that wouldn't load at runtime are NOT published; the mirror only distributes working bundles. Failures land in `failures` with code `bundle_load_failed` and the underlying loader code in `detail`. +4. 
Emit two `BundleRow`s per bundle (manifest + module file) with denormalized `event_types` / `module_filename` / `function_name` for query-side filtering. +5. Call `store.publish_rows(rows)` once for the whole batch. `BigQueryBundleStore` issues `DELETE FROM ... WHERE (fingerprint, path) IN (...)` then `INSERT`, so re-publishing the same fingerprint replaces the prior rows rather than accumulating. + +**Sync:** + +1. Fetch rows via `store.fetch_rows(bundle_fingerprints=allowlist)`. +2. Shape-check each row (`malformed_row` if wrong types). +3. Group by fingerprint; per fingerprint: + - Reject if any row's `bundle_path` is unsafe (`invalid_bundle_path`: traversal, absolute path, NUL, backslash). + - Reject if any `(fingerprint, bundle_path)` pair appears twice (`duplicate_row`). + - Require the `manifest.json` row (`manifest_row_missing`) and parse it (`manifest_row_unreadable`). + - Require exactly the manifest's two files (`unexpected_file` if any extra; `module_row_missing` if the module row is absent). + - Write the two files into `dest_dir/<fingerprint>/`. + - Run `load_bundle(dest_dir/<fingerprint>, expected_fingerprint=fp)` as a **post-sync validation gate**. Tampered or incomplete bundles fail at sync (`bundle_load_failed`) and the partial directory is scrubbed. +4. Fingerprints in the allowlist that have no rows surface as `fingerprint_not_in_table` failures — the operator knows the publish lag hasn't caught up. + +## BQ table schema + +`BUNDLE_MIRROR_TABLE_SCHEMA` (tuples of `(name, type, mode)`): + +``` +bundle_fingerprint STRING REQUIRED +bundle_path STRING REQUIRED -- "manifest.json" or the manifest's module_filename +file_content BYTES REQUIRED +event_types STRING REPEATED -- denorm from manifest, for query-side filter +module_filename STRING NULLABLE -- denorm from manifest +function_name STRING NULLABLE -- denorm from manifest +published_at TIMESTAMP REQUIRED +``` + +**Logical primary key**: `(bundle_fingerprint, bundle_path)`. BigQuery doesn't enforce uniqueness; `BigQueryBundleStore.publish_rows` enforces it via DELETE+INSERT, and sync rejects duplicates fail-closed. + +The denormalized fields exist for query convenience (`SELECT DISTINCT bundle_fingerprint FROM mirror WHERE 'bka_decision' IN UNNEST(event_types)`). They are NOT the source of truth at sync time — sync re-parses `manifest.json` from the row content. The denorm is for query speed; correctness comes from re-validating against `load_bundle`. + +## Stable failure codes + +Callers can switch on `MirrorFailure.code`: + +Publish-side: +- `bundle_root_missing` — `bundle_root` is not a directory. +- `manifest_missing` — bundle subdir has no `manifest.json`. +- `manifest_unreadable` — manifest fails to parse or has wrong shape. +- `bundle_load_failed` — bundle wouldn't load via `load_bundle` pre-publish. `detail` carries the underlying loader code. +- `duplicate_fingerprint` — two or more subdirs of `bundle_root` declare the same manifest fingerprint. The mirror is keyed on `(bundle_fingerprint, bundle_path)`; publishing both would land contents-of-the-loser in the table and corrupt the bundle identity. Fail-closed: every participating subdir gets a failure record and no rows are emitted for that fingerprint. + +Sync-side: +- `fingerprint_not_in_table` — allowlist named a fingerprint with no rows. +- `manifest_row_missing` — bundle has rows but no `manifest.json` row. +- `manifest_row_unreadable` — manifest row content isn't a valid `Manifest`. 
Also fires when the parsed manifest's shape would let a path-escape or write failure slip past `_validate_bundle_path` (`module_filename` containing a path separator, NUL, `.`/`..`, or non-string fields). +- `invalid_bundle_path` — traversal / absolute / NUL / backslash. Offender is never written to disk. +- `unexpected_file` — row whose `bundle_path` isn't `manifest.json` nor the manifest's `module_filename`. Bundles are exactly two files; anything extra is rejected. +- `module_row_missing` — manifest is fine but no row for the module file. +- `duplicate_row` — two rows share the same `(fingerprint, bundle_path)`. +- `malformed_row` — row fields have wrong types (e.g. `file_content` not bytes) **or** the `bundle_fingerprint` isn't a strict 64-char lowercase sha256 hex string. The fingerprint check is load-bearing: sync uses the fingerprint as a directory name (`dest_dir/<fingerprint>/`), so a tampered value like `"../escape"` would otherwise write outside `dest_dir`. +- `bundle_load_failed` — sync wrote files to a *staging* directory but `load_bundle` rejected the reconstruction. The staging directory is removed and any pre-existing `dest_dir/<fingerprint>/` is left intact — a bad mirror row never destroys good local state. + +Neither `publish_bundles_to_bq` nor `sync_bundles_from_bq` raises on per-bundle problems; failures accumulate. **Store exceptions** (BQ-side: network, auth, table missing) DO propagate — that's the right boundary for "fix the connection and retry." + +## Staged replace during sync + +Sync writes each fingerprint's two files to a side-by-side **staging directory** (`dest_dir/.staging-<fingerprint>-<uuid>/`) and runs `load_bundle` on the staged copy **before touching the target**. Only after `load_bundle` accepts the staged reconstruction does sync `rmtree(dest_dir/<fingerprint>)` and `shutil.move(staging, target)`. A corrupt mirror row therefore cannot destroy a previously-good local bundle — the load-bundle gate is the safety boundary. + +The replace itself is **staged, not strictly atomic.** Between `rmtree(target)` and `move(staging, target)` there is a brief window where the target is absent; a process crash inside that window leaves the bundle missing on disk (a re-sync recovers it). The load-bundle-failure case — the one the staged flow is designed to protect — is correctly atomic in the failure direction: load-bundle failure leaves the target untouched. Locked by `test_sync_failure_preserves_existing_good_bundle`. + +## Idempotency + non-atomic publish + +`BigQueryBundleStore.publish_rows` upserts by `(bundle_fingerprint, bundle_path)`. Re-publishing the same bundle replaces the prior rows rather than duplicating them — verified by `test_republishing_same_bundle_does_not_accumulate_rows`. + +**Important caveat:** the DELETE + `insert_rows_json` upsert is **not a single atomic transaction**. If INSERT fails after DELETE (network, quota, schema drift), rows for the affected `(fingerprint, bundle_path)` pairs are *missing* from the table until the caller re-runs publish. The mirror is publish-side idempotent, so the recovery is to call `publish_bundles_to_bq` again — but operators should be aware that a transient INSERT failure leaves a recoverable, not silent, gap. A staging-table-plus-MERGE flow would close this gap and is deliberately deferred. + +Cross-subdir duplicate fingerprints (two bundles claiming the same fingerprint) are caught **before any DELETE runs** via the `duplicate_fingerprint` publish-side check. 
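+A minimal round-trip sketch against an in-memory store (the `InMemoryBundleStore` name is illustrative; any object with the `BundleStore` shape works, which is exactly what the CI tests rely on, and the example assumes valid bundles live under the root):
+
+```python
+import pathlib
+
+from bigquery_agent_analytics.extractor_compilation import (
+    publish_bundles_to_bq,
+    sync_bundles_from_bq,
+)
+
+
+class InMemoryBundleStore:
+    """Duck-typed BundleStore: a dict keyed by (fingerprint, bundle_path)."""
+
+    def __init__(self):
+        self.rows = {}
+
+    def fetch_rows(self, *, bundle_fingerprints=None):
+        wanted = None if bundle_fingerprints is None else set(bundle_fingerprints)
+        for (fp, _path), row in sorted(self.rows.items()):
+            if wanted is None or fp in wanted:
+                yield row
+
+    def publish_rows(self, rows):
+        rows = list(rows)
+        for row in rows:
+            # Upsert by composite key: the dict mirrors DELETE+INSERT.
+            self.rows[(row.bundle_fingerprint, row.bundle_path)] = row
+        return len(rows)
+
+
+store = InMemoryBundleStore()
+root = pathlib.Path("/var/bqaa/bundles")  # illustrative path
+publish_bundles_to_bq(bundle_root=root, store=store)
+second = publish_bundles_to_bq(bundle_root=root, store=store)
+# Re-publish replaced rather than accumulated: still two rows per bundle.
+assert len(store.rows) == second.rows_written
+sync_bundles_from_bq(store=store, dest_dir=pathlib.Path("/tmp/mirror"))
+```
+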
`BigQueryBundleStore.publish_rows` also raises `ValueError` on duplicate `(fingerprint, path)` input pairs as defense in depth for direct callers of the store. + +## Tests + +CI suite — `tests/test_extractor_compilation_bq_bundle_mirror.py`, 24 cases using an in-memory `BundleStore` substitute: + +- **`TestRoundTrip`** (2) — publish a local bundle, sync it back, verify `load_bundle` accepts the reconstruction. Plus a multi-bundle variant. +- **`TestAllowlist`** (3) — publish-side allowlist skips unlisted; sync-side allowlist skips unlisted; sync-side allowlist names a fingerprint with no rows → `fingerprint_not_in_table` failure. +- **`TestPathSafety`** (3) — `../escape.py`, `/etc/passwd`, `..\windows-style-escape.py` all rejected with `invalid_bundle_path`; no file written outside `dest_dir`. +- **`TestMissingAndMalformedRows`** (5) — missing manifest row, malformed manifest content, unexpected extra file, wrong field type, duplicate rows. +- **`TestIdempotentRepublish`** (1) — two consecutive publishes of the same bundle leave exactly two rows in the store, not four. +- **`TestPublishFailures`** (4) — subdir without manifest; bundle that would fail `load_bundle` pre-publish; missing `bundle_root`; two subdirs declaring the same fingerprint → `duplicate_fingerprint`, neither published. +- **`TestRoundTwoFindings`** (6) — manifest row with `module_filename` containing a path separator → `manifest_row_unreadable` (no `FileNotFoundError`); existing good local bundle preserved across a corrupt re-sync (staging-then-validate); `BigQueryBundleStore.publish_rows` raises `ValueError` on duplicate input pairs without running DELETE or INSERT; tampered `bundle_fingerprint="../escape"` rejected as `malformed_row` before any path is computed (no write outside `dest_dir`); tampered manifest `fingerprint="../escape"` rejected at publish-side; `BigQueryBundleStore.__init__` raises `ValueError` on malformed `table_id` (backtick, semicolon, whitespace, wrong dot count, empty segment, `--` comment marker, trailing newline) so injection can't reach the SQL. + +Live BQ suite — `tests/test_extractor_compilation_bq_bundle_mirror_live.py`, 1 case behind `BQAA_RUN_LIVE_TESTS=1` + `BQAA_RUN_LIVE_BQ_MIRROR_TESTS=1` + `PROJECT_ID` + `DATASET_ID`. Creates a temporary table, runs the publish+sync round-trip, asserts `load_bundle` accepts the reconstruction, deletes the table on the way out. + +## Out of scope (deferred) + +- **GCS-backed signed-URL fetch** for very large bundles. Bundles are tiny today (a few KB); a streaming path can land later if real bundles grow. +- **Caching / TTL** of synced bundles. Sync overwrites; the caller decides how often to sync. +- **Garbage collection** of stale fingerprints. The mirror's job is publish + fetch; lifecycle policy lives upstream. +- **Multi-region replication.** The mirror table is created in one BQ location. + +## Related + +- [`extractor_compilation_bundle_loader.md`](extractor_compilation_bundle_loader.md) — `load_bundle` / `discover_bundles` (C2.a). The mirror calls `load_bundle` as both a pre-publish gate and a post-sync gate, so the loader is the single source of truth for "is this bundle usable?" +- [`extractor_compilation_orchestrator_swap.md`](extractor_compilation_orchestrator_swap.md) — `OntologyGraphManager.from_bundles_root` (C2.c.2). Once sync lands bundles on disk, this is the entry point that wires them into the runtime. 
+- [`extractor_compilation_runtime_registry.md`](extractor_compilation_runtime_registry.md) — `build_runtime_extractor_registry` (C2.c.1). The registry adapter that `from_bundles_root` builds internally. diff --git a/src/bigquery_agent_analytics/__init__.py b/src/bigquery_agent_analytics/__init__.py index 740191f..e16ceab 100644 --- a/src/bigquery_agent_analytics/__init__.py +++ b/src/bigquery_agent_analytics/__init__.py @@ -590,6 +590,7 @@ from .extractor_compilation import AstFailure from .extractor_compilation import AstReport from .extractor_compilation import AttemptRecord + from .extractor_compilation import BigQueryBundleStore from .extractor_compilation import build_ast_diagnostic from .extractor_compilation import build_compile_result_diagnostic from .extractor_compilation import build_gate_diagnostic @@ -598,6 +599,9 @@ from .extractor_compilation import build_retry_prompt from .extractor_compilation import build_runtime_extractor_registry from .extractor_compilation import build_smoke_diagnostic + from .extractor_compilation import BUNDLE_MIRROR_TABLE_SCHEMA + from .extractor_compilation import BundleRow + from .extractor_compilation import BundleStore from .extractor_compilation import check_thresholds from .extractor_compilation import compile_extractor from .extractor_compilation import compile_with_llm @@ -616,10 +620,13 @@ from .extractor_compilation import LoadFailure from .extractor_compilation import Manifest from .extractor_compilation import measure_compile + from .extractor_compilation import MirrorFailure from .extractor_compilation import OutcomeCallback from .extractor_compilation import parse_resolved_extractor_plan_json from .extractor_compilation import PlanParseError from .extractor_compilation import PlanResolver + from .extractor_compilation import publish_bundles_to_bq + from .extractor_compilation import PublishResult from .extractor_compilation import render_extractor_source from .extractor_compilation import RESOLVED_EXTRACTOR_PLAN_JSON_SCHEMA from .extractor_compilation import ResolvedExtractorPlan @@ -631,6 +638,8 @@ from .extractor_compilation import run_with_fallback from .extractor_compilation import SmokeTestReport from .extractor_compilation import SpanHandlingRule + from .extractor_compilation import sync_bundles_from_bq + from .extractor_compilation import SyncResult from .extractor_compilation import ThresholdCheckResult from .extractor_compilation import validate_source from .extractor_compilation import WrappedRegistry @@ -640,6 +649,10 @@ "AstFailure", "AstReport", "AttemptRecord", + "BUNDLE_MIRROR_TABLE_SCHEMA", + "BigQueryBundleStore", + "BundleRow", + "BundleStore", "CompileMeasurement", "CompileResult", "CompileSource", @@ -649,7 +662,10 @@ "FieldMapping", "LoadFailure", "LoadedBundle", + "MirrorFailure", "OutcomeCallback", + "PublishResult", + "SyncResult", "WrappedRegistry", "LLMClient", "Manifest", @@ -678,10 +694,12 @@ "load_bundle", "measure_compile", "parse_resolved_extractor_plan_json", + "publish_bundles_to_bq", "build_runtime_extractor_registry", "render_extractor_source", "revalidate_compiled_extractors", "run_smoke_test", + "sync_bundles_from_bq", "run_with_fallback", "validate_source", ] diff --git a/src/bigquery_agent_analytics/extractor_compilation/__init__.py b/src/bigquery_agent_analytics/extractor_compilation/__init__.py index ad9530d..34501cb 100644 --- a/src/bigquery_agent_analytics/extractor_compilation/__init__.py +++ b/src/bigquery_agent_analytics/extractor_compilation/__init__.py @@ -42,6 +42,15 @@ from 
.ast_validator import AstFailure from .ast_validator import AstReport from .ast_validator import validate_source +from .bq_bundle_mirror import BigQueryBundleStore +from .bq_bundle_mirror import BUNDLE_MIRROR_TABLE_SCHEMA +from .bq_bundle_mirror import BundleRow +from .bq_bundle_mirror import BundleStore +from .bq_bundle_mirror import MirrorFailure +from .bq_bundle_mirror import publish_bundles_to_bq +from .bq_bundle_mirror import PublishResult +from .bq_bundle_mirror import sync_bundles_from_bq +from .bq_bundle_mirror import SyncResult from .bundle_loader import discover_bundles from .bundle_loader import DiscoveryResult from .bundle_loader import load_bundle @@ -96,6 +105,10 @@ "AstFailure", "AstReport", "AttemptRecord", + "BUNDLE_MIRROR_TABLE_SCHEMA", + "BigQueryBundleStore", + "BundleRow", + "BundleStore", "CompileMeasurement", "CompileResult", "CompileSource", @@ -108,9 +121,11 @@ "FieldMapping", "LLMClient", "Manifest", + "MirrorFailure", "OutcomeCallback", "PlanParseError", "PlanResolver", + "PublishResult", "RESOLVED_EXTRACTOR_PLAN_JSON_SCHEMA", "ResolvedExtractorPlan", "RetryCompileResult", @@ -118,6 +133,7 @@ "RevalidationThresholds", "SmokeTestReport", "SpanHandlingRule", + "SyncResult", "ThresholdCheckResult", "WrappedRegistry", "build_ast_diagnostic", @@ -139,8 +155,10 @@ "load_callable_from_source", "now_iso_utc", "parse_resolved_extractor_plan_json", + "publish_bundles_to_bq", "render_extractor_source", "revalidate_compiled_extractors", + "sync_bundles_from_bq", "run_smoke_test", "run_smoke_test_in_subprocess", "run_with_fallback", diff --git a/src/bigquery_agent_analytics/extractor_compilation/bq_bundle_mirror.py b/src/bigquery_agent_analytics/extractor_compilation/bq_bundle_mirror.py new file mode 100644 index 0000000..5db8e30 --- /dev/null +++ b/src/bigquery_agent_analytics/extractor_compilation/bq_bundle_mirror.py @@ -0,0 +1,1177 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""BigQuery-table mirror for compiled extractor bundles +(issue #75 Milestone C2.c.3). + +Compiled bundles live on the filesystem and are loaded by +:func:`load_bundle` / :func:`discover_bundles` (C2.a). This +module adds a **publish/sync utility** so bundles can flow +between processes via a BigQuery table — useful for Cloud +Run, Cloud Functions, ephemeral CI workers, or any environment +where the filesystem isn't shared. + +**The mirror is a utility, not a runtime loader.** The runtime +path stays unchanged: ``sync_bundles_from_bq → discover_bundles +→ from_bundles_root``. Sync writes verified files to a +local directory and lets C2.a's existing loader do the actual +import. There is no "fetch-direct-from-BQ" loader — that +would double the trust surface and diverge from the loader's +audit fields. + +Public surface: + +* :func:`publish_bundles_to_bq` — walk a local bundle root, + validate each candidate via :func:`load_bundle`, and push + the constituent files as rows. 
+* :func:`sync_bundles_from_bq` — read rows for the requested + fingerprints, write files into ``dest_dir/<fingerprint>/``, + and **call :func:`load_bundle` on the reconstructed bundle + before the sync is considered successful**. Tampered or + incomplete rows fail loud here, not at runtime. +* :class:`BundleStore` — Protocol the two functions consume. + Concrete :class:`BigQueryBundleStore` wraps a + ``google.cloud.bigquery.Client``; tests can pass any + Protocol-shaped object (e.g. an in-memory fake). + +Stable :class:`MirrorFailure` codes — callers can switch on +them: + +Publish-side: + +* ``manifest_missing`` — bundle subdir has no + ``manifest.json``. +* ``manifest_unreadable`` — manifest fails to parse or has + wrong shape. +* ``bundle_load_failed`` — the bundle would not load via + :func:`load_bundle` *before* publishing. ``detail`` carries + the underlying loader code so we don't publish bundles the + runtime would reject. +* ``duplicate_fingerprint`` — two or more subdirs of + ``bundle_root`` declare the same manifest fingerprint. The + mirror is keyed on ``(bundle_fingerprint, bundle_path)``; + publishing both would land contents-of-the-loser in the + table and corrupt the bundle identity. Fail-closed: every + participating subdir gets a failure record and *no* rows + are emitted for that fingerprint. + +Sync-side: + +* ``fingerprint_not_in_table`` — caller's allowlist named a + fingerprint that has no rows. +* ``manifest_row_missing`` — bundle has rows but no row with + ``bundle_path="manifest.json"``. +* ``manifest_row_unreadable`` — the manifest row's content + isn't a valid :class:`Manifest`. Also fires when the + parsed manifest's shape would let a path-escape or write + failure slip past :func:`_validate_bundle_path` (e.g. + ``module_filename`` containing a path separator). +* ``invalid_bundle_path`` — a row's ``bundle_path`` traverses + out of the bundle directory, is absolute, or contains + forbidden characters. Sync is fail-closed here: the offender + is never written to disk. +* ``unexpected_file`` — a row exists for the bundle whose + ``bundle_path`` isn't ``manifest.json`` nor the manifest's + declared ``module_filename``. Bundles are exactly two files; + anything extra is rejected rather than written. +* ``module_row_missing`` — manifest is fine but there's no + row for the module file. +* ``duplicate_row`` — two rows share the same + ``(bundle_fingerprint, bundle_path)``. The table has no + unique constraint; the mirror enforces it at sync time. +* ``malformed_row`` — row fields have wrong types (e.g. + ``file_content`` not bytes, ``event_types`` not a list of + strings). +* ``bundle_load_failed`` — sync wrote files but + :func:`load_bundle` rejected the reconstructed bundle. The + partial directory is removed so the caller doesn't keep + half-synced bundles around. + +Neither :func:`publish_bundles_to_bq` nor +:func:`sync_bundles_from_bq` raises on per-bundle problems; +all failures land in the result's ``failures`` tuple. Store +exceptions (BQ-side: network, auth, etc.) DO propagate — that +is the right boundary for "fix the connection and retry." + +Out of scope (deferred): + +* **GCS-backed signed-URL fetch** for very large bundles. The + mirror stores ``BYTES`` directly; bundles are tiny today (a + few KB) and a streaming path can land later if real bundles + grow. +* **Caching / TTL of synced bundles.** Sync overwrites; the + caller decides how often to sync. +* **Garbage collection** of stale fingerprints. 
The mirror's + job is publish + fetch; lifecycle policy lives upstream. +* **Multi-region replication.** The mirror table is created + in one BQ location. +""" + +from __future__ import annotations + +import collections +import dataclasses +import datetime +import pathlib +import re +import shutil +from typing import Any, Iterable, Iterator, Optional, Protocol +import uuid + +from .bundle_loader import load_bundle +from .bundle_loader import LoadFailure +from .manifest import Manifest + +# Public — re-exported in __init__.py. +__all__ = [ + "BUNDLE_MIRROR_TABLE_SCHEMA", + "BigQueryBundleStore", + "BundleRow", + "BundleStore", + "MirrorFailure", + "PublishResult", + "SyncResult", + "publish_bundles_to_bq", + "sync_bundles_from_bq", +] + +# Stable schema for the mirror table. Tuples of +# ``(name, type, mode)`` so a caller can construct a +# ``bigquery.SchemaField`` list without importing this module's +# row type. Keep this in lockstep with :class:`BundleRow` — +# any field added here must also be populated in publish and +# read back in :meth:`BigQueryBundleStore.fetch_rows`. +BUNDLE_MIRROR_TABLE_SCHEMA: tuple[tuple[str, str, str], ...] = ( + ("bundle_fingerprint", "STRING", "REQUIRED"), + ("bundle_path", "STRING", "REQUIRED"), + ("file_content", "BYTES", "REQUIRED"), + # Denormalized from the bundle's manifest. Lets a caller + # filter by event_type without an unnest+join. Source of + # truth at sync time is still manifest.json's parsed + # content, not this column. + ("event_types", "STRING", "REPEATED"), + # Also denormalized; nullable so the schema stays stable + # even if a future bundle layout omits one. + ("module_filename", "STRING", "NULLABLE"), + ("function_name", "STRING", "NULLABLE"), + ("published_at", "TIMESTAMP", "REQUIRED"), +) + +# The two file names that constitute a bundle, per C2.a's +# loader contract. ``manifest.json`` is fixed; the module +# filename is read from the manifest itself, so the only +# *constant* allowed path is the manifest. +_MANIFEST_FILENAME = "manifest.json" + +# Bundle fingerprints are sha256 hex digests — strict 64-char +# lowercase hex. Used as directory names by sync +# (``dest_dir/<fingerprint>/``) and as path components in the +# staging dir, so any value that isn't a pure hex digest could +# trivially escape the destination (e.g. ``"../escape"``). +# Enforced on every row at sync-side ``_check_row_shape`` and +# on every manifest at both publish and sync, so a tampered +# value never reaches the directory-name computation. +_FINGERPRINT_PATTERN = re.compile(r"^[0-9a-f]{64}$") + +# BigQuery table identifiers go into backtick-quoted SQL. The +# constructor accepts ``project.dataset.table``; this pattern +# rejects anything that could break out of the quoted identifier +# or smuggle SQL through. Conservative ASCII-only set; BQ does +# allow broader characters but a mirror table is operator-named +# and there's no reason to permit anything exotic. +# +# * Each segment: letters / digits / ``_`` / ``-``. Project IDs +# and table names can contain ``-``; dataset names cannot per +# BQ docs, but allowing ``-`` everywhere keeps the check simple +# and BigQuery itself will reject an invalid dataset name later. +# * Exactly two dots; three non-empty segments. 
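+# Accepted by the pattern: "my-project.analytics.compiled_bundles".
+# Rejected: "proj.dataset" (only two segments), "`proj`.ds.tbl"
+# (backtick), "proj.ds.tbl; DROP TABLE x" (semicolon, whitespace).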
_TABLE_ID_PATTERN = re.compile( + r"^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+$" +) + + +# ------------------------------------------------------------------ # +# Data types # +# ------------------------------------------------------------------ # + + +@dataclasses.dataclass(frozen=True) +class BundleRow: + """One row of the mirror table. + + One row per file inside a bundle; a bundle is two files + (``manifest.json`` + the module file the manifest names), + so each bundle is two rows. + """ + + bundle_fingerprint: str + bundle_path: str + file_content: bytes + # Denormalized audit fields read from the bundle's manifest + # at publish time. Useful for query-side filtering and for + # debugging mismatches between the table view and the + # eventual on-disk reconstruction. + event_types: tuple[str, ...] + module_filename: Optional[str] + function_name: Optional[str] + published_at: str + + +@dataclasses.dataclass(frozen=True) +class MirrorFailure: + """Stable failure record produced by publish or sync. + + Never raised — both top-level functions accumulate these + in the result. The ``code`` is the switch-on key; ``detail`` + is a human-readable description. ``bundle_fingerprint`` and + ``bundle_path`` are populated when the failure can be + pinned to one bundle / row. + """ + + code: str + detail: str + bundle_fingerprint: Optional[str] = None + bundle_path: Optional[str] = None + + +@dataclasses.dataclass(frozen=True) +class PublishResult: + """Outcome of :func:`publish_bundles_to_bq`.""" + + published_fingerprints: tuple[str, ...] + # Fingerprints found under ``bundle_root`` but excluded by + # ``bundle_fingerprint_allowlist``. Not a failure; just + # surfaced for visibility. + skipped_fingerprints: tuple[str, ...] + failures: tuple[MirrorFailure, ...] + rows_written: int + + +@dataclasses.dataclass(frozen=True) +class SyncResult: + """Outcome of :func:`sync_bundles_from_bq`.""" + + synced_fingerprints: tuple[str, ...] + # Fingerprints whose rows were fetched but which failed a + # sync-side check and were not synced; every entry here has + # at least one corresponding record in ``failures``. + skipped_fingerprints: tuple[str, ...] + failures: tuple[MirrorFailure, ...] + dest_dir: pathlib.Path + + +# ------------------------------------------------------------------ # +# BundleStore protocol + concrete BigQuery implementation # +# ------------------------------------------------------------------ # + + +class BundleStore(Protocol): + """Read/write boundary over the mirror table. + + ``publish_rows`` is upsert-by-``(bundle_fingerprint, + bundle_path)`` — re-publishing the same bundle overwrites + prior rows for that key pair, so the table stays clean + across compile rebuilds. ``fetch_rows`` filters by the + requested fingerprints; ``None`` means "every fingerprint." + + Store-level exceptions (network, auth, BQ table missing) + propagate. Per-row problems are the publish/sync layer's + concern, surfaced as :class:`MirrorFailure`. + """ + + def fetch_rows( + self, *, bundle_fingerprints: Optional[Iterable[str]] = None + ) -> Iterable[BundleRow]: + ... + + def publish_rows(self, rows: Iterable[BundleRow]) -> int: + ... + + +class BigQueryBundleStore: + """Concrete :class:`BundleStore` over + ``google.cloud.bigquery``. + + The table is created lazily by :meth:`ensure_table` (callers + may invoke it once at setup); ``publish_rows`` and + ``fetch_rows`` assume the table already exists with the + schema declared in :data:`BUNDLE_MIRROR_TABLE_SCHEMA`. 
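+
+    Typical setup (a sketch; the table name is illustrative)::
+
+        from google.cloud import bigquery
+
+        client = bigquery.Client(project="my-project")
+        store = BigQueryBundleStore(
+            bq_client=client,
+            table_id="my-project.my_dataset.compiled_bundles",
+        )
+        store.ensure_table()  # idempotent; safe to call on every boot
+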
+ + Idempotency contract — important caveats: + + * ``publish_rows`` first DELETEs every + ``(bundle_fingerprint, bundle_path)`` pair it's about to + write, then calls ``insert_rows_json`` for the new + payload. The DELETE is scoped to the keys being written; + other fingerprints are untouched. Re-publishing the same + fingerprint replaces the prior copy. + * **The DELETE and INSERT are NOT a single atomic + transaction.** ``insert_rows_json`` is a streaming insert + that BigQuery does not enroll in a multi-statement + transaction with the DELETE query. If the DELETE + succeeds and the INSERT fails (network, quota, schema + drift), rows for the affected + ``(bundle_fingerprint, bundle_path)`` pairs will be + *missing* from the table until the caller re-runs + publish. The mirror is publish-side idempotent, so the + fix is to call :func:`publish_bundles_to_bq` again — but + operators should be aware that a transient INSERT failure + leaves a recoverable, not silent, gap. A + staging-table-plus-MERGE flow would close this gap and + is deliberately deferred (see module docstring). + * Duplicate ``(bundle_fingerprint, bundle_path)`` pairs + in the input raise ``ValueError`` *before* any DELETE + runs. BigQuery's ``insert_rows_json`` does not + deduplicate, so silently accepting duplicates would + leave the table with logical duplicates that sync later + rejects fail-closed. The publisher in + :func:`publish_bundles_to_bq` already guards against + cross-bundle duplicate fingerprints; this defensive + raise covers callers that build rows by hand. + """ + + def __init__( + self, + *, + bq_client: Any, + table_id: str, + ) -> None: + """``bq_client`` is a ``google.cloud.bigquery.Client`` or + test-compatible substitute (anything exposing ``query``, + ``insert_rows_json``, ``get_table``, ``create_table``). + ``table_id`` is ``project.dataset.table`` and is validated + at construction — only ASCII letters / digits / ``_`` / + ``-`` per segment, exactly three segments. Any value that + could break out of the backtick-quoted SQL identifier + (backtick, semicolon, whitespace, ``--``, ``/*``) + raises :class:`ValueError`.""" + if not isinstance(table_id, str): + raise ValueError( + f"table_id must be a string; got {type(table_id).__name__}" + ) + # ``fullmatch`` (not ``match``) because Python regex's ``$`` + # accepts a trailing newline by default — ``"proj.ds.tbl\n"`` + # would otherwise sneak past and reach the SQL. + if not _TABLE_ID_PATTERN.fullmatch(table_id): + raise ValueError( + f"table_id {table_id!r} is not a well-formed " + f"'project.dataset.table' identifier " + f"(allowed per segment: ASCII letters, digits, '_', " + f"'-'; exactly three segments)" + ) + self._bq_client = bq_client + self._table_id = table_id + + # -------------------------------------------------------- # + # Setup # + # -------------------------------------------------------- # + + def ensure_table(self) -> None: + """Create the mirror table if it doesn't already exist. + Idempotent. Schema mismatches against an existing table are + NOT auto-corrected — that would silently rewrite a table the + caller owns. If the schema diverges, an operator runs the + DDL fix by hand.""" + # Imports here so the module imports cleanly in environments + # that don't have google-cloud-bigquery installed (tests + # using the in-memory fake store). 
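+        # For reference, the table this builds is equivalent to the
+        # following DDL (a sketch derived from
+        # BUNDLE_MIRROR_TABLE_SCHEMA; REQUIRED -> NOT NULL,
+        # REPEATED STRING -> ARRAY<STRING>):
+        #
+        #   CREATE TABLE IF NOT EXISTS `project.dataset.table` (
+        #     bundle_fingerprint STRING NOT NULL,
+        #     bundle_path STRING NOT NULL,
+        #     file_content BYTES NOT NULL,
+        #     event_types ARRAY<STRING>,
+        #     module_filename STRING,
+        #     function_name STRING,
+        #     published_at TIMESTAMP NOT NULL
+        #   )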
+ from google.cloud import bigquery # type: ignore + + schema = [ + bigquery.SchemaField(name, type_, mode=mode) + for name, type_, mode in BUNDLE_MIRROR_TABLE_SCHEMA + ] + table = bigquery.Table(self._table_id, schema=schema) + self._bq_client.create_table(table, exists_ok=True) + + # -------------------------------------------------------- # + # BundleStore protocol # + # -------------------------------------------------------- # + + def fetch_rows( + self, *, bundle_fingerprints: Optional[Iterable[str]] = None + ) -> Iterator[BundleRow]: + """Read rows for the requested fingerprints, or every row + when ``bundle_fingerprints`` is ``None``.""" + fps = ( + None + if bundle_fingerprints is None + else sorted({fp for fp in bundle_fingerprints}) + ) + sql, params = self._select_sql(fps) + job = self._bq_client.query(sql, job_config=self._query_config(params)) + for row in job.result(): + yield BundleRow( + bundle_fingerprint=row["bundle_fingerprint"], + bundle_path=row["bundle_path"], + file_content=bytes(row["file_content"]), + event_types=tuple(row["event_types"] or ()), + module_filename=row["module_filename"], + function_name=row["function_name"], + published_at=str(row["published_at"]), + ) + + def publish_rows(self, rows: Iterable[BundleRow]) -> int: + """Upsert rows by ``(bundle_fingerprint, bundle_path)``. + Returns the count of rows actually written. + + Raises ``ValueError`` if the input contains duplicate + ``(bundle_fingerprint, bundle_path)`` pairs. See the + class docstring for the DELETE+INSERT non-atomicity + contract. + """ + rows_list = list(rows) + if not rows_list: + return 0 + # Defense in depth: refuse duplicate (fp, path) input + # pairs. The publisher de-duplicates by detecting + # ``duplicate_fingerprint`` earlier in the pipeline; this + # raise covers direct callers of the store. + pairs = [(r.bundle_fingerprint, r.bundle_path) for r in rows_list] + pair_counts = collections.Counter(pairs) + dupes = sorted(p for p, n in pair_counts.items() if n > 1) + if dupes: + raise ValueError( + f"publish_rows received duplicate " + f"(bundle_fingerprint, bundle_path) pairs: {dupes}" + ) + # 1. DELETE the (fp, path) pairs we're about to overwrite. + delete_pairs = sorted(set(pairs)) + self._delete_pairs(delete_pairs) + # 2. INSERT the new rows. Note: this is NOT atomic with + # the DELETE above. See the class docstring. + payload = [self._row_to_json(r) for r in rows_list] + errors = self._bq_client.insert_rows_json(self._table_id, payload) + if errors: + # BQ surfaces row-level errors in this return value + # rather than raising; propagate them so callers don't + # silently miss a half-inserted batch. 
+ raise RuntimeError( + f"BigQuery insert_rows_json returned errors for " + f"{self._table_id}: {errors!r}" + ) + return len(rows_list) + + # -------------------------------------------------------- # + # Helpers # + # -------------------------------------------------------- # + + def _select_sql( + self, fingerprints: Optional[list[str]] + ) -> tuple[str, dict[str, Any]]: + base = ( + f"SELECT bundle_fingerprint, bundle_path, file_content, " + f"event_types, module_filename, function_name, " + f"published_at " + f"FROM `{self._table_id}` " + ) + if fingerprints is None: + return base + "ORDER BY bundle_fingerprint, bundle_path", {} + sql = ( + base + "WHERE bundle_fingerprint IN UNNEST(@fps) " + "ORDER BY bundle_fingerprint, bundle_path" + ) + return sql, {"fps": fingerprints} + + def _delete_pairs(self, pairs: list[tuple[str, str]]) -> None: + if not pairs: + return + fps = sorted({fp for fp, _ in pairs}) + sql = ( + f"DELETE FROM `{self._table_id}` " + f"WHERE bundle_fingerprint IN UNNEST(@fps) " + f"AND CONCAT(bundle_fingerprint, '::', bundle_path) " + f"IN UNNEST(@pair_keys)" + ) + pair_keys = [f"{fp}::{path}" for fp, path in pairs] + self._bq_client.query( + sql, + job_config=self._query_config({"fps": fps, "pair_keys": pair_keys}), + ).result() + + def _query_config(self, params: dict[str, Any]) -> Any: + from google.cloud import bigquery # type: ignore + + parameters = [] + for name, value in params.items(): + if isinstance(value, list): + parameters.append(bigquery.ArrayQueryParameter(name, "STRING", value)) + else: + parameters.append(bigquery.ScalarQueryParameter(name, "STRING", value)) + return bigquery.QueryJobConfig(query_parameters=parameters) + + def _row_to_json(self, row: BundleRow) -> dict[str, Any]: + import base64 + + return { + "bundle_fingerprint": row.bundle_fingerprint, + "bundle_path": row.bundle_path, + # BQ JSON streaming insert expects base64 for BYTES. + "file_content": base64.b64encode(row.file_content).decode("ascii"), + "event_types": list(row.event_types), + "module_filename": row.module_filename, + "function_name": row.function_name, + "published_at": row.published_at, + } + + +# ------------------------------------------------------------------ # +# publish # +# ------------------------------------------------------------------ # + + +def publish_bundles_to_bq( + *, + bundle_root: pathlib.Path, + store: BundleStore, + bundle_fingerprint_allowlist: Optional[Iterable[str]] = None, +) -> PublishResult: + """Walk *bundle_root*, validate each bundle via + :func:`load_bundle`, and publish the constituent files to + the mirror store. + + Args: + bundle_root: Directory containing one subdirectory per + bundle (the same layout :func:`discover_bundles` walks). + store: :class:`BundleStore` to publish into. Typically a + :class:`BigQueryBundleStore`; tests pass in-memory + fakes. + bundle_fingerprint_allowlist: Optional set of fingerprints + to publish. ``None`` publishes every successfully-loaded + bundle. Fingerprints in the allowlist that don't exist + under ``bundle_root`` are silently absent from the + result — caller asked us to publish nothing. + + Returns: + A populated :class:`PublishResult`. Loader failures and + parse failures land in ``failures``; bundles excluded by + the allowlist land in ``skipped_fingerprints``. 
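+
+    Example (a sketch; any object with the ``BundleStore`` shape
+    works here, including the in-memory fakes the tests use)::
+
+        result = publish_bundles_to_bq(
+            bundle_root=pathlib.Path("/var/bqaa/bundles"),
+            store=store,
+        )
+        for failure in result.failures:
+            print(failure.code, failure.detail)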
+ """ + bundle_root = pathlib.Path(bundle_root) + allowlist = ( + None + if bundle_fingerprint_allowlist is None + else set(bundle_fingerprint_allowlist) + ) + + published: list[str] = [] + skipped: list[str] = [] + failures: list[MirrorFailure] = [] + now = _now_iso_utc() + # First-pass collect candidates by fingerprint so we can + # detect cross-subdir duplicate fingerprints before publishing + # any of them. Without this guard, two subdirs claiming the + # same fingerprint would each emit their own + # ``(fingerprint, manifest.json)`` + ``(fingerprint, module)`` + # rows; ``BigQueryBundleStore.publish_rows`` would DELETE the + # composite keys (good) then INSERT both copies, leaving the + # table with duplicate logical rows that sync later rejects. + # Better to fail-closed at publish than to corrupt the table. + candidates: dict[str, list[tuple[str, list[BundleRow]]]] = ( + collections.defaultdict(list) + ) + + if not bundle_root.is_dir(): + failures.append( + MirrorFailure( + code="bundle_root_missing", + detail=( + f"bundle_root {str(bundle_root)!r} is not a directory; " + f"nothing to publish." + ), + ) + ) + return PublishResult( + published_fingerprints=(), + skipped_fingerprints=(), + failures=tuple(failures), + rows_written=0, + ) + + for child in sorted(bundle_root.iterdir()): + if not child.is_dir(): + continue + manifest_path = child / _MANIFEST_FILENAME + if not manifest_path.exists(): + failures.append( + MirrorFailure( + code="manifest_missing", + detail=f"{child.name}/manifest.json not found", + ) + ) + continue + try: + manifest_text = manifest_path.read_text(encoding="utf-8") + manifest = Manifest.from_json(manifest_text) + # ``Exception`` covers JSON-decode + KeyError on the + # from_json mapping; both are "this file isn't a usable + # manifest" failures from the publisher's perspective. + except Exception as exc: # noqa: BLE001 — record + continue + failures.append( + MirrorFailure( + code="manifest_unreadable", + detail=f"{type(exc).__name__}: {exc}", + bundle_path=str(manifest_path.relative_to(bundle_root)), + ) + ) + continue + + # Shape-check the manifest before trusting its fingerprint / + # module_filename anywhere downstream. Specifically: a + # tampered ``fingerprint`` like ``"../escape"`` would + # otherwise become a ``bundle_fingerprint`` value in the + # table, and sync would use it as a directory name. Catch + # at the source. + shape_problem = _validate_manifest_shape(manifest) + if shape_problem is not None: + failures.append( + MirrorFailure( + code="manifest_unreadable", + detail=shape_problem, + bundle_path=str(manifest_path.relative_to(bundle_root)), + ) + ) + continue + + fp = manifest.fingerprint + if allowlist is not None and fp not in allowlist: + skipped.append(fp) + continue + + # Pre-publish validation: load_bundle against the + # manifest's own fingerprint. If the bundle wouldn't load + # at the runtime, don't publish it — the mirror's job is + # to distribute *working* bundles. + load_outcome = load_bundle(child, expected_fingerprint=fp) + if isinstance(load_outcome, LoadFailure): + failures.append( + MirrorFailure( + code="bundle_load_failed", + detail=f"{load_outcome.code}: {load_outcome.detail}", + bundle_fingerprint=fp, + ) + ) + continue + + module_path = child / manifest.module_filename + if not module_path.exists(): + # load_bundle would have already caught this, but be + # explicit so the failure code is precise. 
failures.append( + MirrorFailure( + code="bundle_load_failed", + detail=( + f"module file {manifest.module_filename!r} missing " + f"after load_bundle succeeded — racing filesystem?" + ), + bundle_fingerprint=fp, + ) + ) + continue + + bundle_rows = [ + BundleRow( + bundle_fingerprint=fp, + bundle_path=_MANIFEST_FILENAME, + file_content=manifest_path.read_bytes(), + event_types=manifest.event_types, + module_filename=manifest.module_filename, + function_name=manifest.function_name, + published_at=now, + ), + BundleRow( + bundle_fingerprint=fp, + bundle_path=manifest.module_filename, + file_content=module_path.read_bytes(), + event_types=manifest.event_types, + module_filename=manifest.module_filename, + function_name=manifest.function_name, + published_at=now, + ), + ] + candidates[fp].append((child.name, bundle_rows)) + + # Second pass: emit rows for fingerprints that appeared in + # exactly one subdir. Duplicate fingerprints get a + # ``duplicate_fingerprint`` failure naming all participating + # subdirs and contribute zero rows. + rows_to_publish: list[BundleRow] = [] + for fp, entries in candidates.items(): + if len(entries) > 1: + names = sorted(name for name, _ in entries) + failures.append( + MirrorFailure( + code="duplicate_fingerprint", + detail=( + f"fingerprint {fp!r} declared by multiple subdirs " + f"({names}); refusing to publish either" + ), + bundle_fingerprint=fp, + ) + ) + continue + _name, rows = entries[0] + rows_to_publish.extend(rows) + published.append(fp) + + rows_written = store.publish_rows(rows_to_publish) if rows_to_publish else 0 + + return PublishResult( + published_fingerprints=tuple(sorted(set(published))), + skipped_fingerprints=tuple(sorted(set(skipped))), + failures=tuple(failures), + rows_written=rows_written, + ) + + +# ------------------------------------------------------------------ # +# sync # +# ------------------------------------------------------------------ # + + +def sync_bundles_from_bq( + *, + store: BundleStore, + dest_dir: pathlib.Path, + bundle_fingerprint_allowlist: Optional[Iterable[str]] = None, +) -> SyncResult: + """Fetch rows from *store* and reconstruct bundles under + ``dest_dir/<fingerprint>/``. For each fingerprint the + reconstructed bundle is passed through :func:`load_bundle` + before being declared synced; tampered or incomplete bundles + fail loud at sync time rather than at runtime. + + Args: + store: :class:`BundleStore` to fetch from. Typically a + :class:`BigQueryBundleStore`; tests pass in-memory + fakes. + dest_dir: Local destination. One subdirectory per + fingerprint will be (re)written. Files outside those + subdirectories are not touched; the directory may + contain other artifacts. + bundle_fingerprint_allowlist: Optional set of fingerprints + to sync. ``None`` syncs everything the store returns. + Any fingerprint in the allowlist for which no rows are + returned shows up as a ``fingerprint_not_in_table`` + failure. + + Returns: + A populated :class:`SyncResult`. Per-bundle issues land + in ``failures``; the ``dest_dir`` field echoes the + destination so callers (and tests) don't have to thread + the path back manually. + """ + dest_dir = pathlib.Path(dest_dir) + dest_dir.mkdir(parents=True, exist_ok=True) + + allowlist = ( + None + if bundle_fingerprint_allowlist is None + else set(bundle_fingerprint_allowlist) + ) + + rows = list(store.fetch_rows(bundle_fingerprints=allowlist)) + + # Group rows by fingerprint with row-level shape checks + # interleaved so a malformed row doesn't poison the whole + # bundle's parsing. 
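+    # e.g. a row whose file_content arrived as str, or whose
+    # fingerprint isn't 64-char lowercase hex, is recorded as
+    # ``malformed_row`` and dropped here; sibling rows for the same
+    # fingerprint still group below and surface their own precise
+    # failure (``manifest_row_missing`` / ``module_row_missing``).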
+    rows_by_fp: dict[str, list[BundleRow]] = collections.defaultdict(list)
+    failures: list[MirrorFailure] = []
+    for row in rows:
+        row_problem = _check_row_shape(row)
+        if row_problem is not None:
+            failures.append(
+                MirrorFailure(
+                    code="malformed_row",
+                    detail=row_problem,
+                    bundle_fingerprint=getattr(row, "bundle_fingerprint", None),
+                    bundle_path=getattr(row, "bundle_path", None),
+                )
+            )
+            continue
+        rows_by_fp[row.bundle_fingerprint].append(row)
+
+    # If the allowlist named fingerprints we never saw, record
+    # them as failures so the caller knows the publish lag
+    # hasn't caught up.
+    if allowlist is not None:
+        for fp in sorted(allowlist - set(rows_by_fp.keys())):
+            failures.append(
+                MirrorFailure(
+                    code="fingerprint_not_in_table",
+                    detail=f"fingerprint {fp!r} has no rows in the mirror table",
+                    bundle_fingerprint=fp,
+                )
+            )
+
+    synced: list[str] = []
+    skipped: list[str] = []
+
+    for fp in sorted(rows_by_fp.keys()):
+        bundle_rows = rows_by_fp[fp]
+
+        # Per-fingerprint duplicate-path check before writing
+        # anything.
+        path_counts = collections.Counter(r.bundle_path for r in bundle_rows)
+        dupes = sorted(p for p, n in path_counts.items() if n > 1)
+        if dupes:
+            failures.append(
+                MirrorFailure(
+                    code="duplicate_row",
+                    detail=f"duplicate bundle_path(s) for {fp!r}: {dupes}",
+                    bundle_fingerprint=fp,
+                )
+            )
+            skipped.append(fp)
+            continue
+
+        by_path = {r.bundle_path: r for r in bundle_rows}
+
+        # Step 1: the manifest row must be present and readable
+        # before we can know which other paths are legitimate.
+        manifest_row = by_path.get(_MANIFEST_FILENAME)
+        if manifest_row is None:
+            failures.append(
+                MirrorFailure(
+                    code="manifest_row_missing",
+                    detail=(
+                        f"no row with bundle_path={_MANIFEST_FILENAME!r} for "
+                        f"fingerprint {fp!r}"
+                    ),
+                    bundle_fingerprint=fp,
+                )
+            )
+            skipped.append(fp)
+            continue
+        try:
+            manifest = Manifest.from_json(manifest_row.file_content.decode("utf-8"))
+        except Exception as exc:  # noqa: BLE001 — record as a failure
+            failures.append(
+                MirrorFailure(
+                    code="manifest_row_unreadable",
+                    detail=f"{type(exc).__name__}: {exc}",
+                    bundle_fingerprint=fp,
+                    bundle_path=_MANIFEST_FILENAME,
+                )
+            )
+            skipped.append(fp)
+            continue
+
+        # ``Manifest.from_json`` is lenient about field types and
+        # values — it just maps keys. A mirrored manifest can
+        # therefore carry e.g. ``module_filename="subdir/foo.py"``
+        # which passes :func:`_validate_bundle_path` (no ``..``,
+        # not absolute) but would later raise ``FileNotFoundError``
+        # when sync writes to ``bundle_dir / "subdir/foo.py"``
+        # because the parent dir doesn't exist. Catch
+        # malformed-shape cases up front so the failure surfaces
+        # as a structured ``manifest_row_unreadable`` rather than
+        # bubbling out of the write step.
+        shape_problem = _validate_manifest_shape(manifest)
+        if shape_problem is not None:
+            failures.append(
+                MirrorFailure(
+                    code="manifest_row_unreadable",
+                    detail=shape_problem,
+                    bundle_fingerprint=fp,
+                    bundle_path=_MANIFEST_FILENAME,
+                )
+            )
+            skipped.append(fp)
+            continue
+
+        # Step 2: validate every row's bundle_path is safe AND
+        # is one of the two paths a bundle legitimately contains.
+        # Path safety happens *before* checking the file set so
+        # a malformed manifest can't introduce a traversal via
+        # module_filename.
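+        # Concretely (exercised by this PR's path-safety tests):
+        # bundle_path values like "../escape.py", "/etc/passwd",
+        # and "..\\windows-style-escape.py" all fail here with
+        # ``invalid_bundle_path`` before any byte reaches disk.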
+        allowed_paths = {_MANIFEST_FILENAME, manifest.module_filename}
+        rejected = False
+        for row in bundle_rows:
+            problem = _validate_bundle_path(row.bundle_path)
+            if problem is not None:
+                failures.append(
+                    MirrorFailure(
+                        code="invalid_bundle_path",
+                        detail=problem,
+                        bundle_fingerprint=fp,
+                        bundle_path=row.bundle_path,
+                    )
+                )
+                rejected = True
+                continue
+            if row.bundle_path not in allowed_paths:
+                failures.append(
+                    MirrorFailure(
+                        code="unexpected_file",
+                        detail=(
+                            f"bundle_path {row.bundle_path!r} is not "
+                            f"manifest.json or the manifest's module_filename "
+                            f"({manifest.module_filename!r})"
+                        ),
+                        bundle_fingerprint=fp,
+                        bundle_path=row.bundle_path,
+                    )
+                )
+                rejected = True
+        if rejected:
+            skipped.append(fp)
+            continue
+
+        if manifest.module_filename not in by_path:
+            failures.append(
+                MirrorFailure(
+                    code="module_row_missing",
+                    detail=(
+                        f"no row with bundle_path="
+                        f"{manifest.module_filename!r} for fingerprint {fp!r}"
+                    ),
+                    bundle_fingerprint=fp,
+                )
+            )
+            skipped.append(fp)
+            continue
+
+        # Step 3: write the two files to a *staging* directory and
+        # validate the reconstruction via load_bundle BEFORE
+        # touching any existing ``dest_dir/<fingerprint>/``. A bad
+        # mirror row must not destroy a previously-good local
+        # bundle: write somewhere safe, run the loader gate, then
+        # staged-replace the target only on success. The
+        # replacement itself (rmtree + move) is NOT strictly
+        # atomic — a crash between the two leaves the bundle
+        # absent on disk — but the load-bundle-failure case (the
+        # one the staged flow is designed to protect) is correctly
+        # atomic in the failure direction.
+        bundle_dir = dest_dir / fp
+        staging_dir = dest_dir / f".staging-{fp}-{uuid.uuid4().hex[:8]}"
+        if staging_dir.exists():
+            # Extraordinarily unlikely (collision on a uuid4 prefix)
+            # but be defensive — never reuse a populated staging dir.
+            shutil.rmtree(staging_dir)
+        staging_dir.mkdir(parents=True)
+        try:
+            (staging_dir / _MANIFEST_FILENAME).write_bytes(manifest_row.file_content)
+            (staging_dir / manifest.module_filename).write_bytes(
+                by_path[manifest.module_filename].file_content
+            )
+            load_outcome = load_bundle(staging_dir, expected_fingerprint=fp)
+        except Exception as exc:  # noqa: BLE001 — record as a failure
+            # Write failure (FileNotFoundError for a still-malformed
+            # path, disk full, etc.) — leave any pre-existing
+            # ``bundle_dir`` intact, scrub the staging dir.
+            shutil.rmtree(staging_dir, ignore_errors=True)
+            failures.append(
+                MirrorFailure(
+                    code="bundle_load_failed",
+                    detail=(
+                        f"writing staging bundle raised "
+                        f"{type(exc).__name__}: {exc}"
+                    ),
+                    bundle_fingerprint=fp,
+                )
+            )
+            skipped.append(fp)
+            continue
+
+        if isinstance(load_outcome, LoadFailure):
+            # Reconstructed bundle doesn't load — keep the old
+            # ``bundle_dir`` (if any) and toss the staging copy.
+            shutil.rmtree(staging_dir, ignore_errors=True)
+            failures.append(
+                MirrorFailure(
+                    code="bundle_load_failed",
+                    detail=f"{load_outcome.code}: {load_outcome.detail}",
+                    bundle_fingerprint=fp,
+                )
+            )
+            skipped.append(fp)
+            continue
+
+        # Staged replace: remove old ``bundle_dir`` (if any), then
+        # move staging into place. The rmtree + move pair is NOT
+        # strictly atomic — a crash between the two leaves the
+        # bundle absent on disk, recoverable by re-running sync.
+        # The crucial property — "don't destroy good local state
+        # because of bad mirror rows" — is preserved by the
+        # staging-then-validate flow above (load_bundle failure
+        # never reaches this point).
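+        # Note (an implementation detail, not load-bearing): staging
+        # and target share ``dest_dir`` as their parent, so barring
+        # an exotic mount point the ``shutil.move`` below is an
+        # ``os.rename`` on the same filesystem rather than a
+        # copy-and-delete fallback.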
+        if bundle_dir.exists():
+            shutil.rmtree(bundle_dir)
+        shutil.move(str(staging_dir), str(bundle_dir))
+
+        synced.append(fp)
+
+    return SyncResult(
+        synced_fingerprints=tuple(sorted(set(synced))),
+        skipped_fingerprints=tuple(sorted(set(skipped))),
+        failures=tuple(failures),
+        dest_dir=dest_dir,
+    )
+
+
+# ------------------------------------------------------------------ #
+# Helpers                                                            #
+# ------------------------------------------------------------------ #
+
+
+def _validate_bundle_path(path: str) -> Optional[str]:
+    """Return a problem string if *path* isn't a safe relative
+    bundle-internal path, or ``None`` if it's clean.
+
+    Required shape:
+
+    * Non-empty.
+    * No NUL bytes; no backslashes (so a Windows-style path
+      can't smuggle a traversal past a POSIX check).
+    * Not absolute.
+    * No ``..`` segments.
+    * No leading ``/``.
+    """
+    if not path:
+        return "empty bundle_path"
+    if "\x00" in path:
+        return "bundle_path contains NUL"
+    if "\\" in path:
+        return f"bundle_path contains backslash: {path!r}"
+    if path.startswith("/"):
+        return f"bundle_path is absolute: {path!r}"
+    pure = pathlib.PurePosixPath(path)
+    if pure.is_absolute():
+        return f"bundle_path is absolute: {path!r}"
+    if any(part == ".." for part in pure.parts):
+        return f"bundle_path contains '..': {path!r}"
+    return None
+
+
+def _validate_manifest_shape(manifest: Manifest) -> Optional[str]:
+    """Return a problem string if *manifest* would let sync write
+    outside the bundle directory or otherwise produce a broken
+    reconstruction, or ``None`` if it's safe.
+
+    Checks beyond ``Manifest.from_json``'s lenient field-mapping:
+
+    * ``fingerprint`` and ``function_name`` are non-empty
+      strings.
+    * ``event_types`` is a tuple of strings (the dataclass field
+      declares ``tuple[str, ...]`` but the JSON round-trip path
+      doesn't enforce element types).
+    * ``module_filename`` is a *bare* filename — no path
+      separators (forward slash or backslash), no ``..``, no
+      ``.``, no NUL, non-empty. C2.a's loader resolves it as
+      ``bundle_dir / module_filename``; a name containing
+      ``"subdir/foo.py"`` would otherwise raise
+      ``FileNotFoundError`` at sync's write step instead of
+      surfacing as a structured ``manifest_row_unreadable``.
+    """
+    if not isinstance(manifest.fingerprint, str) or not manifest.fingerprint:
+        return (
+            f"manifest fingerprint must be a non-empty string; got "
+            f"{type(manifest.fingerprint).__name__}={manifest.fingerprint!r}"
+        )
+    # Manifest fingerprints become the rows' ``bundle_fingerprint``
+    # values and then directory names; same strict sha256-hex
+    # enforcement as ``_check_row_shape``. Catching here lets
+    # publish reject tampered manifests before they hit the table.
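+    # (``_FINGERPRINT_PATTERN`` is defined earlier in the module;
+    # judging from the error text below it is assumed to match the
+    # sha256-hex shape, e.g. ``re.compile(r"[0-9a-f]{64}")``.)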
+    if not _FINGERPRINT_PATTERN.fullmatch(manifest.fingerprint):
+        return (
+            f"manifest fingerprint must be 64 lowercase hex characters "
+            f"(sha256); got {manifest.fingerprint!r}"
+        )
+    if not isinstance(manifest.function_name, str) or not manifest.function_name:
+        return (
+            f"manifest function_name must be a non-empty string; got "
+            f"{type(manifest.function_name).__name__}={manifest.function_name!r}"
+        )
+    if not isinstance(manifest.event_types, tuple):
+        return (
+            f"manifest event_types must be a tuple; got "
+            f"{type(manifest.event_types).__name__}"
+        )
+    for index, et in enumerate(manifest.event_types):
+        if not isinstance(et, str):
+            return (
+                f"manifest event_types[{index}] must be a string; got "
+                f"{type(et).__name__}={et!r}"
+            )
+    if (
+        not isinstance(manifest.module_filename, str)
+        or not manifest.module_filename
+    ):
+        return (
+            f"manifest module_filename must be a non-empty string; got "
+            f"{type(manifest.module_filename).__name__}="
+            f"{manifest.module_filename!r}"
+        )
+    mf = manifest.module_filename
+    if "\x00" in mf:
+        return "manifest module_filename contains NUL"
+    if "/" in mf or "\\" in mf:
+        return (
+            f"manifest module_filename must be a bare filename "
+            f"(no path separators); got {mf!r}"
+        )
+    if mf in (".", ".."):
+        return f"manifest module_filename must not be {mf!r}"
+    return None
+
+
+def _check_row_shape(row: Any) -> Optional[str]:
+    """Return a problem string if *row* isn't a well-formed
+    :class:`BundleRow`, or ``None`` if it's clean.
+
+    The store's row constructor handles most shape coercion, but
+    the protocol allows arbitrary substitutes. Defensive
+    checks keep a malformed fake or a future schema-drift case
+    from poisoning sync.
+    """
+    if not isinstance(row, BundleRow):
+        return f"row is not a BundleRow; got {type(row).__name__}"
+    if not isinstance(row.bundle_fingerprint, str) or not row.bundle_fingerprint:
+        return "bundle_fingerprint must be a non-empty string"
+    # Fingerprints become directory names at sync
+    # (``dest_dir/<fingerprint>/`` + the staging-dir name). A
+    # tampered value like ``"../escape"`` would otherwise sync
+    # successfully and write OUTSIDE dest_dir; reject early.
+    if not _FINGERPRINT_PATTERN.fullmatch(row.bundle_fingerprint):
+        return (
+            f"bundle_fingerprint must be 64 lowercase hex characters "
+            f"(sha256); got {row.bundle_fingerprint!r}"
+        )
+    if not isinstance(row.bundle_path, str) or not row.bundle_path:
+        return "bundle_path must be a non-empty string"
+    if not isinstance(row.file_content, (bytes, bytearray)):
+        return (
+            f"file_content must be bytes; got "
+            f"{type(row.file_content).__name__}"
+        )
+    if not isinstance(row.event_types, tuple):
+        return (
+            f"event_types must be a tuple of strings; got "
+            f"{type(row.event_types).__name__}"
+        )
+    for index, et in enumerate(row.event_types):
+        if not isinstance(et, str):
+            return (
+                f"event_types[{index}] must be a string; got "
+                f"{type(et).__name__}"
+            )
+    return None
+
+
+def _now_iso_utc() -> str:
+    return (
+        datetime.datetime.now(datetime.timezone.utc)
+        .replace(microsecond=0)
+        .isoformat()
+        .replace("+00:00", "Z")
+    )
diff --git a/tests/test_extractor_compilation_bq_bundle_mirror.py b/tests/test_extractor_compilation_bq_bundle_mirror.py
new file mode 100644
index 0000000..a69fb86
--- /dev/null
+++ b/tests/test_extractor_compilation_bq_bundle_mirror.py
@@ -0,0 +1,1064 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the BigQuery bundle mirror (#75 PR C2.c.3).
+
+Coverage:
+
+* Round-trip — publish a local bundle, sync it back into a
+  fresh directory, verify ``load_bundle`` accepts the
+  reconstruction.
+* Fingerprint allowlist — publish and sync both honor it.
+* Path-safety — bundle_path values that traverse out of the
+  bundle dir, are absolute, or contain forbidden characters
+  are rejected at sync (never written to disk).
+* Missing manifest — sync rejects bundles whose rows lack
+  ``manifest.json``.
+* Malformed rows — wrong content type / wrong types on the
+  denormalized fields surface as ``malformed_row`` failures.
+* Idempotent republish — publishing the same bundle twice
+  ends up with one copy in the store, not two.
+
+Tests use an in-memory ``BundleStore`` substitute so the
+suite stays fast and deterministic. The live BQ path is
+covered separately by
+``test_extractor_compilation_bq_bundle_mirror_live.py``.
+"""
+
+from __future__ import annotations
+
+import json
+import pathlib
+import textwrap
+
+import pytest
+
+# ------------------------------------------------------------------ #
+# Hand-built bundle helpers (mirror tests don't depend on the        #
+# full compile pipeline; we just need a loader-acceptable bundle)    #
+# ------------------------------------------------------------------ #
+
+
+_VALID_FINGERPRINT_A = "a" * 64
+_VALID_FINGERPRINT_B = "b" * 64
+
+
+def _write_manifest(
+    bundle_dir: pathlib.Path,
+    *,
+    fingerprint: str,
+    event_types: tuple[str, ...] = ("bka_decision",),
+    module_filename: str = "extractor.py",
+    function_name: str = "extract_bka",
+) -> None:
+    bundle_dir.mkdir(parents=True, exist_ok=True)
+    manifest = {
+        "fingerprint": fingerprint,
+        "event_types": list(event_types),
+        "module_filename": module_filename,
+        "function_name": function_name,
+        "compiler_package_version": "0.0.0",
+        "template_version": "v0.1",
+        "transcript_builder_version": "tb-1",
+        "created_at": "2026-05-08T00:00:00Z",
+    }
+    (bundle_dir / "manifest.json").write_text(
+        json.dumps(manifest, sort_keys=True, indent=2), encoding="utf-8"
+    )
+
+
+_MINIMAL_VALID_SOURCE = textwrap.dedent(
+    """\
+    def extract_bka(event, spec):
+        return None
+    """
+)
+
+
+def _build_bundle(
+    parent: pathlib.Path,
+    *,
+    name: str,
+    fingerprint: str,
+    event_types: tuple[str, ...] = ("bka_decision",),
+) -> pathlib.Path:
+    bundle_dir = parent / name
+    _write_manifest(bundle_dir, fingerprint=fingerprint, event_types=event_types)
+    (bundle_dir / "extractor.py").write_text(
+        _MINIMAL_VALID_SOURCE, encoding="utf-8"
+    )
+    return bundle_dir
+
+
+# ------------------------------------------------------------------ #
+# In-memory store                                                    #
+# ------------------------------------------------------------------ #
+
+
+class _InMemoryStore:
+    """Minimal :class:`BundleStore` substitute. Upserts by
+    ``(bundle_fingerprint, bundle_path)`` so re-publishing the
+    same bundle replaces the prior rows."""
+
+    def __init__(self):
+        self._rows: dict[tuple[str, str], "BundleRow"] = {}
+
+    def fetch_rows(self, *, bundle_fingerprints=None):
+        if bundle_fingerprints is None:
+            return list(self._rows.values())
+        allow = set(bundle_fingerprints)
+        return [r for r in self._rows.values() if r.bundle_fingerprint in allow]
+
+    def publish_rows(self, rows):
+        count = 0
+        for r in rows:
+            self._rows[(r.bundle_fingerprint, r.bundle_path)] = r
+            count += 1
+        return count
+
+    # Test-only inspection helpers.
+    def all_rows(self):
+        return list(self._rows.values())
+
+    def force_insert(self, row):
+        """Bypass the upsert dedup. Used to construct deliberately
+        malformed table states (duplicate rows, malformed payloads)
+        that the BQ-side schema would normally forbid."""
+        # Use a sentinel composite key so the row is preserved
+        # alongside the well-keyed copy.
+        self._rows[
+            (
+                "__force__",
+                f"{row.bundle_fingerprint}::{row.bundle_path}::{len(self._rows)}",
+            )
+        ] = row
+
+
+# ------------------------------------------------------------------ #
+# Round-trip                                                         #
+# ------------------------------------------------------------------ #
+
+
+class TestRoundTrip:
+
+    def test_publish_then_sync_round_trips(self, tmp_path: pathlib.Path):
+        """Publish a hand-built bundle, sync it back into a fresh
+        directory, verify load_bundle accepts the reconstructed
+        bundle and reports the same fingerprint."""
+        from bigquery_agent_analytics.extractor_compilation import BundleRow
+        from bigquery_agent_analytics.extractor_compilation import load_bundle
+        from bigquery_agent_analytics.extractor_compilation import LoadedBundle
+        from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq
+        from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq
+
+        bundle_root = tmp_path / "bundles"
+        bundle_root.mkdir()
+        _build_bundle(bundle_root, name="bka", fingerprint=_VALID_FINGERPRINT_A)
+
+        store = _InMemoryStore()
+
+        publish = publish_bundles_to_bq(
+            bundle_root=bundle_root,
+            store=store,
+        )
+        assert publish.published_fingerprints == (_VALID_FINGERPRINT_A,)
+        assert publish.skipped_fingerprints == ()
+        assert publish.failures == ()
+        assert publish.rows_written == 2  # manifest + module
+
+        # The denormalized event_types column was populated from
+        # the manifest at publish time.
+        for row in store.all_rows():
+            assert isinstance(row, BundleRow)
+            assert row.event_types == ("bka_decision",)
+            assert row.module_filename == "extractor.py"
+            assert row.function_name == "extract_bka"
+
+        sync_dir = tmp_path / "synced"
+        sync = sync_bundles_from_bq(
+            store=store,
+            dest_dir=sync_dir,
+        )
+        assert sync.synced_fingerprints == (_VALID_FINGERPRINT_A,)
+        assert sync.skipped_fingerprints == ()
+        assert sync.failures == ()
+        assert sync.dest_dir == sync_dir
+
+        # The reconstructed bundle is loadable by C2.a's loader.
+        bundle_dir = sync_dir / _VALID_FINGERPRINT_A
+        outcome = load_bundle(bundle_dir, expected_fingerprint=_VALID_FINGERPRINT_A)
+        assert isinstance(outcome, LoadedBundle)
+        assert outcome.manifest.fingerprint == _VALID_FINGERPRINT_A
+        assert callable(outcome.extractor)
+
+    def test_round_trip_multiple_bundles(self, tmp_path: pathlib.Path):
+        """Two bundles with distinct fingerprints publish + sync
+        independently; sync writes each into its own
+        ``<fingerprint>`` subdir."""
+        from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq
+        from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq
+
+        bundle_root = tmp_path / "bundles"
+        bundle_root.mkdir()
+        _build_bundle(bundle_root, name="a", fingerprint=_VALID_FINGERPRINT_A)
+        _build_bundle(
+            bundle_root,
+            name="b",
+            fingerprint=_VALID_FINGERPRINT_B,
+            event_types=("other_event",),
+        )
+
+        store = _InMemoryStore()
+        publish = publish_bundles_to_bq(
+            bundle_root=bundle_root,
+            store=store,
+        )
+        assert set(publish.published_fingerprints) == {
+            _VALID_FINGERPRINT_A,
+            _VALID_FINGERPRINT_B,
+        }
+        assert publish.rows_written == 4
+
+        sync_dir = tmp_path / "synced"
+        sync = sync_bundles_from_bq(store=store, dest_dir=sync_dir)
+        assert set(sync.synced_fingerprints) == {
+            _VALID_FINGERPRINT_A,
+            _VALID_FINGERPRINT_B,
+        }
+        assert (sync_dir / _VALID_FINGERPRINT_A / "manifest.json").exists()
+        assert (sync_dir / _VALID_FINGERPRINT_B / "manifest.json").exists()
+
+
+# ------------------------------------------------------------------ #
+# Allowlist                                                          #
+# ------------------------------------------------------------------ #
+
+
+class TestAllowlist:
+
+    def test_publish_allowlist_skips_unlisted(self, tmp_path: pathlib.Path):
+        from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq
+
+        bundle_root = tmp_path / "bundles"
+        bundle_root.mkdir()
+        _build_bundle(bundle_root, name="a", fingerprint=_VALID_FINGERPRINT_A)
+        _build_bundle(
+            bundle_root,
+            name="b",
+            fingerprint=_VALID_FINGERPRINT_B,
+            event_types=("other_event",),
+        )
+
+        store = _InMemoryStore()
+        publish = publish_bundles_to_bq(
+            bundle_root=bundle_root,
+            store=store,
+            bundle_fingerprint_allowlist=[_VALID_FINGERPRINT_A],
+        )
+        assert publish.published_fingerprints == (_VALID_FINGERPRINT_A,)
+        assert publish.skipped_fingerprints == (_VALID_FINGERPRINT_B,)
+        # Only fp A's two rows were written; fp B was skipped.
+        assert publish.rows_written == 2
+        fps_in_store = {r.bundle_fingerprint for r in store.all_rows()}
+        assert fps_in_store == {_VALID_FINGERPRINT_A}
+
+    def test_sync_allowlist_skips_unlisted(self, tmp_path: pathlib.Path):
+        from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq
+        from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq
+
+        bundle_root = tmp_path / "bundles"
+        bundle_root.mkdir()
+        _build_bundle(bundle_root, name="a", fingerprint=_VALID_FINGERPRINT_A)
+        _build_bundle(
+            bundle_root,
+            name="b",
+            fingerprint=_VALID_FINGERPRINT_B,
+            event_types=("other_event",),
+        )
+
+        store = _InMemoryStore()
+        publish_bundles_to_bq(bundle_root=bundle_root, store=store)
+
+        sync = sync_bundles_from_bq(
+            store=store,
+            dest_dir=tmp_path / "synced",
+            bundle_fingerprint_allowlist=[_VALID_FINGERPRINT_A],
+        )
+        assert sync.synced_fingerprints == (_VALID_FINGERPRINT_A,)
+        assert sync.skipped_fingerprints == ()
+        assert sync.failures == ()
+        # B's directory was never created.
+ assert not (tmp_path / "synced" / _VALID_FINGERPRINT_B).exists() + + def test_sync_allowlist_missing_fingerprint_recorded(self, tmp_path): + """Allowlist names a fingerprint that has no rows in the + table. Surfaces as a ``fingerprint_not_in_table`` + failure rather than silently succeeding with zero + synced bundles.""" + from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq + + store = _InMemoryStore() + sync = sync_bundles_from_bq( + store=store, + dest_dir=tmp_path / "synced", + bundle_fingerprint_allowlist=[_VALID_FINGERPRINT_A], + ) + assert sync.synced_fingerprints == () + assert len(sync.failures) == 1 + assert sync.failures[0].code == "fingerprint_not_in_table" + assert sync.failures[0].bundle_fingerprint == _VALID_FINGERPRINT_A + + +# ------------------------------------------------------------------ # +# Path safety # +# ------------------------------------------------------------------ # + + +class TestPathSafety: + + def _build_store_with_path( + self, *, fingerprint: str, bad_path: str + ) -> "_InMemoryStore": + """Helper: store has a valid manifest row plus a row whose + bundle_path is malicious. Module file uses the bad path as + its declared filename so we can isolate sync's + path-validation behavior.""" + from bigquery_agent_analytics.extractor_compilation import BundleRow + + store = _InMemoryStore() + manifest_payload = json.dumps( + { + "fingerprint": fingerprint, + "event_types": ["bka_decision"], + # module_filename intentionally normal so the + # path check focuses on the actual row's + # bundle_path. + "module_filename": "extractor.py", + "function_name": "extract_bka", + "compiler_package_version": "0.0.0", + "template_version": "v0.1", + "transcript_builder_version": "tb-1", + "created_at": "2026-05-08T00:00:00Z", + }, + sort_keys=True, + indent=2, + ).encode("utf-8") + store.publish_rows( + [ + BundleRow( + bundle_fingerprint=fingerprint, + bundle_path="manifest.json", + file_content=manifest_payload, + event_types=("bka_decision",), + module_filename="extractor.py", + function_name="extract_bka", + published_at="2026-05-08T00:00:00Z", + ), + BundleRow( + bundle_fingerprint=fingerprint, + bundle_path=bad_path, + file_content=b"def extract_bka(event, spec): return None\n", + event_types=("bka_decision",), + module_filename="extractor.py", + function_name="extract_bka", + published_at="2026-05-08T00:00:00Z", + ), + ] + ) + return store + + def test_sync_rejects_traversal_path(self, tmp_path: pathlib.Path): + """``../`` in bundle_path is rejected before any file is + written. The sync must NOT create + ``dest_dir/../something``.""" + from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq + + store = self._build_store_with_path( + fingerprint=_VALID_FINGERPRINT_A, bad_path="../escape.py" + ) + sync_dir = tmp_path / "synced" + sync = sync_bundles_from_bq(store=store, dest_dir=sync_dir) + + assert sync.synced_fingerprints == () + assert sync.skipped_fingerprints == (_VALID_FINGERPRINT_A,) + codes = {f.code for f in sync.failures} + assert "invalid_bundle_path" in codes + # No file was written outside the dest_dir. 
+ assert not (sync_dir.parent / "escape.py").exists() + + def test_sync_rejects_absolute_path(self, tmp_path: pathlib.Path): + from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq + + store = self._build_store_with_path( + fingerprint=_VALID_FINGERPRINT_A, bad_path="/etc/passwd" + ) + sync = sync_bundles_from_bq(store=store, dest_dir=tmp_path / "synced") + + assert sync.synced_fingerprints == () + codes = {f.code for f in sync.failures} + assert "invalid_bundle_path" in codes + + def test_sync_rejects_backslash_path(self, tmp_path: pathlib.Path): + """Windows-style backslashes can hide traversal past a + POSIX-only check. Rejected explicitly.""" + from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq + + store = self._build_store_with_path( + fingerprint=_VALID_FINGERPRINT_A, + bad_path="..\\windows-style-escape.py", + ) + sync = sync_bundles_from_bq(store=store, dest_dir=tmp_path / "synced") + + codes = {f.code for f in sync.failures} + assert "invalid_bundle_path" in codes + + +# ------------------------------------------------------------------ # +# Missing / malformed rows # +# ------------------------------------------------------------------ # + + +class TestMissingAndMalformedRows: + + def test_sync_rejects_when_manifest_row_missing(self, tmp_path: pathlib.Path): + """Bundle has a module row but no manifest row → fail + closed with ``manifest_row_missing`` and skip the + fingerprint.""" + from bigquery_agent_analytics.extractor_compilation import BundleRow + from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq + + store = _InMemoryStore() + store.publish_rows( + [ + BundleRow( + bundle_fingerprint=_VALID_FINGERPRINT_A, + bundle_path="extractor.py", + file_content=b"def extract_bka(event, spec): return None\n", + event_types=("bka_decision",), + module_filename="extractor.py", + function_name="extract_bka", + published_at="2026-05-08T00:00:00Z", + ) + ] + ) + + sync = sync_bundles_from_bq(store=store, dest_dir=tmp_path / "synced") + assert sync.synced_fingerprints == () + assert sync.skipped_fingerprints == (_VALID_FINGERPRINT_A,) + assert len(sync.failures) == 1 + assert sync.failures[0].code == "manifest_row_missing" + + def test_sync_rejects_malformed_manifest_row(self, tmp_path: pathlib.Path): + """Manifest row exists but isn't valid JSON → fail closed + with ``manifest_row_unreadable``.""" + from bigquery_agent_analytics.extractor_compilation import BundleRow + from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq + + store = _InMemoryStore() + store.publish_rows( + [ + BundleRow( + bundle_fingerprint=_VALID_FINGERPRINT_A, + bundle_path="manifest.json", + file_content=b"this is not json", + event_types=("bka_decision",), + module_filename="extractor.py", + function_name="extract_bka", + published_at="2026-05-08T00:00:00Z", + ) + ] + ) + + sync = sync_bundles_from_bq(store=store, dest_dir=tmp_path / "synced") + assert sync.synced_fingerprints == () + codes = {f.code for f in sync.failures} + assert "manifest_row_unreadable" in codes + + def test_sync_rejects_unexpected_file_in_bundle(self, tmp_path: pathlib.Path): + """Bundle has manifest + module + an extra file. 
+        are exactly two files; the extra one is rejected via
+        ``unexpected_file``."""
+        from bigquery_agent_analytics.extractor_compilation import BundleRow
+        from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq
+
+        store = _InMemoryStore()
+        manifest_payload = json.dumps(
+            {
+                "fingerprint": _VALID_FINGERPRINT_A,
+                "event_types": ["bka_decision"],
+                "module_filename": "extractor.py",
+                "function_name": "extract_bka",
+                "compiler_package_version": "0.0.0",
+                "template_version": "v0.1",
+                "transcript_builder_version": "tb-1",
+                "created_at": "2026-05-08T00:00:00Z",
+            },
+            sort_keys=True,
+            indent=2,
+        ).encode("utf-8")
+        store.publish_rows(
+            [
+                BundleRow(
+                    bundle_fingerprint=_VALID_FINGERPRINT_A,
+                    bundle_path="manifest.json",
+                    file_content=manifest_payload,
+                    event_types=("bka_decision",),
+                    module_filename="extractor.py",
+                    function_name="extract_bka",
+                    published_at="2026-05-08T00:00:00Z",
+                ),
+                BundleRow(
+                    bundle_fingerprint=_VALID_FINGERPRINT_A,
+                    bundle_path="extractor.py",
+                    file_content=_MINIMAL_VALID_SOURCE.encode("utf-8"),
+                    event_types=("bka_decision",),
+                    module_filename="extractor.py",
+                    function_name="extract_bka",
+                    published_at="2026-05-08T00:00:00Z",
+                ),
+                BundleRow(
+                    bundle_fingerprint=_VALID_FINGERPRINT_A,
+                    bundle_path="README.md",
+                    file_content=b"# extra file the mirror should refuse",
+                    event_types=("bka_decision",),
+                    module_filename="extractor.py",
+                    function_name="extract_bka",
+                    published_at="2026-05-08T00:00:00Z",
+                ),
+            ]
+        )
+
+        sync = sync_bundles_from_bq(store=store, dest_dir=tmp_path / "synced")
+        assert sync.synced_fingerprints == ()
+        codes = {f.code for f in sync.failures}
+        assert "unexpected_file" in codes
+        # The bundle dir for that fingerprint must not exist —
+        # sync rejected the fingerprint before writing.
+        assert not (tmp_path / "synced" / _VALID_FINGERPRINT_A).exists()
+
+    def test_sync_rejects_malformed_row_type(self, tmp_path: pathlib.Path):
+        """A row whose ``file_content`` isn't bytes / ``event_types``
+        isn't a tuple of strings fails the shape check."""
+        from bigquery_agent_analytics.extractor_compilation import BundleRow
+        from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq
+
+        # BundleRow is frozen but we can construct one with a
+        # malformed field; the shape check at sync time is what
+        # catches it.
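+        # (Frozen dataclasses freeze assignment, not construction:
+        # field values are never type-checked at ``__init__`` time,
+        # so a ``str`` slips into a ``bytes`` field unnoticed.)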
+        bad_row = BundleRow(
+            bundle_fingerprint=_VALID_FINGERPRINT_A,
+            bundle_path="manifest.json",
+            # str instead of bytes
+            file_content="not bytes",  # type: ignore[arg-type]
+            event_types=("bka_decision",),
+            module_filename="extractor.py",
+            function_name="extract_bka",
+            published_at="2026-05-08T00:00:00Z",
+        )
+        store = _InMemoryStore()
+        store.publish_rows([bad_row])
+
+        sync = sync_bundles_from_bq(store=store, dest_dir=tmp_path / "synced")
+        assert sync.synced_fingerprints == ()
+        codes = {f.code for f in sync.failures}
+        assert "malformed_row" in codes
+
+    def test_sync_rejects_duplicate_rows(self, tmp_path: pathlib.Path):
+        """The BQ table has no unique constraint; if two rows
+        share the same ``(bundle_fingerprint, bundle_path)``,
+        sync rejects rather than picking one."""
+        from bigquery_agent_analytics.extractor_compilation import BundleRow
+        from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq
+
+        manifest_payload = json.dumps(
+            {
+                "fingerprint": _VALID_FINGERPRINT_A,
+                "event_types": ["bka_decision"],
+                "module_filename": "extractor.py",
+                "function_name": "extract_bka",
+                "compiler_package_version": "0.0.0",
+                "template_version": "v0.1",
+                "transcript_builder_version": "tb-1",
+                "created_at": "2026-05-08T00:00:00Z",
+            },
+            sort_keys=True,
+            indent=2,
+        ).encode("utf-8")
+
+        base_row = BundleRow(
+            bundle_fingerprint=_VALID_FINGERPRINT_A,
+            bundle_path="manifest.json",
+            file_content=manifest_payload,
+            event_types=("bka_decision",),
+            module_filename="extractor.py",
+            function_name="extract_bka",
+            published_at="2026-05-08T00:00:00Z",
+        )
+
+        store = _InMemoryStore()
+        store.publish_rows([base_row])
+        # Force a duplicate row past the upsert dedup so we can
+        # exercise sync's defense.
+        store.force_insert(base_row)
+
+        sync = sync_bundles_from_bq(store=store, dest_dir=tmp_path / "synced")
+        codes = {f.code for f in sync.failures}
+        assert "duplicate_row" in codes
+        assert sync.synced_fingerprints == ()
+
+
+# ------------------------------------------------------------------ #
+# Idempotent republish                                               #
+# ------------------------------------------------------------------ #
+
+
+class TestIdempotentRepublish:
+
+    def test_republishing_same_bundle_does_not_accumulate_rows(
+        self, tmp_path: pathlib.Path
+    ):
+        """``publish_rows`` is upsert-by-``(fingerprint, path)``.
+        Two consecutive publishes of the same bundle produce the
+        same final row set, not double rows."""
+        from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq
+
+        bundle_root = tmp_path / "bundles"
+        bundle_root.mkdir()
+        _build_bundle(bundle_root, name="bka", fingerprint=_VALID_FINGERPRINT_A)
+
+        store = _InMemoryStore()
+        publish_bundles_to_bq(bundle_root=bundle_root, store=store)
+        rows_after_first = len(store.all_rows())
+        publish_bundles_to_bq(bundle_root=bundle_root, store=store)
+        rows_after_second = len(store.all_rows())
+
+        assert rows_after_first == 2
+        assert rows_after_second == 2
+
+
+# ------------------------------------------------------------------ #
+# Publish-side failure surfaces                                      #
+# ------------------------------------------------------------------ #
+
+
+class TestPublishFailures:
+
+    def test_publish_skips_subdir_without_manifest(self, tmp_path: pathlib.Path):
+        """A subdir without ``manifest.json`` lands in
+        ``failures`` rather than crashing publish."""
+        from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq
+
+        bundle_root = tmp_path / "bundles"
+        bundle_root.mkdir()
+        (bundle_root / "not-a-bundle").mkdir()
+
+        store = _InMemoryStore()
+        publish = publish_bundles_to_bq(bundle_root=bundle_root, store=store)
+        codes = {f.code for f in publish.failures}
+        assert "manifest_missing" in codes
+        assert publish.published_fingerprints == ()
+        assert publish.rows_written == 0
+
+    def test_publish_skips_bundle_that_would_not_load(
+        self, tmp_path: pathlib.Path
+    ):
+        """A bundle whose source doesn't define the manifest's
+        ``function_name`` fails pre-publish via ``load_bundle``.
+        Surfaces as ``bundle_load_failed``; nothing is published."""
+        from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq
+
+        bundle_root = tmp_path / "bundles"
+        bundle_root.mkdir()
+        bundle_dir = bundle_root / "broken"
+        _write_manifest(bundle_dir, fingerprint=_VALID_FINGERPRINT_A)
+        (bundle_dir / "extractor.py").write_text(
+            # No function named ``extract_bka``.
+            "def something_else(event, spec): return None\n",
+            encoding="utf-8",
+        )
+
+        store = _InMemoryStore()
+        publish = publish_bundles_to_bq(bundle_root=bundle_root, store=store)
+        codes = {f.code for f in publish.failures}
+        assert "bundle_load_failed" in codes
+        assert publish.published_fingerprints == ()
+        assert publish.rows_written == 0
+        assert store.all_rows() == []
+
+    def test_publish_handles_missing_bundle_root(self, tmp_path: pathlib.Path):
+        """A non-existent ``bundle_root`` yields a single
+        ``bundle_root_missing`` failure rather than an
+        exception."""
+        from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq
+
+        publish = publish_bundles_to_bq(
+            bundle_root=tmp_path / "does-not-exist",
+            store=_InMemoryStore(),
+        )
+        assert publish.rows_written == 0
+        codes = {f.code for f in publish.failures}
+        assert "bundle_root_missing" in codes
+
+    def test_publish_rejects_duplicate_fingerprints_across_subdirs(
+        self, tmp_path: pathlib.Path
+    ):
+        """Two subdirs of ``bundle_root`` declaring the SAME
+        manifest fingerprint must NOT both publish — the mirror is
+        keyed on ``(fingerprint, bundle_path)`` and both would
+        INSERT logical duplicates that sync later rejects.
+        The fix: fail-closed at publish; neither subdir's rows are
+        written."""
+        from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq
+
+        bundle_root = tmp_path / "bundles"
+        bundle_root.mkdir()
+        _build_bundle(bundle_root, name="copy-a", fingerprint=_VALID_FINGERPRINT_A)
+        _build_bundle(bundle_root, name="copy-b", fingerprint=_VALID_FINGERPRINT_A)
+
+        store = _InMemoryStore()
+        publish = publish_bundles_to_bq(bundle_root=bundle_root, store=store)
+
+        assert publish.published_fingerprints == ()
+        assert publish.rows_written == 0
+        codes = {f.code for f in publish.failures}
+        assert "duplicate_fingerprint" in codes
+        # The detail names both participating subdirs so an
+        # operator can find them.
+        detail = next(
+            f.detail for f in publish.failures if f.code == "duplicate_fingerprint"
+        )
+        assert "copy-a" in detail and "copy-b" in detail
+        # No rows leaked into the store.
+        assert store.all_rows() == []
+
+
+# ------------------------------------------------------------------ #
+# Round-2 reviewer findings                                          #
+# ------------------------------------------------------------------ #
+
+
+class TestRoundTwoFindings:
+    """Reproducers + locks for the four PR #148 reviewer findings.
+
+    The first three are functional bugs (sync would raise on a
+    malformed manifest, sync would destroy good local state on a
+    bad mirror row, publish would silently emit logical
+    duplicates). The fourth is a defensive raise on the store.
+    """
+
+    def _good_bundle(
+        self,
+        tmp_path: pathlib.Path,
+        *,
+        fingerprint: str = _VALID_FINGERPRINT_A,
+    ) -> pathlib.Path:
+        bundle_root = tmp_path / "bundles"
+        bundle_root.mkdir(exist_ok=True)
+        _build_bundle(bundle_root, name="bka", fingerprint=fingerprint)
+        return bundle_root
+
+    def test_sync_rejects_manifest_with_path_separator_in_module_filename(
+        self, tmp_path: pathlib.Path
+    ):
+        """Manifest row whose ``module_filename`` contains ``/``
+        used to slip past ``_validate_bundle_path`` and trigger
+        ``FileNotFoundError`` at the write step (the parent dir
+        doesn't exist). The shape check now catches it at
+        ``manifest_row_unreadable`` with a clear message."""
+        from bigquery_agent_analytics.extractor_compilation import BundleRow
+        from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq
+
+        # bundle_path equals the malformed module_filename so the
+        # path-traversal check (which doesn't reject simple
+        # subdirs) would otherwise let this through.
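+        # (``_validate_bundle_path`` permits plain subdir paths;
+        # only the manifest shape check insists on a bare
+        # filename.)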
+        manifest_payload = json.dumps(
+            {
+                "fingerprint": _VALID_FINGERPRINT_A,
+                "event_types": ["bka_decision"],
+                "module_filename": "subdir/extractor.py",
+                "function_name": "extract_bka",
+                "compiler_package_version": "0.0.0",
+                "template_version": "v0.1",
+                "transcript_builder_version": "tb-1",
+                "created_at": "2026-05-08T00:00:00Z",
+            },
+            sort_keys=True,
+            indent=2,
+        ).encode("utf-8")
+        store = _InMemoryStore()
+        store.publish_rows(
+            [
+                BundleRow(
+                    bundle_fingerprint=_VALID_FINGERPRINT_A,
+                    bundle_path="manifest.json",
+                    file_content=manifest_payload,
+                    event_types=("bka_decision",),
+                    module_filename="subdir/extractor.py",
+                    function_name="extract_bka",
+                    published_at="2026-05-08T00:00:00Z",
+                ),
+                BundleRow(
+                    bundle_fingerprint=_VALID_FINGERPRINT_A,
+                    bundle_path="subdir/extractor.py",
+                    file_content=_MINIMAL_VALID_SOURCE.encode("utf-8"),
+                    event_types=("bka_decision",),
+                    module_filename="subdir/extractor.py",
+                    function_name="extract_bka",
+                    published_at="2026-05-08T00:00:00Z",
+                ),
+            ]
+        )
+
+        sync = sync_bundles_from_bq(store=store, dest_dir=tmp_path / "synced")
+        assert sync.synced_fingerprints == ()
+        codes = {f.code for f in sync.failures}
+        assert "manifest_row_unreadable" in codes
+        # The bundle dir was never written — staging stays
+        # internal to sync.
+        assert not (tmp_path / "synced" / _VALID_FINGERPRINT_A).exists()
+
+    def test_sync_failure_preserves_existing_good_bundle(
+        self, tmp_path: pathlib.Path
+    ):
+        """If a previously-good bundle exists at
+        ``dest_dir/<fingerprint>/`` and a later sync brings back
+        corrupt rows for the same fingerprint, the existing good
+        bundle must NOT be destroyed. The staging-then-validate
+        flow writes to a side directory; the target is replaced
+        only after ``load_bundle`` succeeds on the staged copy."""
+        from bigquery_agent_analytics.extractor_compilation import BundleRow
+        from bigquery_agent_analytics.extractor_compilation import load_bundle
+        from bigquery_agent_analytics.extractor_compilation import LoadedBundle
+        from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq
+        from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq
+
+        # Step 1: publish a good bundle, sync it locally so we
+        # have an established good local state.
+        bundle_root = self._good_bundle(tmp_path)
+        store = _InMemoryStore()
+        publish_bundles_to_bq(bundle_root=bundle_root, store=store)
+        sync_dir = tmp_path / "synced"
+        initial_sync = sync_bundles_from_bq(store=store, dest_dir=sync_dir)
+        assert initial_sync.synced_fingerprints == (_VALID_FINGERPRINT_A,)
+
+        good_bundle = sync_dir / _VALID_FINGERPRINT_A
+        good_module_bytes = (good_bundle / "extractor.py").read_bytes()
+
+        # Step 2: corrupt the store — replace the module row with
+        # garbage source the loader will reject. Same fingerprint
+        # in the manifest row, so the corrupt re-sync targets the
+        # exact local directory we want to protect.
+        corrupt_manifest = (good_bundle / "manifest.json").read_bytes()
+        store_with_corruption = _InMemoryStore()
+        store_with_corruption.publish_rows(
+            [
+                BundleRow(
+                    bundle_fingerprint=_VALID_FINGERPRINT_A,
+                    bundle_path="manifest.json",
+                    file_content=corrupt_manifest,
+                    event_types=("bka_decision",),
+                    module_filename="extractor.py",
+                    function_name="extract_bka",
+                    published_at="2026-05-08T00:00:00Z",
+                ),
+                BundleRow(
+                    bundle_fingerprint=_VALID_FINGERPRINT_A,
+                    bundle_path="extractor.py",
+                    # No function named extract_bka → load_bundle
+                    # rejects with function_not_found.
+ file_content=b"def something_else(event, spec): return None\n", + event_types=("bka_decision",), + module_filename="extractor.py", + function_name="extract_bka", + published_at="2026-05-08T00:00:00Z", + ), + ] + ) + + # Step 3: re-sync from the corrupt store. The bundle dir + # exists; the staged reconstruction must fail load_bundle; + # the existing good bundle must NOT be destroyed. + second_sync = sync_bundles_from_bq( + store=store_with_corruption, dest_dir=sync_dir + ) + assert second_sync.synced_fingerprints == () + codes = {f.code for f in second_sync.failures} + assert "bundle_load_failed" in codes + + # Local good bundle is intact. + assert good_bundle.exists() + assert (good_bundle / "extractor.py").read_bytes() == good_module_bytes + outcome = load_bundle( + good_bundle, expected_fingerprint=_VALID_FINGERPRINT_A + ) + assert isinstance(outcome, LoadedBundle) + + # Staging directories were cleaned up — no orphaned + # ".staging-*" dirs left behind. + leftover_staging = [ + p for p in sync_dir.iterdir() if p.name.startswith(".staging-") + ] + assert leftover_staging == [] + + def test_publish_rows_rejects_duplicate_input_pairs(self): + """``BigQueryBundleStore.publish_rows`` raises + ``ValueError`` on duplicate ``(fingerprint, path)`` input + pairs — defense in depth on top of the publisher-side + ``duplicate_fingerprint`` guard.""" + from bigquery_agent_analytics.extractor_compilation import BigQueryBundleStore + from bigquery_agent_analytics.extractor_compilation import BundleRow + + row = BundleRow( + bundle_fingerprint=_VALID_FINGERPRINT_A, + bundle_path="manifest.json", + file_content=b"{}", + event_types=("bka_decision",), + module_filename="extractor.py", + function_name="extract_bka", + published_at="2026-05-08T00:00:00Z", + ) + + class _FakeBQClient: + + def query(self, *args, **kwargs): + raise AssertionError("DELETE must not run on duplicate input") + + def insert_rows_json(self, *args, **kwargs): + raise AssertionError("INSERT must not run on duplicate input") + + store = BigQueryBundleStore(bq_client=_FakeBQClient(), table_id="p.d.t") + with pytest.raises(ValueError, match=r"duplicate.*manifest\.json"): + store.publish_rows([row, row]) + + def test_sync_rejects_tampered_bundle_fingerprint_path_escape( + self, tmp_path: pathlib.Path + ): + """A row whose ``bundle_fingerprint="../escape"`` used to + sync successfully and write OUTSIDE ``dest_dir`` because + the fingerprint flows straight into ``dest_dir / + fingerprint``. The shape check now requires strict 64-char + lowercase hex (sha256) and rejects anything else as + ``malformed_row`` BEFORE any path is computed.""" + from bigquery_agent_analytics.extractor_compilation import BundleRow + from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq + + tampered_row = BundleRow( + bundle_fingerprint="../escape", + bundle_path="manifest.json", + file_content=b"{}", + event_types=("bka_decision",), + module_filename="extractor.py", + function_name="extract_bka", + published_at="2026-05-08T00:00:00Z", + ) + store = _InMemoryStore() + store.publish_rows([tampered_row]) + + sync_dir = tmp_path / "synced" + sync = sync_bundles_from_bq(store=store, dest_dir=sync_dir) + + assert sync.synced_fingerprints == () + codes = {f.code for f in sync.failures} + assert "malformed_row" in codes + # Crucially: nothing was written outside dest_dir. + assert not (sync_dir.parent / "escape").exists() + # And no ``../escape`` subdir of dest_dir either. 
+ assert not any(p.name == "escape" for p in sync_dir.parent.iterdir()) + + def test_publish_rejects_tampered_manifest_fingerprint( + self, tmp_path: pathlib.Path + ): + """Defense in depth: the publish-side shape check now + catches a manifest whose ``fingerprint`` isn't a clean + sha256 hex string, so a tampered local manifest can never + introduce a path-escape value into the table.""" + from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq + + bundle_root = tmp_path / "bundles" + bundle_root.mkdir() + _write_manifest( + bundle_root / "tampered", + fingerprint="../escape", + ) + (bundle_root / "tampered" / "extractor.py").write_text( + _MINIMAL_VALID_SOURCE, encoding="utf-8" + ) + + store = _InMemoryStore() + publish = publish_bundles_to_bq(bundle_root=bundle_root, store=store) + + assert publish.published_fingerprints == () + assert publish.rows_written == 0 + codes = {f.code for f in publish.failures} + assert "manifest_unreadable" in codes + # Detail names the offending fingerprint shape so an + # operator can spot the tampered file. + detail = next( + f.detail for f in publish.failures if f.code == "manifest_unreadable" + ) + assert "fingerprint" in detail + assert store.all_rows() == [] + + def test_bigquery_store_rejects_malformed_table_id(self): + """``BigQueryBundleStore.__init__`` validates ``table_id`` + at construction so a malformed value (backtick, semicolon, + wrong dot count, injection attempt) raises ``ValueError`` + before any SQL interpolation. The constructor is the only + place where ``table_id`` enters a backticked SQL string; + catching here prevents downstream injection.""" + from bigquery_agent_analytics.extractor_compilation import BigQueryBundleStore + + class _FakeBQClient: + pass + + fake = _FakeBQClient() + + # Wrong dot count. + with pytest.raises(ValueError, match=r"not a well-formed"): + BigQueryBundleStore(bq_client=fake, table_id="onlyone") + with pytest.raises(ValueError, match=r"not a well-formed"): + BigQueryBundleStore(bq_client=fake, table_id="two.parts") + with pytest.raises(ValueError, match=r"not a well-formed"): + BigQueryBundleStore(bq_client=fake, table_id="four.parts.here.tbl") + + # Backtick — would break out of the quoted identifier. + with pytest.raises(ValueError, match=r"not a well-formed"): + BigQueryBundleStore( + bq_client=fake, table_id="proj.ds.tbl`; DROP TABLE x; --" + ) + + # Semicolon / SQL injection markers. + with pytest.raises(ValueError, match=r"not a well-formed"): + BigQueryBundleStore(bq_client=fake, table_id="proj.ds.tbl;DROP") + with pytest.raises(ValueError, match=r"not a well-formed"): + BigQueryBundleStore(bq_client=fake, table_id="proj.ds.tbl --comment") + + # Whitespace. + with pytest.raises(ValueError, match=r"not a well-formed"): + BigQueryBundleStore(bq_client=fake, table_id="proj. ds.tbl") + with pytest.raises(ValueError, match=r"not a well-formed"): + BigQueryBundleStore(bq_client=fake, table_id="proj.ds.tbl\n") + + # Empty segment. + with pytest.raises(ValueError, match=r"not a well-formed"): + BigQueryBundleStore(bq_client=fake, table_id="proj..tbl") + + # Non-string. + with pytest.raises(ValueError, match=r"must be a string"): + BigQueryBundleStore(bq_client=fake, table_id=None) # type: ignore[arg-type] + + # Valid forms — should NOT raise. 
+        BigQueryBundleStore(
+            bq_client=fake, table_id="my-project.my_dataset.compiled_bundles"
+        )
+        BigQueryBundleStore(bq_client=fake, table_id="p.d.t")
diff --git a/tests/test_extractor_compilation_bq_bundle_mirror_live.py b/tests/test_extractor_compilation_bq_bundle_mirror_live.py
new file mode 100644
index 0000000..69cb0ee
--- /dev/null
+++ b/tests/test_extractor_compilation_bq_bundle_mirror_live.py
@@ -0,0 +1,142 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Live BigQuery round-trip test for the bundle mirror
+(#75 PR C2.c.3).
+
+Skipped by default. To run, set:
+
+    BQAA_RUN_LIVE_TESTS=1
+    BQAA_RUN_LIVE_BQ_MIRROR_TESTS=1
+    PROJECT_ID=...
+    DATASET_ID=...
+    BQAA_BQ_LOCATION=US   # optional, defaults to US
+
+The test creates a temporary table in ``DATASET_ID``, runs
+``publish_bundles_to_bq`` + ``sync_bundles_from_bq`` against
+it, asserts the reconstructed bundle is loader-acceptable,
+then drops the table. No bundle content leaves the local
+machine outside of the BQ table the test owns.
+
+CI does NOT run this — the BQ dependency is intentionally
+opt-in.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import pathlib
+import textwrap
+import uuid
+
+import pytest
+
+_LIVE = (
+    os.environ.get("BQAA_RUN_LIVE_TESTS") == "1"
+    and os.environ.get("BQAA_RUN_LIVE_BQ_MIRROR_TESTS") == "1"
+)
+
+pytestmark = pytest.mark.skipif(
+    not _LIVE,
+    reason=(
+        "Live BQ mirror tests skipped. Set BQAA_RUN_LIVE_TESTS=1 plus "
+        "BQAA_RUN_LIVE_BQ_MIRROR_TESTS=1 plus PROJECT_ID + DATASET_ID "
+        "to opt in. Default CI does NOT run this — the BigQuery "
+        "dependency is intentionally opt-in."
+    ),
+)
+
+
+_VALID_FINGERPRINT = "c" * 64
+
+_MINIMAL_VALID_SOURCE = textwrap.dedent(
+    """\
+    def extract_bka(event, spec):
+        return None
+    """
+)
+
+
+def _write_bundle(bundle_dir: pathlib.Path) -> None:
+    bundle_dir.mkdir(parents=True, exist_ok=True)
+    manifest = {
+        "fingerprint": _VALID_FINGERPRINT,
+        "event_types": ["bka_decision"],
+        "module_filename": "extractor.py",
+        "function_name": "extract_bka",
+        "compiler_package_version": "0.0.0",
+        "template_version": "v0.1",
+        "transcript_builder_version": "tb-1",
+        "created_at": "2026-05-12T00:00:00Z",
+    }
+    (bundle_dir / "manifest.json").write_text(
+        json.dumps(manifest, sort_keys=True, indent=2), encoding="utf-8"
+    )
+    (bundle_dir / "extractor.py").write_text(
+        _MINIMAL_VALID_SOURCE, encoding="utf-8"
+    )
+
+
+def test_live_bigquery_round_trip(tmp_path: pathlib.Path):
+    """Publish + sync against a real BigQuery table.
+
+    Assertions are contract-level: round-trip succeeds with no
+    failures, the reconstructed bundle is loader-acceptable. The
+    test doesn't pin BQ-specific behavior (job IDs, latency) —
+    those are BigQuery's concern.
+ """ + from google.cloud import bigquery + + from bigquery_agent_analytics.extractor_compilation import BigQueryBundleStore + from bigquery_agent_analytics.extractor_compilation import load_bundle + from bigquery_agent_analytics.extractor_compilation import LoadedBundle + from bigquery_agent_analytics.extractor_compilation import publish_bundles_to_bq + from bigquery_agent_analytics.extractor_compilation import sync_bundles_from_bq + + project_id = os.environ["PROJECT_ID"] + dataset_id = os.environ["DATASET_ID"] + location = os.environ.get("BQAA_BQ_LOCATION", "US") + + table_name = f"bqaa_mirror_test_{uuid.uuid4().hex[:12]}" + table_id = f"{project_id}.{dataset_id}.{table_name}" + client = bigquery.Client(project=project_id, location=location) + + store = BigQueryBundleStore(bq_client=client, table_id=table_id) + store.ensure_table() + try: + bundle_root = tmp_path / "bundles" + _write_bundle(bundle_root / "bka") + + publish = publish_bundles_to_bq(bundle_root=bundle_root, store=store) + assert publish.failures == () + assert publish.published_fingerprints == (_VALID_FINGERPRINT,) + assert publish.rows_written == 2 + + sync_dir = tmp_path / "synced" + sync = sync_bundles_from_bq(store=store, dest_dir=sync_dir) + assert sync.failures == () + assert sync.synced_fingerprints == (_VALID_FINGERPRINT,) + + outcome = load_bundle( + sync_dir / _VALID_FINGERPRINT, + expected_fingerprint=_VALID_FINGERPRINT, + ) + assert isinstance(outcome, LoadedBundle) + assert outcome.manifest.fingerprint == _VALID_FINGERPRINT + assert callable(outcome.extractor) + finally: + # Always clean up so consecutive runs don't accumulate + # tables in the test dataset. + client.delete_table(table_id, not_found_ok=True)