From c4aa403ccb1288421293c77913525ed41b0f78b3 Mon Sep 17 00:00:00 2001 From: Egor Kraev Date: Thu, 14 May 2026 17:28:02 +0200 Subject: [PATCH 1/3] DEV-1416: refresh memory embeddings on slayer ingest / --ingest-on-startup Per-datasource ingest pass now re-embeds every memory whose canonical entities are rooted at the datasource. A stale embeddings.db (created without an API key, or after a manual memories.yaml edit) is repaired by the next ingest without any extra step. Per-memory embed failures attribute as IngestionError(model_name="memory:<id>", ...) so a startup log inspection can distinguish them from datasource-doc / model failures. Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 4 +- docs/concepts/ingestion.md | 9 ++ docs/concepts/search.md | 7 +- pyproject.toml | 2 +- slayer/engine/ingestion.py | 36 ++++- tests/test_idempotent_ingestion.py | 230 ++++++++++++++++++++++++++++- tests/test_startup_ingest.py | 76 ++++++++++ 7 files changed, 357 insertions(+), 7 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index f4d31bad..63139ee4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -93,7 +93,7 @@ poetry run ruff check slayer/ tests/ - **Memories + semantic search** (DEV-1357 + DEV-1375): An agent-memory layer indexed by canonical entity strings. Two write-side tools — `save_memory(learning, linked_entities)` and `forget_memory(id)` — record per-entity notes (optionally bundled with an example `SlayerQuery`). Retrieval is unified into a single `search(entities, query, question, max_memories=5, max_example_queries=2, max_entities=5)` tool — there is no separate `recall_memories` surface. `linked_entities` accepts either a list of entity strings (resolved strictly) or an inline `SlayerQuery`/dict (entities auto-extracted; warnings non-fatal; the query is persisted on the memory). The canonical form is exactly one of `<datasource>`, `<datasource>.<model>`, `<datasource>.<model>.<column>` (≤ 3 dotted segments after canonicalisation). 
Aggregation suffixes are stripped (`revenue:sum` → `..revenue`); `*:count` collapses to the source model; multi-hop dotted paths keep only the leaf (`orders.customers.regions.name` → `{.orders, .regions.name}`). The resolver lives in `slayer/memories/resolver.py`; the unified `Memory` row + storage primitives are concrete on `StorageBackend` (ID format / entity-intersection filter), with backends only implementing the row-shaped CRUD + a one-line `_next_memory_seq` that derives the next id from the existing corpus. `inspect_model` auto-renders a `Learnings` section listing only memories where `query is None`; query-bearing memories surface only via `search` (in the `example_queries` bucket). **Memory ids** (DEV-1405): positive ints that increase monotonically while the corpus grows; YAMLStorage derives the next id from the last row of `memories.yaml`, SQLiteStorage from `SELECT MAX(id) + 1 FROM memories`. Ids of deleted memories may be reused by future saves; `delete_memory` already cascades to the matching embedding row so reuse strands no data. - `search` runs up to three parallel channels merged by RRF (DEV-1386 adds the third). **Channel 1** is entity-overlap BM25 over memories (`slayer/memories/ranker.py` using `rank_bm25.BM25Plus`, DEV-1365) — a precisely-tagged memory outranks one with a long entity list that overlaps incidentally. **Channel 2** is a fresh in-memory tantivy index built per call over memories ∪ entities (datasources / non-hidden models / non-hidden columns / named measures / aggregations), using tantivy's `en_stem` analyzer (Porter stemmer + default tokenizer, splits on `_` and `.`). **Channel 3** (DEV-1386, optional via the `embedding_search` pip extra) is dense embedding similarity over the same memories ∪ entities corpus, computed numpy-only against rows persisted in a sidecar `embeddings` table keyed by `(canonical_id, embedding_model_name)`. 
The SQL lives in `slayer/storage/sidecar_embedding_store.py` (DEV-1405) — both `SQLiteStorage` and `YAMLStorage` instantiate a `SidecarEmbeddingStore` and forward all embedding CRUD to it. SQLiteStorage points it at the main `.db` file; YAMLStorage points it at a dedicated `/embeddings.db` sidecar so the YAML store keeps its git-diffable shape while embeddings live in a fast indexed store. **Cascade semantics** (DEV-1405 fix): `delete_embeddings_for_canonical(canonical_id_prefix=X)` matches the canonical id exactly OR as a strict dotted-path descendant (`X + "." + …`) — never as a character prefix, so deleting `memory:4` no longer also nukes `memory:42`. **Hot-path batching** (DEV-1405): `StorageBackend` exposes `save_embeddings(rows)` and `get_embeddings_for_canonical_ids(canonical_ids, embedding_model_name)` with default M-iteration impls, overridden by the bundled backends to issue single batched round-trips through `SidecarEmbeddingStore.save_many` / `get_many`; `EmbeddingService._apply_pending` uses them so one `refresh_model_subtree` issues exactly one batched read + one batched write regardless of subtree size. The active embedding model is read from `SLAYER_EMBEDDING_MODEL` (default `openai/text-embedding-3-small`) and dispatched via litellm; provider credentials are read by litellm directly (`OPENAI_API_KEY`, etc.). When the extra is not installed, the model has no rows, or the query embedding call fails, channel 3 contributes nothing and emits a single warning into `SearchResponse.warnings`; tantivy + BM25 continue to work. Refresh runs inline on `slayer ingest` / `edit_model` / `save_memory` and skips the litellm call when the rendered `content_hash` matches the stored row (cheap idempotent re-runs). Per-entity embed failures are non-fatal — search degrades gracefully. 
Memory rankings from every active channel are fused via Reciprocal Rank Fusion (`k=60`, hand-rolled in `slayer/search/rrf.py`); **entity hits from channels 2 and 3 are now also RRF-fused** (channel 1 contributes only to memory ranking). Memory hits are partitioned by `Memory.query is None` into `memories` (learning-only, small) and `example_queries` (query-bearing, bulky) — independent caps via `max_memories` and `max_example_queries` so bulky examples cannot crowd out small learnings. The response also echoes `resolved_input_entities` for diagnostics. Empty-input fallback returns the newest `max_memories` learning-only + newest `max_example_queries` query-bearing memories with a warning. Each indexed entity carries a `text` field rendered by `slayer/search/render.py` — named children (columns / measures / aggregations / join targets) are mentioned by name + kind only (no descriptions, since each child has its own indexed doc), while non-named children (model filters, model `sql` block, join `pairs`, aggregation `params`) are included in full. `meta` is **excluded** from indexed text (DEV-1377 hardening). Hidden models / hidden columns are skipped. **`datasource` filter** (DEV-1409): all four surfaces (`MCP search`, `POST /search`, `slayer search --datasource`, `SlayerClient.search`) accept an optional `datasource: Optional[str] = None`. When set, every channel pre-filters its corpus to that one datasource — entity hits only include docs rooted at it (exact name or strict dotted-path descendant); memories surface when any of their `entities` is rooted at it (memories spanning multiple datasources surface from each); BM25 / IDF / cosine corpus reflect only the filtered subset. Unknown datasource → `ValueError` (HTTP 400 on REST). Helper: `slayer.memories.resolver.canonical_id_rooted_at`. + `search` runs up to three parallel channels merged by RRF (DEV-1386 adds the third). 
**Channel 1** is entity-overlap BM25 over memories (`slayer/memories/ranker.py` using `rank_bm25.BM25Plus`, DEV-1365) — a precisely-tagged memory outranks one with a long entity list that overlaps incidentally. **Channel 2** is a fresh in-memory tantivy index built per call over memories ∪ entities (datasources / non-hidden models / non-hidden columns / named measures / aggregations), using tantivy's `en_stem` analyzer (Porter stemmer + default tokenizer, splits on `_` and `.`). **Channel 3** (DEV-1386, optional via the `embedding_search` pip extra) is dense embedding similarity over the same memories ∪ entities corpus, computed numpy-only against rows persisted in a sidecar `embeddings` table keyed by `(canonical_id, embedding_model_name)`. The SQL lives in `slayer/storage/sidecar_embedding_store.py` (DEV-1405) — both `SQLiteStorage` and `YAMLStorage` instantiate a `SidecarEmbeddingStore` and forward all embedding CRUD to it. SQLiteStorage points it at the main `.db` file; YAMLStorage points it at a dedicated `/embeddings.db` sidecar so the YAML store keeps its git-diffable shape while embeddings live in a fast indexed store. **Cascade semantics** (DEV-1405 fix): `delete_embeddings_for_canonical(canonical_id_prefix=X)` matches the canonical id exactly OR as a strict dotted-path descendant (`X + "." + …`) — never as a character prefix, so deleting `memory:4` no longer also nukes `memory:42`. **Hot-path batching** (DEV-1405): `StorageBackend` exposes `save_embeddings(rows)` and `get_embeddings_for_canonical_ids(canonical_ids, embedding_model_name)` with default M-iteration impls, overridden by the bundled backends to issue single batched round-trips through `SidecarEmbeddingStore.save_many` / `get_many`; `EmbeddingService._apply_pending` uses them so one `refresh_model_subtree` issues exactly one batched read + one batched write regardless of subtree size. 
The active embedding model is read from `SLAYER_EMBEDDING_MODEL` (default `openai/text-embedding-3-small`) and dispatched via litellm; provider credentials are read by litellm directly (`OPENAI_API_KEY`, etc.). When the extra is not installed, the model has no rows, or the query embedding call fails, channel 3 contributes nothing and emits a single warning into `SearchResponse.warnings`; tantivy + BM25 continue to work. Refresh runs inline on `slayer ingest` / `edit_model` / `save_memory` and skips the litellm call when the rendered `content_hash` matches the stored row (cheap idempotent re-runs). DEV-1416 closes the corresponding gap on the `slayer ingest` / `--ingest-on-startup` path: each per-datasource pass now also re-embeds every memory whose canonical entities are rooted at the datasource (via `canonical_id_rooted_at`), so a stale `embeddings.db` (created without an API key, or after a manual `memories.yaml` edit) is repaired by the next ingest without any extra steps. Per-memory failures attribute as `IngestionError(model_name="memory:<id>", …)` in `IdempotentIngestResult.errors`. Per-entity embed failures are non-fatal — search degrades gracefully. Memory rankings from every active channel are fused via Reciprocal Rank Fusion (`k=60`, hand-rolled in `slayer/search/rrf.py`); **entity hits from channels 2 and 3 are now also RRF-fused** (channel 1 contributes only to memory ranking). Memory hits are partitioned by `Memory.query is None` into `memories` (learning-only, small) and `example_queries` (query-bearing, bulky) — independent caps via `max_memories` and `max_example_queries` so bulky examples cannot crowd out small learnings. The response also echoes `resolved_input_entities` for diagnostics. Empty-input fallback returns the newest `max_memories` learning-only + newest `max_example_queries` query-bearing memories with a warning. 
Each indexed entity carries a `text` field rendered by `slayer/search/render.py` — named children (columns / measures / aggregations / join targets) are mentioned by name + kind only (no descriptions, since each child has its own indexed doc), while non-named children (model filters, model `sql` block, join `pairs`, aggregation `params`) are included in full. `meta` is **excluded** from indexed text (DEV-1377 hardening). Hidden models / hidden columns are skipped. **`datasource` filter** (DEV-1409): all four surfaces (`MCP search`, `POST /search`, `slayer search --datasource`, `SlayerClient.search`) accept an optional `datasource: Optional[str] = None`. When set, every channel pre-filters its corpus to that one datasource — entity hits only include docs rooted at it (exact name or strict dotted-path descendant); memories surface when any of their `entities` is rooted at it (memories spanning multiple datasources surface from each); BM25 / IDF / cosine corpus reflect only the filtered subset. Unknown datasource → `ValueError` (HTTP 400 on REST). Helper: `slayer.memories.resolver.canonical_id_rooted_at`. Sample-value snapshots cached on `Column.sampled` (v6 schema bump, no-op forward migration in `slayer/storage/v6_migration.py`); refreshed on every `slayer ingest` for table-backed models, on `slayer search refresh-samples`, on `edit_model` (column-level edits → that column; `model.filters` / `model.sql` / `source_queries` change → all columns), and lazily on `inspect_model` cache miss (best-effort write-back). sql-mode and query-backed sample-value coverage is deferred to [DEV-1377](https://linear.app/motley-ai/issue/DEV-1377). Surfaces: write side via MCP, REST (`POST /memories`, `DELETE /memories/{id}`), CLI (`slayer memory {save,forget}`), and `SlayerClient`; retrieval via MCP (`search`), REST (`POST /search`), CLI (`slayer search [--entity ...] [--query ...] [--question ...] 
[--max-example-queries N]`, `slayer search refresh-samples`), and `SlayerClient.search()`. See [docs/concepts/memories.md](docs/concepts/memories.md) and [docs/concepts/search.md](docs/concepts/search.md). @@ -112,7 +112,7 @@ poetry run ruff check slayer/ tests/ - `slayer datasources create-inline` supports `--password-stdin` for secure credential input. - `slayer datasources test` verifies connectivity. - `slayer datasources create demo [--ingest]` spins up the bundled Jaffle Shop DuckDB (idempotent). `slayer serve --demo` and `slayer mcp --demo` do the same at server startup. Requires the `duckdb` extra and `jafgen` (git-only install); missing deps trigger a clean install-hint message. Lives in `slayer/demo/jaffle_shop.py`. -- `slayer serve --ingest-on-startup` and `slayer mcp --ingest-on-startup` (DEV-1392) — opt-in boot-time idempotent auto-ingestion across every configured datasource, sync-before-listen (uvicorn/mcp.run don't start until ingest finishes). Continue-on-failure: per-datasource errors are friendly-formatted to stderr and never abort startup; `storage.list_datasources()` raising is the only thing that prevents the server from starting. `to_delete` drift entries are printed but **never auto-applied** — destructive cleanup stays gated behind `slayer validate-models --force-clean [--yes]`. Composes freely with `--demo` (demo first, then the ingest pass over every datasource including the freshly-created demo). Also exposed via `SLAYER_INGEST_ON_STARTUP=1` env var (flag wins when both set) and the `ingest_on_startup=True` kwarg on `create_app` / `create_mcp_server`. All output goes to stderr — `slayer mcp` stdio JSON-RPC remains protocol-safe. Orchestrator: `slayer/engine/ingestion.py::ingest_all_datasources_idempotent`. See [docs/concepts/ingestion.md](docs/concepts/ingestion.md#ingesting-at-startup). 
+- `slayer serve --ingest-on-startup` and `slayer mcp --ingest-on-startup` (DEV-1392) — opt-in boot-time idempotent auto-ingestion across every configured datasource, sync-before-listen (uvicorn/mcp.run don't start until ingest finishes). Continue-on-failure: per-datasource errors are friendly-formatted to stderr and never abort startup; `storage.list_datasources()` raising is the only thing that prevents the server from starting. `to_delete` drift entries are printed but **never auto-applied** — destructive cleanup stays gated behind `slayer validate-models --force-clean [--yes]`. Composes freely with `--demo` (demo first, then the ingest pass over every datasource including the freshly-created demo). Also exposed via `SLAYER_INGEST_ON_STARTUP=1` env var (flag wins when both set) and the `ingest_on_startup=True` kwarg on `create_app` / `create_mcp_server`. All output goes to stderr — `slayer mcp` stdio JSON-RPC remains protocol-safe. Orchestrator: `slayer/engine/ingestion.py::ingest_all_datasources_idempotent`. **Memory embeddings** (DEV-1416): each per-datasource pass also re-embeds every memory whose canonical entities are rooted at the datasource, so a stale `embeddings.db` is repaired by the next `--ingest-on-startup` without extra steps. See [docs/concepts/ingestion.md](docs/concepts/ingestion.md#ingesting-at-startup). - `slayer validate-models [--datasource X] [--force-clean] [--yes]` (DEV-1356) — read-only diff against live schemas; with `--force-clean`, prompts to apply each delete via `engine.apply_drift_deletes`. See [docs/concepts/schema-drift.md](docs/concepts/schema-drift.md). - `slayer storage migrate-types [--data-source X] [--dry-run]` (DEV-1361) — refine `DOUBLE → INT` on base columns whose live SQL type is integer for every persisted model, then write the refined v5 dict back. Hard-fails if a datasource is unreachable. The same refinement runs transparently inside `storage.get_model` on first load; this CLI is a batch / inspectable alternative. 
- `slayer search [--entity ENT ...] [--query JSON_OR_@FILE] [--question TEXT] [--datasource DS] [--max-memories N] [--max-example-queries N] [--max-entities N] [--format json|text]` (DEV-1375 / DEV-1386 / DEV-1409) — up to three-channel semantic search over memories + canonical entities (BM25 over memory entity tags + tantivy full-text + optional dense embedding similarity). `--datasource` scopes the corpus to one datasource (entity hits + memories pre-filtered). See [docs/concepts/search.md](docs/concepts/search.md). diff --git a/docs/concepts/ingestion.md b/docs/concepts/ingestion.md index 17750bac..7c47d960 100644 --- a/docs/concepts/ingestion.md +++ b/docs/concepts/ingestion.md @@ -153,6 +153,15 @@ flags compose: `--demo` runs first (creating the Jaffle Shop datasource), then the startup-ingest pass runs over every datasource including the freshly-created demo. +Each per-datasource pass refreshes embeddings for the datasource doc, +every visible model + its visible children, **and every memory whose +canonical entities are rooted at the datasource** (DEV-1416). A stale +`embeddings.db` (created without an `OPENAI_API_KEY`, or after a manual +`memories.yaml` edit) is therefore repaired by the next +`--ingest-on-startup` with no extra step. Per-memory embed failures +surface as `IngestionError(model_name="memory:<id>", …)` in the +result's `errors` list. + ### CLI ```bash diff --git a/docs/concepts/search.md b/docs/concepts/search.md index 25789995..3e31f724 100644 --- a/docs/concepts/search.md +++ b/docs/concepts/search.md @@ -253,7 +253,12 @@ sql-mode and query-backed models are silently skipped in v1. (`EmbeddingService._apply_pending`) issues one batched `get_embeddings_for_canonical_ids` for the hash-skip filter and one batched `save_embeddings` for the persist step (DEV-1405) — refresh - cost is independent of subtree size. + cost is independent of subtree size. 
**Memories** are included in the + `slayer ingest` / `--ingest-on-startup` per-datasource refresh + (DEV-1416), filtered to memories with at least one canonical entity + rooted at the current datasource — so `embeddings.db` can be repaired + by re-running ingest, no separate `slayer embeddings refresh` step + required. - **Cascade** semantics (DEV-1405 fix): `delete_embeddings_for_canonical` matches the canonical id exactly OR as a strict dotted-path descendant (`.<...>`) — never as a character prefix. So `delete_memory(4)` diff --git a/pyproject.toml b/pyproject.toml index 28f38ccf..2966afbb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "motley-slayer" -version = "0.6.4" +version = "0.6.5" description = "A lightweight, agent-first semantic layer for AI agents" requires-python = ">=3.11" license = "MIT" diff --git a/slayer/engine/ingestion.py b/slayer/engine/ingestion.py index 128b9077..48731544 100644 --- a/slayer/engine/ingestion.py +++ b/slayer/engine/ingestion.py @@ -9,6 +9,7 @@ import asyncio import logging +import re import sys from collections import defaultdict, deque from typing import Any, Dict, List, Optional, Set, TextIO, Tuple @@ -21,8 +22,12 @@ from slayer.core.models import Column, DatasourceConfig, ModelJoin, SlayerModel from slayer.engine.profiling import refresh_all_table_backed_sampled from slayer.engine.query_engine import SlayerQueryEngine +from slayer.memories.resolver import canonical_id_rooted_at from slayer.storage.base import StorageBackend + +_MEMORY_REF_RE = re.compile(r"\bmemory:\d+\b") + logger = logging.getLogger(__name__) # Module-level dedup set for unrecognized SA type warnings (see @@ -1010,8 +1015,17 @@ async def ingest_datasource_idempotent( datasource_name=datasource.name, storage=storage, ) for err in embedding_errors: + # DEV-1416: per-memory warnings carry ``memory:`` in the + # string (either from EmbeddingService's own per-row failure + # template or from the defensive try/except inside 
+ # ``_refresh_datasource_embeddings``). Surface the id as + # ``model_name`` so a startup log inspection can distinguish + # memory failures from model / datasource-doc failures at a + # glance. + match = _MEMORY_REF_RE.search(err) + model_name = match.group(0) if match is not None else "" errors.append(IngestionError( - model_name="", + model_name=model_name, data_source=datasource.name, error=f"embedding refresh: {err}", )) @@ -1232,4 +1246,24 @@ async def _refresh_datasource_embeddings( )) except Exception as exc: # noqa: BLE001 — defensive warnings.append(f"{datasource_name} (datasource doc): {exc}") + + # DEV-1416: refresh embeddings for every memory whose canonical + # entities are rooted at this datasource. A memory linked to entities + # in datasources A and B is touched in both passes; hash-skip inside + # ``_apply_pending`` makes the second call a no-op. + try: + memories = await storage.list_memories() + except Exception as exc: # noqa: BLE001 — defensive + return warnings + [f"{datasource_name} (memories): {exc}"] + for memory in memories: + if not any( + canonical_id_rooted_at(e, datasource_name) + for e in memory.entities + ): + continue + try: + memory_warnings = await service.refresh_memory(memory) + except Exception as exc: # noqa: BLE001 — defensive per-memory + memory_warnings = [f"memory:{memory.id}: {exc}"] + warnings.extend(memory_warnings) return warnings diff --git a/tests/test_idempotent_ingestion.py b/tests/test_idempotent_ingestion.py index ca73bc50..3d76a286 100644 --- a/tests/test_idempotent_ingestion.py +++ b/tests/test_idempotent_ingestion.py @@ -16,7 +16,7 @@ import sqlite3 import tempfile from pathlib import Path -from typing import Iterable +from typing import Iterable, List, Optional import pytest @@ -28,7 +28,11 @@ SlayerModel, ) from slayer.core.query import SlayerQuery -from slayer.engine.ingestion import ingest_datasource_idempotent +from slayer.embeddings import client as embedding_client +from slayer.engine.ingestion import 
( + _refresh_datasource_embeddings, + ingest_datasource_idempotent, +) from slayer.engine.schema_drift import ( IdempotentIngestResult, ModelAddition, @@ -374,6 +378,228 @@ async def flaky_save(model): assert any(e.model_name == "a_new" for e in result.errors) +class TestMemoryEmbeddingRefresh: + """DEV-1416: `_refresh_datasource_embeddings` must walk memories whose + entities are rooted at the datasource, in addition to models and the + datasource doc. Failures attribute to the offending memory by id.""" + + @staticmethod + def _enable_channel(monkeypatch: pytest.MonkeyPatch) -> List[List[str]]: + """Override the conftest autouse fixture for this test. Returns a + list that captures every ``embed_batch`` call's text payload.""" + monkeypatch.setattr(embedding_client, "is_available", lambda: True) + calls: List[List[str]] = [] + + async def fake_embed_batch( + texts: List[str], *, model: Optional[str] = None, + ) -> List[Optional[List[float]]]: + calls.append(list(texts)) + return [[0.1, 0.2, 0.3] for _ in texts] + + monkeypatch.setattr( + "slayer.embeddings.service.embed_batch", fake_embed_batch, + ) + return calls + + async def test_refreshes_only_memories_rooted_at_datasource( + self, workspace: Path, monkeypatch: pytest.MonkeyPatch, + ) -> None: + storage, ds, _ = await _setup(workspace) + # Second datasource for negative-case memories. We don't actually + # ingest it — it just needs to exist as a config for the entity + # strings to be distinguishable from `ds`. 
+ other = DatasourceConfig(name="other_ds", type="sqlite", database=":memory:") + await storage.save_datasource(other) + + m_in_ds = await storage.save_memory( + learning="rooted at ds", + entities=[f"{ds.name}.orders.amount"], + ) + m_in_other = await storage.save_memory( + learning="rooted at other_ds only", + entities=["other_ds.customers.name"], + ) + m_spanning = await storage.save_memory( + learning="spans both", + entities=[f"{ds.name}.customers.region", "other_ds.regions.country"], + ) + + self._enable_channel(monkeypatch) + + warnings = await _refresh_datasource_embeddings( + datasource_name=ds.name, storage=storage, + ) + assert warnings == [] + + rows = await storage.list_embeddings( + embedding_model_name=embedding_client.current_model(), + ) + memory_ids = { + r.canonical_id for r in rows if r.entity_kind == "memory" + } + assert f"memory:{m_in_ds.id}" in memory_ids + assert f"memory:{m_spanning.id}" in memory_ids + assert f"memory:{m_in_other.id}" not in memory_ids + + async def test_second_pass_is_noop_when_content_unchanged( + self, workspace: Path, monkeypatch: pytest.MonkeyPatch, + ) -> None: + storage, ds, _ = await _setup(workspace) + saved = await storage.save_memory( + learning="stable", + entities=[f"{ds.name}.orders.amount"], + ) + calls = self._enable_channel(monkeypatch) + + await _refresh_datasource_embeddings( + datasource_name=ds.name, storage=storage, + ) + first_pass_call_count = len(calls) + first_pass_total_texts = sum(len(c) for c in calls) + + # No content changed → no new embed calls. + await _refresh_datasource_embeddings( + datasource_name=ds.name, storage=storage, + ) + assert len(calls) == first_pass_call_count + assert sum(len(c) for c in calls) == first_pass_total_texts + + # And the memory row is present after both passes. 
+ rows = await storage.list_embeddings( + embedding_model_name=embedding_client.current_model(), + ) + assert any( + r.canonical_id == f"memory:{saved.id}" for r in rows + ) + + async def test_per_memory_failure_surfaces_with_memory_id_in_ingest_result( + self, workspace: Path, monkeypatch: pytest.MonkeyPatch, + ) -> None: + storage, ds, _ = await _setup(workspace) + saved = await storage.save_memory( + learning="will fail to embed", + entities=[f"{ds.name}.orders.amount"], + ) + self._enable_channel(monkeypatch) + + # Force `refresh_memory` to return the per-row failure warning + # shape that `_apply_pending` would normally emit when + # `embed_batch` returns None. + async def failing_refresh(self, memory): # NOSONAR(S7503) — replaces async refresh_memory + return [ + f"embedding refresh failed for memory:{memory.id}; " + f"skipped (search will still find this entity via tantivy + BM25)." + ] + + monkeypatch.setattr( + "slayer.embeddings.service.EmbeddingService.refresh_memory", + failing_refresh, + ) + + result = await ingest_datasource_idempotent( + datasource=ds, storage=storage, + ) + memory_errors = [ + e for e in result.errors + if e.model_name == f"memory:{saved.id}" + ] + assert memory_errors, ( + f"expected an IngestionError with " + f"model_name='memory:{saved.id}'; got {result.errors!r}" + ) + assert memory_errors[0].data_source == ds.name + assert memory_errors[0].error.startswith("embedding refresh:") + assert f"memory:{saved.id}" in memory_errors[0].error + + async def test_refresh_memory_raise_is_caught_and_tagged( + self, workspace: Path, monkeypatch: pytest.MonkeyPatch, + ) -> None: + """The defensive try/except in the memory loop must convert a + raise from ``refresh_memory`` into a tagged warning string — + never propagate.""" + storage, ds, _ = await _setup(workspace) + saved = await storage.save_memory( + learning="raises on refresh", + entities=[f"{ds.name}.orders.amount"], + ) + self._enable_channel(monkeypatch) + + async def 
raising_refresh(self, memory): # NOSONAR(S7503) — replaces async refresh_memory + raise RuntimeError("boom") + + monkeypatch.setattr( + "slayer.embeddings.service.EmbeddingService.refresh_memory", + raising_refresh, + ) + + warnings = await _refresh_datasource_embeddings( + datasource_name=ds.name, storage=storage, + ) + assert any( + f"memory:{saved.id}" in w and "boom" in w for w in warnings + ), warnings + + async def test_extra_not_installed_silent_noop_on_memory_path( + self, workspace: Path, monkeypatch: pytest.MonkeyPatch, + ) -> None: + """When ``is_available()`` returns False (extra not installed or + no API key), the memory loop must be silent — no warnings, no + embedding rows.""" + storage, ds, _ = await _setup(workspace) + await storage.save_memory( + learning="unused", entities=[f"{ds.name}.orders.amount"], + ) + # The autouse `_disable_embedding_channel_by_default` fixture + # already forces is_available=False; assert explicitly for + # clarity. + assert embedding_client.is_available() is False + + called: List[List[str]] = [] + + async def should_not_be_called( + texts: List[str], *, model: Optional[str] = None, + ) -> List[Optional[List[float]]]: + called.append(list(texts)) + return [None for _ in texts] + + monkeypatch.setattr( + "slayer.embeddings.service.embed_batch", should_not_be_called, + ) + + warnings = await _refresh_datasource_embeddings( + datasource_name=ds.name, storage=storage, + ) + assert warnings == [] + assert called == [] + rows = await storage.list_embeddings( + embedding_model_name=embedding_client.current_model(), + ) + assert [r for r in rows if r.entity_kind == "memory"] == [] + + async def test_list_memories_failure_warns_and_continues( + self, workspace: Path, monkeypatch: pytest.MonkeyPatch, + ) -> None: + """A raise inside ``storage.list_memories`` must be captured as a + single warning string — the datasource pass must still complete + without propagating.""" + storage, ds, _ = await _setup(workspace) + 
self._enable_channel(monkeypatch) + + async def boom(self, *, entities=None): # NOSONAR(S7503) — async list_memories signature + raise RuntimeError("memories table missing") + + monkeypatch.setattr( + "slayer.storage.base.StorageBackend.list_memories", boom, + ) + + warnings = await _refresh_datasource_embeddings( + datasource_name=ds.name, storage=storage, + ) + assert any( + "memories table missing" in w for w in warnings + ), warnings + + class TestExcludeTables: async def test_excluded_tables_not_touched(self, workspace: Path) -> None: storage, ds, db_path = await _setup(workspace) diff --git a/tests/test_startup_ingest.py b/tests/test_startup_ingest.py index 9c71d48f..00015f46 100644 --- a/tests/test_startup_ingest.py +++ b/tests/test_startup_ingest.py @@ -17,8 +17,11 @@ import argparse import io +import sqlite3 import sys +import tempfile import types +from pathlib import Path from typing import List, Optional from unittest.mock import AsyncMock, MagicMock @@ -26,6 +29,7 @@ from slayer import cli from slayer.core.models import DatasourceConfig +from slayer.embeddings import client as embedding_client from slayer.engine import ingestion as ingestion_module from slayer.engine.ingestion import ( StartupIngestSummary, @@ -37,6 +41,7 @@ ModelAddition, WholeModelDelete, ) +from slayer.storage.yaml_storage import YAMLStorage # ───────────────────────────────────────────────────────────────────────────── # NOSONAR(S125) — section separator, not commented-out code @@ -806,3 +811,74 @@ def raising_create_mcp_server(*a, **kw): cli._run_mcp(_mcp_args(ingest_on_startup=True)) assert not any(c[0] == "mcp_run" for c in capture) + + +# ───────────────────────────────────────────────────────────────────────────── +# DEV-1416 — memory embeddings refresh on --ingest-on-startup +# ───────────────────────────────────────────────────────────────────────────── + + +class TestMemoryEmbeddingsOnStartup: + """End-to-end smoke test: a stale `embeddings.db` (zero memory rows) + gets 
populated by a single `--ingest-on-startup` pass through the + real `ingest_all_datasources_idempotent` orchestrator.""" + + async def test_orchestrator_refreshes_memory_embeddings_for_each_datasource( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + db_path = str(tmp_path / "live.db") + conn = sqlite3.connect(db_path) + conn.executescript( + """ + CREATE TABLE orders ( + id INTEGER PRIMARY KEY, + amount REAL NOT NULL + ); + INSERT INTO orders VALUES (1, 100.0); + """ + ) + conn.commit() + conn.close() + + storage = YAMLStorage(base_dir=str(tmp_path / "storage")) + ds = DatasourceConfig(name="ds", type="sqlite", database=db_path) + await storage.save_datasource(ds) + + saved = await storage.save_memory( + learning="orders.amount is in USD cents", + entities=[f"{ds.name}.orders.amount"], + ) + + # Enable the embedding channel (overriding the conftest + # autouse fixture which forces is_available=False) and stub + # embed_batch. 
+ monkeypatch.setattr( + embedding_client, "is_available", lambda: True, + ) + + async def fake_embed_batch( + texts: List[str], *, model: Optional[str] = None, + ) -> List[Optional[List[float]]]: + return [[0.1, 0.2, 0.3] for _ in texts] + + monkeypatch.setattr( + "slayer.embeddings.service.embed_batch", fake_embed_batch, + ) + + stream = io.StringIO() + summary = await ingest_all_datasources_idempotent( + storage=storage, stream=stream, + ) + assert summary.succeeded == ["ds"] + assert summary.failures == [] + + rows = await storage.list_embeddings( + embedding_model_name=embedding_client.current_model(), + ) + assert any( + r.canonical_id == f"memory:{saved.id}" + and r.entity_kind == "memory" + for r in rows + ), f"memory embedding not written; rows={[r.canonical_id for r in rows]}" From 961453c15d890de97ff991c61a89ede245fe9ba1 Mon Sep 17 00:00:00 2001 From: Egor Kraev Date: Thu, 14 May 2026 18:04:08 +0200 Subject: [PATCH 2/3] DEV-1416: address codex + sonar review (validators, structured returns, NOSONAR) * Embedding persist-failure warning now lists the failing rows' canonical ids so per-memory persist failures attribute to memory:<id> via the ingest IngestionError shape. * Forbid ":" in all name validators (Column, ModelMeasure, SlayerModel name + data_source, SlayerQuery, DatasourceConfig). The aggregation separator is reserved across the canonical-id namespace; this enforces the invariant the memory resolver already assumes. * Factor shared name-validation rules into _SubstringRule constants (_NO_DUNDER / _NO_DOT / _NO_COLON / _NO_FWD_SLASH / _NO_BACK_SLASH / _NO_NUL) plus _require_non_empty_trimmed, so the four name validators share one source of truth per character. * Replace the regex-based memory:<id> extraction in ingest_datasource_idempotent with structured (model_name, error_text) tuples returned from three helpers: _refresh_models_for_datasource, _refresh_datasource_doc, _refresh_memories_for_datasource. 
Drops the fragile string sniffing and brings _refresh_datasource_embeddings' cognitive complexity back under the Sonar gate. * Add NOSONAR(S7503) on three async test stubs that must be `async def` to match the patched signatures. Co-Authored-By: Claude Opus 4.7 (1M context) --- slayer/core/models.py | 186 ++++++++++++++++------------- slayer/core/query.py | 17 +-- slayer/embeddings/service.py | 9 +- slayer/engine/ingestion.py | 145 +++++++++++++++------- tests/test_idempotent_ingestion.py | 56 +++++++-- tests/test_models.py | 41 +++++++ tests/test_startup_ingest.py | 2 +- 7 files changed, 318 insertions(+), 138 deletions(-) diff --git a/slayer/core/models.py b/slayer/core/models.py index 266fb08b..8e85b92b 100644 --- a/slayer/core/models.py +++ b/slayer/core/models.py @@ -27,38 +27,91 @@ _STRING_LITERAL_RE = re.compile(r"'[^']*'") -def _validate_model_name(name: str, context: str) -> str: - """Reject model/query names containing ``__`` or ``.``. +class _SubstringRule: + """Single source of truth for a forbidden substring inside a name. - Model and query names become SQL table aliases where ``__`` encodes - join paths, so both separators are reserved. + Each rule pairs the forbidden character / digraph with the rationale. + Every validator that rejects the same substring uses the same rule + so the wording (and the rejection rationale) lives in one place. 
""" - if "__" in name: + + __slots__ = ("substring", "reason") + + def __init__(self, *, substring: str, reason: str) -> None: + self.substring = substring + self.reason = reason + + def check(self, name: str, context: str) -> None: + if self.substring in name: + raise ValueError( + f"{context} '{name}' must not contain " + f"{self.substring!r}; {self.reason}" + ) + + +_NO_DUNDER = _SubstringRule( + substring="__", + reason="double underscores are reserved for join path aliases in " + "generated SQL.", +) +_NO_DOT = _SubstringRule( + substring=".", + reason="dots are the canonical-id namespace delimiter " + "(``..``) and the dotted-path reference " + "syntax in queries.", +) +_NO_COLON = _SubstringRule( + substring=":", + reason="colons are reserved as the aggregation separator " + "(``revenue:sum``) and the ``memory:`` canonical-id " + "prefix.", +) +_NO_FWD_SLASH = _SubstringRule( + substring="/", + reason="path separators break the storage layout.", +) +_NO_BACK_SLASH = _SubstringRule( + substring="\\", + reason="path separators break the storage layout.", +) +_NO_NUL = _SubstringRule( + substring="\x00", + reason="NUL bytes are filesystem-unsafe.", +) + + +def _require_non_empty_trimmed(v: str, context: str) -> None: + """Reject empty / whitespace-only inputs and inputs with + leading or trailing whitespace.""" + if not v or not v.strip(): raise ValueError( - f"{context} name '{name}' must not contain '__'. " - f"Double underscores are reserved for join path aliases in generated SQL." + f"{context} must be a non-empty string; got {v!r}." ) - if "." in name: + if v.strip() != v: raise ValueError( - f"{context} name '{name}' must not contain '.'. " - f"Dots are path syntax for referencing joined models in queries." + f"{context} must not have leading/trailing whitespace; " + f"got {v!r}." 
) + + +def _validate_model_name(name: str, context: str) -> str: + """Reject model/query names containing ``__``, ``.``, or ``:``.""" + label = f"{context} name" + _NO_DUNDER.check(name, label) + _NO_DOT.check(name, label) + _NO_COLON.check(name, label) return name def _validate_column_name(name: str, context: str) -> str: - """Reject dimension/measure names containing ``.``. + """Reject dimension/measure names containing ``.`` or ``:``. - Dots are path syntax in queries (``customers.name``), not part of names. - ``__`` is allowed — it encodes flattened join paths in virtual models - created by ``_query_as_model`` (e.g., ``stores__name``). + ``__`` is allowed — it encodes flattened join paths in virtual + models created by ``_query_as_model`` (e.g., ``stores__name``). """ - if "." in name: - raise ValueError( - f"{context} name '{name}' must not contain '.'. " - f"Dots are path syntax for referencing joined models in queries, " - f"not part of dimension or measure names." - ) + label = f"{context} name" + _NO_DOT.check(name, label) + _NO_COLON.check(name, label) return name @@ -360,37 +413,26 @@ def _validate_name(cls, v: str) -> str: @field_validator("data_source") @classmethod def _validate_data_source_format(cls, v: str) -> str: - # Format-only checks (run on every input). Emptiness is enforced in - # ``_require_data_source_unless_query_backed`` below so query-backed - # models can be constructed before their cache populator fills in - # ``data_source`` from the resolved virtual model. Whitespace-strip - # mismatch and NUL are rejected here so the rule mirrors the - # ``DatasourceConfig.name`` validator and the storage-layer - # ``_validate_path_component``. - if v and v.strip() != v: + # Format-only checks (run on every input). Emptiness is enforced + # in ``_require_data_source_unless_query_backed`` below so + # query-backed models can be constructed before their cache + # populator fills in ``data_source`` from the resolved virtual + # model. 
Whitespace-strip mismatch and substring rules mirror + # ``DatasourceConfig.name`` so the two canonical-id ingress + # points share validation logic via the shared ``_NO_*`` rules. + if not v: + return v + if v.strip() != v: raise ValueError( f"Model 'data_source' must not have leading/trailing " f"whitespace; got {v!r}." ) - if "\x00" in v: - raise ValueError( - f"Model 'data_source' must not contain NUL bytes; got {v!r}." - ) - if "/" in v or "\\" in v: - raise ValueError( - f"Model 'data_source' must not contain path separators " - f"('/' or '\\'); got {v!r}." - ) - # DEV-1405: dot is the canonical-id namespace delimiter - # (``..``). Allowing dots in a data_source name - # would let ``delete_datasource('prod')`` cascade-nuke embeddings - # belonging to a sibling datasource named ``prod.legacy``. - if "." in v: - raise ValueError( - f"Model 'data_source' must not contain '.'; " - f"dots are the canonical-id namespace delimiter " - f"(``..``). Got {v!r}." - ) + label = "Model 'data_source'" + _NO_NUL.check(v, label) + _NO_FWD_SLASH.check(v, label) + _NO_BACK_SLASH.check(v, label) + _NO_DOT.check(v, label) + _NO_COLON.check(v, label) return v @model_validator(mode="after") @@ -650,39 +692,23 @@ def _apply_schema_migrations_and_aliases(cls, data: Any) -> Any: @field_validator("name") @classmethod def _validate_name(cls, v: str) -> str: - # DEV-1405: datasource names are the leading segment of every - # canonical-id (````, ``.``, ``..``) - # and become a path component in YAML storage - # (``datasources/.yaml``, ``models//...``). 
They must - # therefore reject: - # - path separators / NUL (filesystem safety, mirrors - # ``_validate_path_component`` at the storage layer) - # - ``.`` (canonical-id namespace delimiter: ``prod`` vs ``prod.db`` - # would otherwise collide in cascade-delete prefix matches) - # - whitespace-only / empty (storage primary key) - # ``__`` is intentionally NOT rejected: datasource names never become - # SQL table aliases, so the join-path-alias reservation that applies - # to model and query names doesn't apply here. - if not v or not v.strip(): - raise ValueError( - f"Datasource 'name' must be a non-empty string; got {v!r}." - ) - if v.strip() != v: - raise ValueError( - f"Datasource 'name' must not have leading/trailing whitespace; " - f"got {v!r}." - ) - for ch in ("/", "\\", "\x00"): - if ch in v: - raise ValueError( - f"Datasource 'name' must not contain {ch!r}; got {v!r}." - ) - if "." in v: - raise ValueError( - f"Datasource 'name' must not contain '.'; dots are the " - f"canonical-id namespace delimiter " - f"(``..``). Got {v!r}." - ) + # Datasource names are the leading segment of every canonical-id + # (````, ``.``, ``..``) and a + # path component in YAML storage (``datasources/.yaml``, + # ``models//...``). The substring rules are shared with + # ``SlayerModel.data_source`` via the module-level ``_NO_*`` + # rules so the rationale lives in one place. + # + # ``__`` is intentionally NOT rejected: datasource names never + # become SQL table aliases, so the join-path-alias reservation + # that applies to model and query names doesn't apply here. 
+ label = "Datasource 'name'" + _require_non_empty_trimmed(v, label) + _NO_NUL.check(v, label) + _NO_FWD_SLASH.check(v, label) + _NO_BACK_SLASH.check(v, label) + _NO_DOT.check(v, label) + _NO_COLON.check(v, label) return v def get_connection_string(self) -> str: diff --git a/slayer/core/query.py b/slayer/core/query.py index 55cc2bc2..9b988100 100644 --- a/slayer/core/query.py +++ b/slayer/core/query.py @@ -319,13 +319,16 @@ def _apply_schema_migrations(cls, data: Any) -> Any: @field_validator("name") @classmethod - def _validate_no_dunder_in_name(cls, v: Optional[str]) -> Optional[str]: - if v is not None and "__" in v: - raise ValueError( - f"Query name '{v}' must not contain '__'. " - f"Double underscores are reserved for join path aliases in generated SQL." - ) - return v + def _validate_query_name(cls, v: Optional[str]) -> Optional[str]: + # Share the same rejection rules as SlayerModel.name — + # SlayerQuery names occupy the same naming space when persisted + # as query-backed models. Rejects ``__`` (join-path alias + # separator), ``.`` (dotted reference syntax), and ``:`` (DSL + # aggregation separator). + if v is None: + return v + from slayer.core.models import _validate_model_name + return _validate_model_name(v, "Query") dimensions: Annotated[Optional[List[ColumnRef]], BeforeValidator(_coerce_dimensions)] = None time_dimensions: Optional[List[TimeDimension]] = None main_time_dimension: Optional[str] = None # Explicit time dimension for transforms (overrides auto-detection) diff --git a/slayer/embeddings/service.py b/slayer/embeddings/service.py index 1d45f8a9..d1326a71 100644 --- a/slayer/embeddings/service.py +++ b/slayer/embeddings/service.py @@ -252,9 +252,14 @@ async def _apply_pending( try: await self._storage.save_embeddings(rows) except Exception as exc: # NOSONAR(S112) — best-effort persistence + # Include canonical ids so a caller doing failure + # attribution by entity (e.g. 
``ingest_datasource_idempotent`` + # tagging memory failures as ``model_name="memory:"``) + # can see which rows did not land. + canonical_ids = ", ".join(r.canonical_id for r in rows) warnings.append( - f"embedding batch persist failed " - f"({len(rows)} rows): {exc}" + f"embedding batch persist failed for " + f"{len(rows)} row(s) [{canonical_ids}]: {exc}" ) _log.debug( "EmbeddingService: refreshed=%d stale=%d total=%d warnings=%d", diff --git a/slayer/engine/ingestion.py b/slayer/engine/ingestion.py index 48731544..fee35df6 100644 --- a/slayer/engine/ingestion.py +++ b/slayer/engine/ingestion.py @@ -9,10 +9,9 @@ import asyncio import logging -import re import sys from collections import defaultdict, deque -from typing import Any, Dict, List, Optional, Set, TextIO, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, TextIO, Tuple import sqlalchemy as sa from pydantic import BaseModel, Field @@ -25,8 +24,12 @@ from slayer.memories.resolver import canonical_id_rooted_at from slayer.storage.base import StorageBackend +if TYPE_CHECKING: + # The runtime import lives inside ``_refresh_datasource_embeddings`` + # so the embeddings module stays off the cold-start import graph + # when the optional extra isn't installed. + from slayer.embeddings.service import EmbeddingService -_MEMORY_REF_RE = re.compile(r"\bmemory:\d+\b") logger = logging.getLogger(__name__) @@ -1014,16 +1017,13 @@ async def ingest_datasource_idempotent( embedding_errors = await _refresh_datasource_embeddings( datasource_name=datasource.name, storage=storage, ) - for err in embedding_errors: - # DEV-1416: per-memory warnings carry ``memory:`` in the - # string (either from EmbeddingService's own per-row failure - # template or from the defensive try/except inside - # ``_refresh_datasource_embeddings``). Surface the id as - # ``model_name`` so a startup log inspection can distinguish - # memory failures from model / datasource-doc failures at a - # glance. 
- match = _MEMORY_REF_RE.search(err) - model_name = match.group(0) if match is not None else "" + for model_name, err in embedding_errors: + # DEV-1416: each helper inside ``_refresh_datasource_embeddings`` + # attaches the canonical entity tag (``.``, + # ``memory:``, or ``""`` for the datasource doc) so a + # startup log inspection can distinguish memory failures from + # model / datasource-doc failures at a glance — no string + # sniffing of free-form warning text. errors.append(IngestionError( model_name=model_name, data_source=datasource.name, @@ -1208,62 +1208,125 @@ async def ingest_all_datasources_idempotent( return summary -async def _refresh_datasource_embeddings( - *, datasource_name: str, storage: StorageBackend, -) -> List[str]: - """Walk every model in the datasource + the datasource doc itself - through ``EmbeddingService.refresh_*``. Best-effort: returns warning - strings; never raises.""" - # Local import to avoid pulling embeddings into ingestion's import - # graph on a cold start without the optional extra installed. - from slayer.embeddings.service import EmbeddingService - - service = EmbeddingService(storage=storage) - warnings: List[str] = [] - models_in_ds: List = [] +async def _refresh_models_for_datasource( + *, + datasource_name: str, + storage: StorageBackend, + service: "EmbeddingService", +) -> Tuple[List[Tuple[str, str]], List[SlayerModel]]: + """Refresh embeddings for every visible model in the datasource. + + Returns ``(warnings, models_in_ds)``. Each warning is tagged with + the model's ``.`` so the orchestrator can route it to the + right ``IngestionError.model_name``. ``models_in_ds`` is forwarded + to the datasource-doc refresh that follows. 
+ """ + warnings: List[Tuple[str, str]] = [] + models_in_ds: List[SlayerModel] = [] try: identities = await storage._list_all_model_identities() except Exception as exc: # noqa: BLE001 — defensive - return [f"{datasource_name}: {exc}"] + return [("", f"{datasource_name}: {exc}")], models_in_ds for ds, name in identities: if ds != datasource_name: continue + tag = f"{ds}.{name}" try: m = await storage.get_model(name, data_source=ds) except Exception as exc: # noqa: BLE001 — defensive per-model - warnings.append(f"{ds}.{name}: {exc}") + warnings.append((tag, str(exc))) continue if m is None: continue models_in_ds.append(m) try: - warnings.extend(await service.refresh_model_subtree(m)) + subtree_warnings = await service.refresh_model_subtree(m) except Exception as exc: # noqa: BLE001 — defensive per-model - warnings.append(f"{ds}.{name}: {exc}") + subtree_warnings = [str(exc)] + for w in subtree_warnings: + warnings.append((tag, w)) + return warnings, models_in_ds + + +async def _refresh_datasource_doc( + *, + datasource_name: str, + models: List[SlayerModel], + service: "EmbeddingService", +) -> List[Tuple[str, str]]: + """Refresh the datasource doc embedding. Warnings are tagged with + an empty ``model_name`` since the doc has no specific entity name.""" try: - warnings.extend(await service.refresh_datasource( - name=datasource_name, models=models_in_ds, - )) + doc_warnings = await service.refresh_datasource( + name=datasource_name, models=models, + ) except Exception as exc: # noqa: BLE001 — defensive - warnings.append(f"{datasource_name} (datasource doc): {exc}") + return [("", f"{datasource_name} (datasource doc): {exc}")] + return [("", w) for w in doc_warnings] + - # DEV-1416: refresh embeddings for every memory whose canonical - # entities are rooted at this datasource. A memory linked to entities - # in datasources A and B is touched in both passes; hash-skip inside - # ``_apply_pending`` makes the second call a no-op. 
+async def _refresh_memories_for_datasource( + *, + datasource_name: str, + storage: StorageBackend, + service: "EmbeddingService", +) -> List[Tuple[str, str]]: + """Refresh embeddings for every memory whose canonical entities are + rooted at this datasource. Each warning is tagged with + ``memory:`` so a startup log inspection can distinguish memory + failures from datasource-doc / model failures at a glance. + + A memory linked to entities in datasources A and B is touched in + both passes; hash-skip inside ``_apply_pending`` makes the second + call a no-op. + """ try: memories = await storage.list_memories() except Exception as exc: # noqa: BLE001 — defensive - return warnings + [f"{datasource_name} (memories): {exc}"] + return [("", f"{datasource_name} (memories): {exc}")] + warnings: List[Tuple[str, str]] = [] for memory in memories: if not any( canonical_id_rooted_at(e, datasource_name) for e in memory.entities ): continue + tag = f"memory:{memory.id}" try: memory_warnings = await service.refresh_memory(memory) except Exception as exc: # noqa: BLE001 — defensive per-memory - memory_warnings = [f"memory:{memory.id}: {exc}"] - warnings.extend(memory_warnings) + memory_warnings = [str(exc)] + for w in memory_warnings: + warnings.append((tag, w)) return warnings + + +async def _refresh_datasource_embeddings( + *, datasource_name: str, storage: StorageBackend, +) -> List[Tuple[str, str]]: + """Refresh persisted embeddings for everything reachable from this + datasource: every visible model + its visible children, the + datasource doc itself, and every memory whose canonical entities + are rooted at the datasource. + + Best-effort: returns ``(model_name, error_text)`` tuples; never + raises. ``model_name`` is the canonical entity tag + (``.``, ``memory:``, or ``""`` for the datasource + doc) used by ``ingest_datasource_idempotent`` to route per-entity + failures to the matching ``IngestionError``. 
+ """ + # Local import to avoid pulling embeddings into ingestion's import + # graph on a cold start without the optional extra installed. + from slayer.embeddings.service import EmbeddingService + + service = EmbeddingService(storage=storage) + model_warnings, models_in_ds = await _refresh_models_for_datasource( + datasource_name=datasource_name, storage=storage, service=service, + ) + doc_warnings = await _refresh_datasource_doc( + datasource_name=datasource_name, models=models_in_ds, service=service, + ) + memory_warnings = await _refresh_memories_for_datasource( + datasource_name=datasource_name, storage=storage, service=service, + ) + return model_warnings + doc_warnings + memory_warnings diff --git a/tests/test_idempotent_ingestion.py b/tests/test_idempotent_ingestion.py index 3d76a286..7ffd2e9e 100644 --- a/tests/test_idempotent_ingestion.py +++ b/tests/test_idempotent_ingestion.py @@ -390,7 +390,7 @@ def _enable_channel(monkeypatch: pytest.MonkeyPatch) -> List[List[str]]: monkeypatch.setattr(embedding_client, "is_available", lambda: True) calls: List[List[str]] = [] - async def fake_embed_batch( + async def fake_embed_batch( # NOSONAR(S7503) — must be `async def` to match the patched embed_batch signature texts: List[str], *, model: Optional[str] = None, ) -> List[Optional[List[float]]]: calls.append(list(texts)) @@ -515,8 +515,8 @@ async def test_refresh_memory_raise_is_caught_and_tagged( self, workspace: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: """The defensive try/except in the memory loop must convert a - raise from ``refresh_memory`` into a tagged warning string — - never propagate.""" + raise from ``refresh_memory`` into a tagged ``(model_name, + error_text)`` tuple — never propagate.""" storage, ds, _ = await _setup(workspace) saved = await storage.save_memory( learning="raises on refresh", @@ -536,7 +536,8 @@ async def raising_refresh(self, memory): # NOSONAR(S7503) — replaces async re datasource_name=ds.name, storage=storage, ) assert any( 
- f"memory:{saved.id}" in w and "boom" in w for w in warnings + model_name == f"memory:{saved.id}" and "boom" in err + for model_name, err in warnings ), warnings async def test_extra_not_installed_silent_noop_on_memory_path( @@ -556,7 +557,7 @@ async def test_extra_not_installed_silent_noop_on_memory_path( called: List[List[str]] = [] - async def should_not_be_called( + async def should_not_be_called( # NOSONAR(S7503) — must be `async def` to match the patched embed_batch signature texts: List[str], *, model: Optional[str] = None, ) -> List[Optional[List[float]]]: called.append(list(texts)) @@ -576,11 +577,52 @@ async def should_not_be_called( ) assert [r for r in rows if r.entity_kind == "memory"] == [] + async def test_save_embeddings_failure_during_memory_persist_attributes_to_memory_id( + self, workspace: Path, monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Codex Finding 1: when ``save_embeddings`` raises while + persisting a memory's embedding, the resulting warning carries + the memory's canonical id so ``ingest_datasource_idempotent`` + can route the failure to ``IngestionError(model_name="memory:")`` + rather than the unattributed ``model_name=""`` bucket.""" + storage, ds, _ = await _setup(workspace) + saved = await storage.save_memory( + learning="row that will fail to persist", + entities=[f"{ds.name}.orders.amount"], + ) + self._enable_channel(monkeypatch) + + # Patch the storage backend so save_embeddings raises only for + # the memory canonical id; model + datasource-doc rows persist + # normally so we can pin that this is the only failure. 
+ original_save = storage.save_embeddings + + async def selective_save(rows): + if any(r.canonical_id == f"memory:{saved.id}" for r in rows): + raise RuntimeError("disk full") + await original_save(rows) + + monkeypatch.setattr(storage, "save_embeddings", selective_save) + + result = await ingest_datasource_idempotent( + datasource=ds, storage=storage, + ) + memory_errors = [ + e for e in result.errors + if e.model_name == f"memory:{saved.id}" + ] + assert memory_errors, ( + f"expected an IngestionError with " + f"model_name='memory:{saved.id}'; got {result.errors!r}" + ) + assert "disk full" in memory_errors[0].error + assert f"memory:{saved.id}" in memory_errors[0].error + async def test_list_memories_failure_warns_and_continues( self, workspace: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: """A raise inside ``storage.list_memories`` must be captured as a - single warning string — the datasource pass must still complete + single warning tuple — the datasource pass must still complete without propagating.""" storage, ds, _ = await _setup(workspace) self._enable_channel(monkeypatch) @@ -596,7 +638,7 @@ async def boom(self, *, entities=None): # NOSONAR(S7503) — async list_memorie datasource_name=ds.name, storage=storage, ) assert any( - "memories table missing" in w for w in warnings + "memories table missing" in err for _, err in warnings ), warnings diff --git a/tests/test_models.py b/tests/test_models.py index 5b68588c..93dc7466 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -305,6 +305,47 @@ def test_datasource_name_rejects_empty(self) -> None: with pytest.raises(ValueError, match="non-empty string"): DatasourceConfig(name="", type="postgres") + def test_datasource_name_rejects_colon(self) -> None: + """Colon is reserved as the DSL aggregation separator + (``revenue:sum``) and the ``memory:`` canonical-id prefix. 
+ Allowing it in a datasource name would let ``memory:42`` collide + with the memory canonical-id namespace.""" + from slayer.core.models import DatasourceConfig + with pytest.raises(ValueError, match="must not contain ':'"): + DatasourceConfig(name="memory:42", type="sqlite") + + def test_model_data_source_rejects_colon(self) -> None: + """A model's ``data_source`` shares the same canonical-id + namespace constraints as ``DatasourceConfig.name``.""" + with pytest.raises(ValueError, match="must not contain ':'"): + SlayerModel(name="orders", sql_table="t", data_source="memory:42") + + def test_model_name_rejects_colon(self) -> None: + """Colon is reserved as the DSL aggregation separator + (``revenue:sum``) — model names sharing the shape would collide + with formula parsing.""" + with pytest.raises(ValueError, match="must not contain ':'"): + SlayerModel(name="rev:sum", sql_table="t", data_source="ds") + + def test_query_name_rejects_colon(self) -> None: + """SlayerQuery names share the same naming space as SlayerModel + names (a query can be persisted as a query-backed model), so the + same rejection rules apply.""" + with pytest.raises(ValueError, match="must not contain ':'"): + SlayerQuery(name="rev:sum", source_model="orders") + + def test_query_name_rejects_dot(self) -> None: + """Dotted SlayerQuery names would collide with the dotted-path + reference syntax used in queries.""" + with pytest.raises(ValueError, match=r"must not contain '\.'"): + SlayerQuery(name="prod.summary", source_model="orders") + + def test_column_name_rejects_colon(self) -> None: + """Column names containing ``:`` would collide with the + aggregation colon syntax (``revenue:sum``).""" + with pytest.raises(ValueError, match="must not contain ':'"): + Column(name="rev:sum") + class TestWithinListDuplicateNames: """Duplicate names within ``columns`` or within ``measures`` are rejected. 
diff --git a/tests/test_startup_ingest.py b/tests/test_startup_ingest.py index 00015f46..762887ac 100644 --- a/tests/test_startup_ingest.py +++ b/tests/test_startup_ingest.py @@ -858,7 +858,7 @@ async def test_orchestrator_refreshes_memory_embeddings_for_each_datasource( embedding_client, "is_available", lambda: True, ) - async def fake_embed_batch( + async def fake_embed_batch( # NOSONAR(S7503) — must be `async def` to match the patched embed_batch signature texts: List[str], *, model: Optional[str] = None, ) -> List[Optional[List[float]]]: return [[0.1, 0.2, 0.3] for _ in texts] From 746438eb63f6e060401f49522629faa91610f56e Mon Sep 17 00:00:00 2001 From: Egor Kraev Date: Thu, 14 May 2026 18:14:47 +0200 Subject: [PATCH 3/3] =?UTF-8?q?DEV-1416:=20address=20coderabbit=20nitpick?= =?UTF-8?q?=20=E2=80=94=20keyword=20args=20on=20new=20validators?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switch the new _NO_*.check() and _require_non_empty_trimmed() call sites in slayer/core/models.py to keyword form, per the global rule about multi-parameter helper calls. Mechanical change; existing validator tests pin behavior. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- slayer/core/models.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/slayer/core/models.py b/slayer/core/models.py index 8e85b92b..dadaf80f 100644 --- a/slayer/core/models.py +++ b/slayer/core/models.py @@ -97,9 +97,9 @@ def _require_non_empty_trimmed(v: str, context: str) -> None: def _validate_model_name(name: str, context: str) -> str: """Reject model/query names containing ``__``, ``.``, or ``:``.""" label = f"{context} name" - _NO_DUNDER.check(name, label) - _NO_DOT.check(name, label) - _NO_COLON.check(name, label) + _NO_DUNDER.check(name=name, context=label) + _NO_DOT.check(name=name, context=label) + _NO_COLON.check(name=name, context=label) return name @@ -110,8 +110,8 @@ def _validate_column_name(name: str, context: str) -> str: models created by ``_query_as_model`` (e.g., ``stores__name``). """ label = f"{context} name" - _NO_DOT.check(name, label) - _NO_COLON.check(name, label) + _NO_DOT.check(name=name, context=label) + _NO_COLON.check(name=name, context=label) return name @@ -428,11 +428,11 @@ def _validate_data_source_format(cls, v: str) -> str: f"whitespace; got {v!r}." ) label = "Model 'data_source'" - _NO_NUL.check(v, label) - _NO_FWD_SLASH.check(v, label) - _NO_BACK_SLASH.check(v, label) - _NO_DOT.check(v, label) - _NO_COLON.check(v, label) + _NO_NUL.check(name=v, context=label) + _NO_FWD_SLASH.check(name=v, context=label) + _NO_BACK_SLASH.check(name=v, context=label) + _NO_DOT.check(name=v, context=label) + _NO_COLON.check(name=v, context=label) return v @model_validator(mode="after") @@ -703,12 +703,12 @@ def _validate_name(cls, v: str) -> str: # become SQL table aliases, so the join-path-alias reservation # that applies to model and query names doesn't apply here. 
label = "Datasource 'name'" - _require_non_empty_trimmed(v, label) - _NO_NUL.check(v, label) - _NO_FWD_SLASH.check(v, label) - _NO_BACK_SLASH.check(v, label) - _NO_DOT.check(v, label) - _NO_COLON.check(v, label) + _require_non_empty_trimmed(v=v, context=label) + _NO_NUL.check(name=v, context=label) + _NO_FWD_SLASH.check(name=v, context=label) + _NO_BACK_SLASH.check(name=v, context=label) + _NO_DOT.check(name=v, context=label) + _NO_COLON.check(name=v, context=label) return v def get_connection_string(self) -> str: