From 8e7c4839502e842b21f882b3deab9d5c49a85654 Mon Sep 17 00:00:00 2001 From: AivanF Date: Thu, 14 May 2026 14:39:23 +0300 Subject: [PATCH] Multi-stage queries in REST API --- .claude/skills/slayer-query.md | 2 +- CLAUDE.md | 3 +- docs/concepts/queries.md | 9 +- pyproject.toml | 2 +- slayer/api/server.py | 52 ++++++++++- tests/test_api_server.py | 165 +++++++++++++++++++++++++++++++++ 6 files changed, 225 insertions(+), 8 deletions(-) diff --git a/.claude/skills/slayer-query.md b/.claude/skills/slayer-query.md index 0124fe1..607ba29 100644 --- a/.claude/skills/slayer-query.md +++ b/.claude/skills/slayer-query.md @@ -156,7 +156,7 @@ Pass a list of queries — earlier queries are named sub-queries; the last is th Order doesn't matter for runtime lists — the engine auto-sorts so every stage appears after the siblings it references. The **last entry stays last** as the entry point. Cycles, self-references, and a non-final stage referencing the root are rejected; unreachable utility stages are accepted (silently dropped from the emitted SQL). -Surfaces: Python SDK `engine.execute(query=[...])`; CLI `slayer query @file.json` (accepts both single object and top-level list); MCP `query_nested(queries=[...])`. The single-stage MCP `query` tool and REST `POST /query` are single-query only — for the multi-stage shape from REST, save the list as a query-backed model (`POST /models` with `source_queries`) and run by name. `SlayerModel.source_queries` itself keeps strict top-to-bottom order; runtime lists are the only DAG-auto-sort surface. +Surfaces: Python SDK `engine.execute(query=[...])`; CLI `slayer query @file.json` (accepts both single object and top-level list); MCP `query_nested(queries=[...])`; REST `POST /query` with body `{"queries": [...], "variables": {...}, "dry_run": ..., "explain": ...}` (the single-query body shape is also still accepted). The single-stage MCP `query` tool stays single-query only — use it when the typed per-field schema fits a one-shot query. 
`SlayerModel.source_queries` itself keeps strict top-to-bottom order; runtime lists are the only DAG-auto-sort surface. ## Result format diff --git a/CLAUDE.md b/CLAUDE.md index a842f77..acf08e8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -118,7 +118,8 @@ poetry run ruff check slayer/ tests/ - `slayer search [--entity ENT ...] [--query JSON_OR_@FILE] [--question TEXT] [--datasource DS] [--max-memories N] [--max-example-queries N] [--max-entities N] [--format json|text]` (DEV-1375 / DEV-1386 / DEV-1409) — up to three-channel semantic search over memories + canonical entities (BM25 over memory entity tags + tantivy full-text + optional dense embedding similarity). `--datasource` scopes the corpus to one datasource (entity hits + memories pre-filtered). See [docs/concepts/search.md](docs/concepts/search.md). - `slayer search refresh-samples [--data-source X] [--model M ...]` (DEV-1375) — re-profile and persist `Column.sampled` for table-backed models. Best-effort: per-column failures are reported but don't abort. - MCP `query()` tool has a `format` parameter: `"markdown"` (default), `"json"`, or `"csv"`. -- **`query_nested` MCP tool**: companion to `query` for the multi-stage DAG shape that `engine.execute(query=list[...])` already supports. Takes `queries: List[Dict[str, Any]]` plus the usual `variables` / `show_sql` / `dry_run` / `explain` / `format` knobs. Earlier entries are named sub-queries that later entries reference via `source_model: ""` or `joins.target_model`; forward refs raise. The single-stage `query` tool is unchanged — keep using it whenever the typed per-field schema fits, since it surfaces a richer signature to agents. REST `POST /query` still accepts only a single query body (multi-stage from REST goes via `POST /models` with `source_queries`, then run-by-name). The CLI route is `slayer query @file.json` — the CLI parser already accepts both a single object and a top-level list. 
+- **`query_nested` MCP tool**: companion to `query` for the multi-stage DAG shape that `engine.execute(query=list[...])` already supports. Takes `queries: List[Dict[str, Any]]` plus the usual `variables` / `show_sql` / `dry_run` / `explain` / `format` knobs. Earlier entries are named sub-queries that later entries reference via `source_model: "<name>"` or `joins.target_model`; the engine auto-sorts the list (Kahn's algorithm), so order doesn't matter. The single-stage `query` tool is unchanged — keep using it whenever the typed per-field schema fits, since it surfaces a richer signature to agents. +- **REST `POST /query` accepts both shapes**: body is `Union[QueryRequest, QueryListRequest]` (FastAPI auto-discriminates by the presence of `queries`). Single-query body is unchanged; multi-stage body is `{"queries": [{...}, ...], "variables": {...}, "dry_run": ..., "explain": ...}` — mirrors `query_nested` and `engine.execute(query=list[...])`. `QueryRequest.source_model` accepts a string (stored model name) or a dict (inline `ModelExtension` / `SlayerModel`) for both single and list shapes. The CLI route is `slayer query @file.json` — the CLI parser also accepts both a single object and a top-level list. ## Database Support diff --git a/docs/concepts/queries.md b/docs/concepts/queries.md index edebb39..4cbf963 100644 --- a/docs/concepts/queries.md +++ b/docs/concepts/queries.md @@ -458,7 +458,14 @@ Sibling stages can also reference each other — any non-final stage may use a * `SlayerModel.source_queries` (stored, YAML-defined) keeps stricter top-to-bottom rules: any reference must point to a stage defined *earlier* in the list, so the file reads top-to-bottom as the execution order. -**Surface coverage.** Query lists work via `engine.execute(query=[...])` (Python SDK), the CLI (`slayer query @file.json` accepts both a single object and a top-level list), and the MCP tool `query_nested(queries=[...])`. 
The single-stage MCP tool `query` and the REST `POST /query` endpoint accept a single query only — for the multi-stage shape from REST, save the list as a query-backed model (`POST /models` with `source_queries`) and run it by name. +**Surface coverage.** Query lists work via every surface: + +- Python SDK: `engine.execute(query=[...])`. +- CLI: `slayer query @file.json` — accepts both a single object and a top-level list. +- MCP: the `query_nested` tool, `queries=[...]` argument. +- REST: `POST /query` with body `{"queries": [...], "variables": {...}, "dry_run": ..., "explain": ...}` (the single-query body shape is also still accepted). + +The single-stage MCP tool `query` stays single-query only — use it when the typed per-field schema fits a one-shot query; reach for `query_nested` for multi-stage. ### ModelExtension diff --git a/pyproject.toml b/pyproject.toml index e760efb..9b55f6d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "motley-slayer" -version = "0.6.6" +version = "0.6.7" description = "A lightweight, agent-first semantic layer for AI agents" requires-python = ">=3.11" license = "MIT" diff --git a/slayer/api/server.py b/slayer/api/server.py index 9560dc2..62a20bf 100644 --- a/slayer/api/server.py +++ b/slayer/api/server.py @@ -2,7 +2,7 @@ import logging from importlib.metadata import PackageNotFoundError, version as _pkg_version -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from fastapi import FastAPI, HTTPException from pydantic import BaseModel, ConfigDict @@ -31,7 +31,13 @@ class QueryRequest(BaseModel): model_config = ConfigDict(extra="allow") name: Optional[str] = None # Run-by-name: backing query for a query-backed model - source_model: Optional[str] = None + # ``source_model`` accepts a string (stored model name) or a dict + # — the dict form is an inline ``ModelExtension`` (``{"source_name": + # "<name>", "columns": [...], "joins": [...]}``) or an inline + # 
``SlayerModel`` (``{"name": "...", "sql_table": "...", "data_source": + # "...", "columns": [...]}``). The full polymorphism is handled by + # ``SlayerQuery.model_validate`` downstream. + source_model: Optional[Union[str, Dict[str, Any]]] = None measures: Optional[List[Dict[str, Any]]] = None dimensions: Optional[List[Dict[str, Any]]] = None time_dimensions: Optional[List[Dict[str, Any]]] = None @@ -45,6 +51,24 @@ class QueryRequest(BaseModel): variables: Optional[Dict[str, Any]] = None +class QueryListRequest(BaseModel): + """Body shape for multi-stage DAG queries at ``POST /query``. + + ``queries`` is a non-empty list of query dicts forming a DAG — + same shape as ``engine.execute(query=[...])`` and the MCP + ``query_nested`` tool. Order doesn't matter; the engine auto-sorts. + Top-level ``variables`` / ``dry_run`` / ``explain`` apply to the + whole execution. + """ + + model_config = ConfigDict(extra="forbid") + + queries: List[Dict[str, Any]] + variables: Optional[Dict[str, Any]] = None + dry_run: Optional[bool] = None + explain: Optional[bool] = None + + class FieldMetadataResponse(BaseModel): label: Optional[str] = None format: Optional[NumberFormat] = None @@ -168,13 +192,33 @@ async def health() -> Dict[str, str]: }, }, ) - async def query(request: QueryRequest) -> QueryResponse: + async def query( + request: Union[QueryRequest, QueryListRequest], + ) -> QueryResponse: try: + # Multi-stage DAG: body is ``{"queries": [...], "variables": ..., + # "dry_run": ..., "explain": ...}``. Mirrors the MCP + # ``query_nested`` tool and ``engine.execute(query=[...])``. + # Engine auto-sorts the list and validates DAG invariants. 
+ if isinstance(request, QueryListRequest): + if not request.queries: + raise HTTPException( + status_code=400, + detail="'queries' must be a non-empty list.", + ) + dry_run = bool(request.dry_run) + explain = bool(request.explain) + result = await engine.execute( + query=list(request.queries), + variables=request.variables or {}, + dry_run=dry_run, + explain=explain, + ) # Run-by-name: ``{"name": "", "variables": {...}}`` # routes through ``engine.execute(str)`` so the model's stored # backing query runs directly. Cannot be combined with # ``source_model`` or other query fields. - if request.name is not None: + elif request.name is not None: disallowed = [ f for f in ( request.source_model, request.measures, request.dimensions, diff --git a/tests/test_api_server.py b/tests/test_api_server.py index 3c97988..fbcd733 100644 --- a/tests/test_api_server.py +++ b/tests/test_api_server.py @@ -395,6 +395,171 @@ def test_post_query_either_name_or_source_model_required( assert resp.status_code == 400 +class TestQueryListBody: + """POST /query accepts a multi-stage DAG via ``{"queries": [...]}``. + + Mirrors ``engine.execute(query=[...])`` and the MCP ``query_nested`` + tool: earlier entries are named sub-queries, the last entry is the + DAG root, order doesn't matter (engine auto-sorts), cycles and + self-references are rejected with 400. + """ + + @staticmethod + def _setup_orders(client: TestClient) -> None: + # Use POST endpoints to keep storage state consistent with the client. 
+ resp = client.post("/datasources", json={ + "name": "ds_list", "type": "sqlite", "database": ":memory:", + }) + assert resp.status_code in (200, 201), resp.text + resp = client.post("/models", json={ + "name": "orders", + "data_source": "ds_list", + "sql_table": "orders", + "columns": [ + {"name": "amount", "sql": "amount", "type": "DOUBLE"}, + {"name": "status", "sql": "status", "type": "TEXT"}, + {"name": "customer_id", "sql": "customer_id", "type": "DOUBLE"}, + ], + }) + assert resp.status_code in (200, 201), resp.text + + def test_two_stage_dag_dry_run(self, client: TestClient) -> None: + """``{"queries": [stage1, stage2], "dry_run": true}`` runs through + the engine list path and returns the generated SQL. + """ + self._setup_orders(client) + resp = client.post("/query", json={ + "queries": [ + { + "name": "by_customer", + "source_model": "orders", + "measures": [{"formula": "amount:sum"}], + "dimensions": [{"name": "customer_id"}], + }, + { + "source_model": "by_customer", + "measures": [{"formula": "amount_sum:avg"}], + }, + ], + "dry_run": True, + }) + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["sql"] is not None + assert "avg(" in body["sql"].lower() + + def test_out_of_order_dag_auto_sorts(self, client: TestClient) -> None: + """Stages can be submitted in any order — the engine auto-sorts.""" + self._setup_orders(client) + resp = client.post("/query", json={ + "queries": [ + # ``a`` depends on ``b`` but is defined first — auto-sort moves + # ``b`` ahead so the SQL emits cleanly. 
+ { + "name": "a", + "source_model": "b", + "measures": [{"formula": "amount_sum:avg"}], + }, + { + "name": "b", + "source_model": "orders", + "measures": [{"formula": "amount:sum"}], + "dimensions": [{"name": "customer_id"}], + }, + { + "source_model": "a", + "measures": [{"formula": "amount_sum_avg:max"}], + }, + ], + "dry_run": True, + }) + assert resp.status_code == 200, resp.text + assert "max(" in resp.json()["sql"].lower() + + def test_empty_queries_rejected(self, client: TestClient) -> None: + resp = client.post("/query", json={"queries": []}) + assert resp.status_code == 400 + assert "non-empty" in resp.json()["detail"].lower() + + def test_cycle_rejected(self, client: TestClient) -> None: + """A cycle between stages must surface as 400 with a cycle message.""" + self._setup_orders(client) + resp = client.post("/query", json={ + "queries": [ + {"name": "a", "source_model": "b", "measures": [{"formula": "amount:sum"}]}, + {"name": "b", "source_model": "a", "measures": [{"formula": "amount:sum"}]}, + {"source_model": "orders", "measures": [{"formula": "amount:sum"}]}, + ], + "dry_run": True, + }) + assert resp.status_code == 400 + assert "cycle" in resp.json()["detail"].lower() + + +class TestQueryInlineSourceModel: + """POST /query accepts ``source_model`` as a string OR a dict (inline + ``ModelExtension`` / ``SlayerModel``), matching the MCP ``query`` tool + and ``SlayerQuery.source_model`` polymorphism. 
+ """ + + @staticmethod + def _setup_orders(client: TestClient) -> None: + resp = client.post("/datasources", json={ + "name": "ds_inline", "type": "sqlite", "database": ":memory:", + }) + assert resp.status_code in (200, 201), resp.text + resp = client.post("/models", json={ + "name": "orders", + "data_source": "ds_inline", + "sql_table": "orders", + "columns": [ + {"name": "amount", "sql": "amount", "type": "DOUBLE"}, + ], + }) + assert resp.status_code in (200, 201), resp.text + + def test_inline_model_extension_dict(self, client: TestClient) -> None: + """``source_model: {"source_name": "orders", "columns": [...]}`` + extends the stored model with an extra column for this one query. + """ + self._setup_orders(client) + resp = client.post("/query", json={ + "source_model": { + "source_name": "orders", + "columns": [ + {"name": "double_amount", "sql": "amount * 2", "type": "DOUBLE"}, + ], + }, + "measures": [{"formula": "double_amount:sum"}], + "dry_run": True, + }) + assert resp.status_code == 200, resp.text + sql = resp.json()["sql"].lower() + assert "amount * 2" in sql or "amount*2" in sql + + def test_inline_slayer_model_dict(self, client: TestClient) -> None: + """``source_model: {"name": ..., "sql_table": ..., ...}`` defines + an ad-hoc model for this one query. + """ + # Datasource only — table need not exist for dry_run. 
+ resp = client.post("/datasources", json={ + "name": "ds_adhoc", "type": "sqlite", "database": ":memory:", + }) + assert resp.status_code in (200, 201), resp.text + resp = client.post("/query", json={ + "source_model": { + "name": "ad_hoc", + "sql_table": "things", + "data_source": "ds_adhoc", + "columns": [{"name": "x", "sql": "x", "type": "DOUBLE"}], + }, + "measures": [{"formula": "x:sum"}], + "dry_run": True, + }) + assert resp.status_code == 200, resp.text + assert "things" in resp.json()["sql"].lower() + + class TestOpenAPI400Documentation: """Endpoints that raise HTTPException(400) should declare it in OpenAPI so generated SDKs surface the error shape (Sonar S8415).