From 86a984b017151bde7b6793a5fc51b74de852c541 Mon Sep 17 00:00:00 2001 From: Rusty Conover Date: Mon, 5 Jan 2026 20:11:38 -0500 Subject: [PATCH] Add catalog interface tests and InMemoryCatalog example MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds comprehensive tests for the catalog interface and includes the InMemoryCatalog example implementation: Tests (tests/catalog/): - test_serialization.py: Round-trip tests for all catalog dataclasses - test_catalog_interface.py: Tests for CatalogInterface ABC and ReadOnlyCatalogInterface DDL rejection - test_integration.py: End-to-end tests using InMemoryCatalog for catalog operations (attach/detach, schemas, tables, views, versioning) InMemoryCatalog (vgi/examples/catalog.py): - In-memory catalog implementation for testing - Supports DDL operations (create/drop/rename for schemas, tables, views) - UUID-based attach_id generation - Catalog versioning on modifications - Entry point: vgi-example-catalog-worker 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .beads/issues.jsonl | 8 +- pyproject.toml | 1 + tests/catalog/__init__.py | 1 + tests/catalog/test_catalog_interface.py | 425 ++++++++++++++++ tests/catalog/test_integration.py | 621 ++++++++++++++++++++++++ tests/catalog/test_serialization.py | 398 +++++++++++++++ vgi/examples/catalog.py | 591 ++++++++++++++++++++++ 7 files changed, 2041 insertions(+), 4 deletions(-) create mode 100644 tests/catalog/__init__.py create mode 100644 tests/catalog/test_catalog_interface.py create mode 100644 tests/catalog/test_integration.py create mode 100644 tests/catalog/test_serialization.py create mode 100644 vgi/examples/catalog.py diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 9950e1a..930e728 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -8,7 +8,7 @@ {"id":"vgi-python-34l","title":"Create catalog integration tests","description":"Create comprehensive 
test suite for catalog interface.\n\nFiles to create:\n- tests/catalog/__init__.py\n- tests/catalog/test_types.py\n- tests/catalog/test_serialization.py\n- tests/catalog/test_catalog_interface.py\n- tests/catalog/test_catalog_client.py\n- tests/catalog/test_integration.py\n\ntest_types.py:\n- Test all dataclass instantiation\n- Test frozen immutability\n- Test type alias usage\n\ntest_serialization.py:\n- Round-trip tests for all types\n- Edge cases: empty strings, empty lists, None values\n- Invalid schema rejection\n\ntest_catalog_interface.py:\n- Test abstract method enforcement\n- Test default implementations\n- Test NotImplementedError for optional methods\n- Test ReadOnlyCatalogInterface\n\ntest_catalog_client.py:\n- Mock worker tests for each client method\n- Error handling tests\n- Streaming response tests\n\ntest_integration.py:\n- End-to-end client ↔ worker tests using InMemoryCatalog\n- Catalog lifecycle: attach, query, detach\n- Schema operations\n- Table operations\n- Error propagation\n\nProtocol conformance tests:\n- Invalid schemas (wrong column types)\n- Missing required columns\n- Multi-row input batches (should fail)\n- Extra columns (should be ignored)","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:18:24.163718-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.050722-05:00","closed_at":"2026-01-05T19:21:50.050722-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-34l","depends_on_id":"vgi-python-e46","type":"blocks","created_at":"2026-01-05T19:18:44.818195-05:00","created_by":"rusty"},{"issue_id":"vgi-python-34l","depends_on_id":"vgi-python-vxy","type":"blocks","created_at":"2026-01-05T19:18:48.578947-05:00","created_by":"rusty"},{"issue_id":"vgi-python-34l","depends_on_id":"vgi-python-nju","type":"blocks","created_at":"2026-01-05T19:18:48.686098-05:00","created_by":"rusty"}]} {"id":"vgi-python-35i","title":"Test SchemaValidationError detailed message 
paths","notes":"Coverage: 67% in vgi/exceptions.py. Missing tests for:\n- Lines 116-123: Type mismatch detection in _build_detailed_message\n- Lines 128-131: Field order difference detection \n- Lines 149-151: Type mismatch reporting\n- Lines 155-157: Field order difference reporting\n\nTest scenarios needed:\n1. Schema with same fields but different types\n2. Schema with nullable vs non-nullable mismatch\n3. Schema with same fields in different order","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T22:15:25.858704-05:00","created_by":"rusty","updated_at":"2026-01-04T22:25:58.697852-05:00","closed_at":"2026-01-04T22:25:58.697852-05:00","close_reason":"Added comprehensive tests for SchemaValidationError. Coverage improved from 67% to 99%."} {"id":"vgi-python-36f","title":"Split metadata.py Arrow serialization into separate module","description":"metadata.py is 932 lines with two distinct concerns: 1) metadata resolution (enums, dataclasses, parameter extraction, resolve_metadata) and 2) Arrow serialization (schema definitions, to_arrow/from_arrow functions). Split Arrow serialization into metadata_serialization.py or metadata_arrow.py for better separation of concerns.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:53.481364-05:00","created_by":"rusty","updated_at":"2026-01-04T22:01:52.100108-05:00","closed_at":"2026-01-04T22:01:52.100108-05:00","close_reason":"Not warranted - metadata.py is well-organized with clear section headers. Arrow serialization (~165 lines) is tightly coupled to data classes (uses their to_dict/from_dict methods). Splitting would add import complexity without significant benefit."} -{"id":"vgi-python-3bn","title":"Fix schema_contents() default implementation","description":"Implement the default schema_contents() method that has a FIXME comment.\n\nFile: vgi/catalog/catalog_interface.py\n\nCurrent state: \n```python\ndef schema_contents(...) 
-\u003e Iterable[TableInfo | ViewInfo | FunctionInfo]:\n # FIXME: write this implementation for the worker.\n```\n\nThe default implementation should:\n1. Access the Worker's registered functions\n2. Convert each function's metadata to FunctionInfo\n3. Return them as the contents of the 'main' schema\n\nThis requires:\n- Access to Worker.functions registry\n- Converting function metadata to FunctionInfo format\n- May need to pass worker reference to CatalogInterface\n\nAlternative: Just raise NotImplementedError and require subclasses to implement.","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:27.482694-05:00","created_by":"rusty","updated_at":"2026-01-05T19:27:27.482694-05:00"} +{"id":"vgi-python-3bn","title":"Fix schema_contents() default implementation","description":"Implement the default schema_contents() method that has a FIXME comment.\n\nFile: vgi/catalog/catalog_interface.py\n\nCurrent state: \n```python\ndef schema_contents(...) -\u003e Iterable[TableInfo | ViewInfo | FunctionInfo]:\n # FIXME: write this implementation for the worker.\n```\n\nThe default implementation should:\n1. Access the Worker's registered functions\n2. Convert each function's metadata to FunctionInfo\n3. 
Return them as the contents of the 'main' schema\n\nThis requires:\n- Access to Worker.functions registry\n- Converting function metadata to FunctionInfo format\n- May need to pass worker reference to CatalogInterface\n\nAlternative: Just raise NotImplementedError and require subclasses to implement.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:27.482694-05:00","created_by":"rusty","updated_at":"2026-01-05T19:59:02.922654-05:00","closed_at":"2026-01-05T19:59:02.922654-05:00","close_reason":"Already fixed in PR #24 - now raises NotImplementedError"} {"id":"vgi-python-3fq","title":"Abstract common worker batch processing logic","description":"Worker batch processing methods _process_scalar_batches (377-466), _process_batches (468-550), and _generate_batches (552-593) share significant structure: IPC writer/reader setup, batch counting/logging, main processing loop. Extract common logic to reduce duplication - consider a BatchProcessor helper class or template method pattern.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:53.350497-05:00","created_by":"rusty","updated_at":"2026-01-04T21:20:24.509785-05:00","closed_at":"2026-01-04T21:20:24.509785-05:00","close_reason":"Analysis complete: abstraction not warranted. The three methods have sufficiently different logic (input handling, log message loops, protocol types) that abstracting them would add complexity without meaningful benefit. Current code is already readable at ~70-90 lines each."} {"id":"vgi-python-4mg","title":"Add InvocationType.CATALOG to vgi/invocation.py","description":"Extend InvocationType enum to support catalog invocations.\n\nFiles to modify:\n- vgi/invocation.py\n\nChanges:\n1. Add CATALOG = 'catalog' to InvocationType enum\n2. Update docstrings to document the new type\n3. 
Ensure serialization/deserialization handles the new value\n\nThe CATALOG invocation type indicates the function_name field contains a CatalogInterface method name (e.g., 'catalog_attach', 'schemas', 'table_get').\n\nTest that CATALOG type serializes and deserializes correctly.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:17:45.649509-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.062449-05:00","closed_at":"2026-01-05T19:21:50.062449-05:00","close_reason":"User requested closure"} {"id":"vgi-python-5er","title":"Extract _should_terminate into shared base class","description":"Identical _should_terminate method is copy-pasted in all three function modules. Implementation is always: check if log_message exists and level is EXCEPTION. Move to shared base class (Function or new ProcessingMixin) to eliminate duplication.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.190482-05:00","created_by":"rusty","updated_at":"2026-01-04T21:49:59.765614-05:00","closed_at":"2026-01-04T21:49:59.765614-05:00","close_reason":"Completed as part of PR #8 - _should_terminate moved to Function base class","dependencies":[{"issue_id":"vgi-python-5er","depends_on_id":"vgi-python-6o0","type":"blocks","created_at":"2026-01-04T20:07:49.283865-05:00","created_by":"rusty"}]} @@ -38,13 +38,13 @@ {"id":"vgi-python-cvj","title":"Add PYTHON_TO_ARROW type mapping to vgi/arguments.py","description":"Add the Python→Arrow type mapping dict after imports:\n```python\nPYTHON_TO_ARROW: dict[type, pa.DataType] = {\n int: pa.int64(),\n str: pa.utf8(),\n float: pa.float64(),\n bool: pa.bool_(),\n bytes: pa.binary(),\n}\n```\nExport in __all__.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T15:44:37.900421-05:00","created_by":"rusty","updated_at":"2026-01-05T15:48:42.422086-05:00","closed_at":"2026-01-05T15:48:42.422086-05:00","close_reason":"PR #18 created"} 
{"id":"vgi-python-d73","title":"Create docs/argument-serialization.md","description":"## Overview\n\nCreate LLM-friendly documentation explaining the argument specification serialization format. This document should enable future implementors (human or AI) to understand how function argument signatures are serialized to Arrow schemas.\n\n## File Location\n\n`docs/argument-serialization.md`\n\n## Document Structure\n\n### Title and Purpose\n\nExplain that this document describes how VGI function argument specifications are serialized to Apache Arrow schemas for IPC transmission and DuckDB function registration.\n\n### Quick Reference\n\nA concise summary table showing:\n- Metadata keys and their meanings\n- Special type representations\n\n### Schema Format\n\nExplain the single-schema design:\n1. All arguments are fields in one Arrow schema\n2. Positional arguments come first, in order (field index = position index)\n3. Named arguments follow, marked with metadata\n4. Field name = Python attribute name (or argument key for named)\n5. Field type = exact Arrow type\n\n### Metadata Keys Reference\n\nComplete table of all metadata keys:\n\n| Key | Value | Description |\n|-----|-------|-------------|\n| `vgi_arg` | `named` | Field is a named argument, not positional. The field name is the argument key. |\n| `vgi_type` | `table` | Argument receives streaming table input (Arg[TableInput]). Arrow type is pa.null(). |\n| `vgi_type` | `any` | Argument accepts any Arrow type (Arg[AnyArrow]). Arrow type is pa.null(). |\n| `vgi_varargs` | `true` | Argument collects all remaining positional args. Arrow type is the element type. 
|\n\n### Special Type Handling\n\nExplain how special argument types are represented:\n\n#### TableInput\n- Arrow type: `pa.null()`\n- Metadata: `{b\"vgi_type\": b\"table\"}`\n- Meaning: This position receives streaming RecordBatches, not a scalar value\n\n#### AnyArrow\n- Arrow type: `pa.null()`\n- Metadata: `{b\"vgi_type\": b\"any\"}`\n- Meaning: Accepts any valid Arrow scalar type at runtime\n\n#### Varargs\n- Arrow type: The element type (e.g., `pa.int64()` for `Arg[int](..., varargs=True)`)\n- Metadata: `{b\"vgi_varargs\": b\"true\"}`\n- Meaning: Collects all remaining positional arguments from this position onwards\n\n### Examples\n\n#### Example 1: Simple Function\n\n```python\nclass MyFunction(TableInOutFunction):\n count = Arg[int](0) # Positional 0\n name = Arg[str](1) # Positional 1\n verbose = Arg[bool](\"verbose\") # Named\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"count\", pa.int64()),\n pa.field(\"name\", pa.utf8()),\n pa.field(\"verbose\", pa.bool_(), metadata={b\"vgi_arg\": b\"named\"}),\n])\n```\n\n#### Example 2: Function with Table Input\n\n```python\nclass TransformFunction(TableInOutFunction):\n multiplier = Arg[float](0)\n data = Arg[TableInput](1)\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"multiplier\", pa.float64()),\n pa.field(\"data\", pa.null(), metadata={b\"vgi_type\": b\"table\"}),\n])\n```\n\n#### Example 3: Function with Varargs\n\n```python\nclass SumFunction(TableInOutFunction):\n columns = Arg[str](0, varargs=True)\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"columns\", pa.utf8(), metadata={b\"vgi_varargs\": b\"true\"}),\n])\n```\n\n#### Example 4: Complex Function\n\n```python\nclass ComplexFunction(TableInOutFunction):\n count = Arg[int](0)\n data = Arg[TableInput](1)\n extra = Arg[float](2, varargs=True)\n format = Arg[str](\"format\")\n threshold = Arg[AnyArrow](\"threshold\")\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"count\", pa.int64()),\n pa.field(\"data\", pa.null(), 
metadata={b\"vgi_type\": b\"table\"}),\n pa.field(\"extra\", pa.float64(), metadata={b\"vgi_varargs\": b\"true\"}),\n pa.field(\"format\", pa.utf8(), metadata={b\"vgi_arg\": b\"named\"}),\n pa.field(\"threshold\", pa.null(), metadata={b\"vgi_arg\": b\"named\", b\"vgi_type\": b\"any\"}),\n])\n```\n\n### Serialization Code\n\nShow how to serialize and deserialize:\n\n```python\n# Serialize to bytes\nschema_bytes = schema.serialize().to_pybytes()\n\n# Deserialize from bytes\nschema = pa.ipc.read_schema(pa.py_buffer(schema_bytes))\n```\n\n### Parsing Algorithm\n\nExplain how to parse a schema back to argument specs:\n\n1. Initialize position_index = 0\n2. For each field in schema:\n a. Check if field has `vgi_arg=named` metadata\n b. If named: position = field.name (string)\n c. If positional: position = position_index, then increment position_index\n d. Check for `vgi_type` metadata (table or any)\n e. Check for `vgi_varargs` metadata\n f. Create ArgumentSpec with extracted info\n\n### Not Included\n\nExplicitly state what is NOT serialized:\n- Default values\n- Validation constraints (ge, le, choices, pattern)\n- Documentation strings\n\nThese are Python-side concerns handled by the Arg descriptor at runtime.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T11:19:17.488877-05:00","created_by":"rusty","updated_at":"2026-01-05T11:33:29.168007-05:00","closed_at":"2026-01-05T11:33:29.168007-05:00","close_reason":"Created comprehensive LLM-friendly documentation","dependencies":[{"issue_id":"vgi-python-d73","depends_on_id":"vgi-python-8ra","type":"blocks","created_at":"2026-01-05T11:19:30.820384-05:00","created_by":"rusty"}]} {"id":"vgi-python-dv0","title":"Add arrow_type parameter to Arg class","description":"In vgi/arguments.py:\n1. Add 'arrow_type' to __slots__\n2. Add parameter: arrow_type: pa.DataType | None = None\n3. Store: self.arrow_type = arrow_type\n4. 
Update __repr__ to include arrow_type if set","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T15:44:38.020395-05:00","created_by":"rusty","updated_at":"2026-01-05T15:50:51.513273-05:00","closed_at":"2026-01-05T15:50:51.513273-05:00","close_reason":"PR #19 created","dependencies":[{"issue_id":"vgi-python-dv0","depends_on_id":"vgi-python-cvj","type":"blocks","created_at":"2026-01-05T15:45:13.696822-05:00","created_by":"rusty"}]} -{"id":"vgi-python-dvd","title":"Complete ReadOnlyCatalogInterface implementation","description":"Complete ReadOnlyCatalogInterface that prevents all DDL operations.\n\nFile: vgi/catalog/catalog_interface.py (already exists)\n\nCurrent state: Only has class attributes, no method overrides.\n\nChanges needed:\n1. Override all DDL methods to raise CatalogReadOnlyError:\n - catalog_create, catalog_drop\n - schema_create, schema_drop\n - table_create, table_drop, table_rename, table_comment_set\n - All table_column_* methods\n - table_not_null_set, table_not_null_drop\n - view_create, view_drop, view_rename, view_comment_set\n - Transaction methods (begin/commit/rollback)\n\n2. Create CatalogReadOnlyError exception in vgi/exceptions.py\n\n3. Ensure catalog_attach returns supports_transactions=False\n\nInclude tests verifying all DDL operations raise CatalogReadOnlyError.","status":"in_progress","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:15.199453-05:00","created_by":"rusty","updated_at":"2026-01-05T19:54:28.722579-05:00"} +{"id":"vgi-python-dvd","title":"Complete ReadOnlyCatalogInterface implementation","description":"Complete ReadOnlyCatalogInterface that prevents all DDL operations.\n\nFile: vgi/catalog/catalog_interface.py (already exists)\n\nCurrent state: Only has class attributes, no method overrides.\n\nChanges needed:\n1. 
Override all DDL methods to raise CatalogReadOnlyError:\n - catalog_create, catalog_drop\n - schema_create, schema_drop\n - table_create, table_drop, table_rename, table_comment_set\n - All table_column_* methods\n - table_not_null_set, table_not_null_drop\n - view_create, view_drop, view_rename, view_comment_set\n - Transaction methods (begin/commit/rollback)\n\n2. Create CatalogReadOnlyError exception in vgi/exceptions.py\n\n3. Ensure catalog_attach returns supports_transactions=False\n\nInclude tests verifying all DDL operations raise CatalogReadOnlyError.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:15.199453-05:00","created_by":"rusty","updated_at":"2026-01-05T19:57:59.2381-05:00","closed_at":"2026-01-05T19:57:59.2381-05:00","close_reason":"PR #28 created"} {"id":"vgi-python-dvo","title":"Export AnyValue in vgi/__init__.py","description":"Import and add AnyValue to __all__ exports","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T10:41:41.65732-05:00","created_by":"rusty","updated_at":"2026-01-05T11:07:09.187969-05:00","closed_at":"2026-01-05T11:07:09.187969-05:00","close_reason":"Exported AnyArrow in vgi/__init__.py","dependencies":[{"issue_id":"vgi-python-dvo","depends_on_id":"vgi-python-ckg","type":"blocks","created_at":"2026-01-05T10:41:48.715634-05:00","created_by":"rusty"}]} {"id":"vgi-python-e37","title":"move Invocation from function.py out to own file","description":"The Invocation clas is kind of seperate from functions, so it should be in its own file. 
Move it and all of its other associated classes like InvocationType to its own file","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T09:18:46.605941-05:00","created_by":"rusty","updated_at":"2026-01-04T09:24:37.922675-05:00","closed_at":"2026-01-04T09:24:37.922675-05:00","close_reason":"Closed"} {"id":"vgi-python-e46","title":"Create vgi/catalog/client.py - CatalogClient class","description":"Create CatalogClient for client-side catalog operations.\n\nFiles to create:\n- vgi/catalog/client.py\n\nCatalogClient class:\n- __init__(worker_command: str)\n- Context manager support (__enter__, __exit__)\n- start() / stop() methods\n\nCore methods (mirroring CatalogInterface):\n- catalogs() -\u003e list[str]\n- attach(name, options) -\u003e CatalogAttachResult\n- detach(attach_id) -\u003e None\n- schemas(attach_id, transaction_id) -\u003e Iterator[SchemaInfo]\n- schema_get(attach_id, transaction_id, name) -\u003e SchemaInfo | None\n- schema_contents(attach_id, transaction_id, name) -\u003e Iterator[TableInfo | ViewInfo | FunctionInfo]\n- table_get(...) -\u003e TableInfo | None\n- view_get(...) -\u003e ViewInfo | None\n- function_get(...) -\u003e FunctionInfo | None\n- table_scan_function_get(...) 
-\u003e ScanFunctionResult\n\nDDL methods (optional, may raise NotImplementedError from worker):\n- catalog_create, catalog_drop\n- schema_create, schema_drop\n- table_create, table_drop, table_rename, etc.\n- view_create, view_drop, view_rename, etc.\n\nTransaction methods:\n- transaction_begin(attach_id) -\u003e TransactionId | None\n- transaction_commit(attach_id, transaction_id)\n- transaction_rollback(attach_id, transaction_id)\n\nInternal methods:\n- _invoke(method_name, **kwargs) -\u003e pa.RecordBatch | Iterator[pa.RecordBatch]\n- _create_invocation(method_name, kwargs) -\u003e Invocation\n- _deserialize_result(batch, return_type) -\u003e Any\n\nHandle:\n- Streaming responses for Iterable returns\n- Exception propagation from worker\n- None returns (0-row/0-column batches)","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:18:04.65125-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.055794-05:00","closed_at":"2026-01-05T19:21:50.055794-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-tw7","type":"blocks","created_at":"2026-01-05T19:18:44.316642-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-fd2","type":"blocks","created_at":"2026-01-05T19:18:44.440065-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-4mg","type":"blocks","created_at":"2026-01-05T19:18:44.559963-05:00","created_by":"rusty"}]} {"id":"vgi-python-e6o","title":"Implement CatalogClient class","description":"Create CatalogClient for client-side catalog operations.\n\nFile: vgi/client/catalog_client.py\n\nCatalogClient class:\n- __init__(worker_command: str)\n- Each method call spawns new worker (matches VGI short-lived pattern)\n\nCore methods mirroring CatalogInterface:\n- catalogs() -\u003e list[str]\n- catalog_attach(name, options) -\u003e CatalogAttachResult\n- catalog_detach(attach_id) -\u003e None\n- 
schemas(attach_id, transaction_id) -\u003e Iterator[SchemaInfo]\n- schema_get(...) -\u003e SchemaInfo | None\n- schema_contents(...) -\u003e Iterator[TableInfo | ViewInfo | FunctionInfo]\n- table_get(...) -\u003e TableInfo | None\n- view_get(...) -\u003e ViewInfo | None\n- table_scan_function_get(...) -\u003e ScanFunctionResult\n\nDDL methods (may raise NotImplementedError from worker):\n- catalog_create, catalog_drop\n- schema_create, schema_drop\n- table_* methods, view_* methods\n\nTransaction methods:\n- catalog_transaction_begin/commit/rollback\n\nInternal:\n- _invoke(method_name, **kwargs) -\u003e pa.RecordBatch | Iterator[pa.RecordBatch]\n- _create_invocation(method_name, kwargs) -\u003e Invocation (with InvocationType.CATALOG)\n- Uses existing IPC utilities for communication","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:57.975309-05:00","created_by":"rusty","updated_at":"2026-01-05T19:48:33.366915-05:00","closed_at":"2026-01-05T19:48:33.366915-05:00","close_reason":"PR #27 created with CatalogClient implementation","dependencies":[{"issue_id":"vgi-python-e6o","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.730122-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e6o","depends_on_id":"vgi-python-po3","type":"blocks","created_at":"2026-01-05T19:27:50.762036-05:00","created_by":"rusty"}]} {"id":"vgi-python-e9q","title":"Unify ProtocolOutput classes with shared base","description":"ProtocolOutput classes in table_function.py:177-224 and table_in_out_function.py:144-207 share similar metadata() method and from_process_result() classmethod. The table_in_out version adds status field. 
Create shared base with table_in_out extending it for status support.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.45014-05:00","created_by":"rusty","updated_at":"2026-01-04T21:54:55.871986-05:00","closed_at":"2026-01-04T21:54:55.871986-05:00","close_reason":"Not warranted - dataclass inheritance with slots=True doesn't allow adding required field (status) between inherited fields. The classes have different semantics (table_in_out requires status for generator state tracking) making inheritance impractical."} -{"id":"vgi-python-eg7","title":"Create InMemoryCatalog example implementation","description":"Create an in-memory catalog implementation for testing and as an example.\n\nFile: vgi/examples/catalog.py\n\nInMemoryCatalog(CatalogInterface):\n- In-memory storage using dicts\n- Implements all required abstract methods\n- Implements common optional methods (schema_create, table_create, etc.)\n- Generates attach_id as random UUID bytes\n- Does NOT support transactions (returns None)\n\nData structures:\n- _catalogs: dict[str, CatalogData]\n- _attachments: dict[AttachId, str] # attach_id -\u003e catalog_name\n\nCreate example worker:\n```python\nclass InMemoryCatalogWorker(Worker):\n catalog_interface = InMemoryCatalog\n```\n\nAdd entry point: vgi-example-catalog-worker","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:27.604912-05:00","created_by":"rusty","updated_at":"2026-01-05T19:27:27.604912-05:00","dependencies":[{"issue_id":"vgi-python-eg7","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.87322-05:00","created_by":"rusty"}]} +{"id":"vgi-python-eg7","title":"Create InMemoryCatalog example implementation","description":"Create an in-memory catalog implementation for testing and as an example.\n\nFile: vgi/examples/catalog.py\n\nInMemoryCatalog(CatalogInterface):\n- In-memory storage using dicts\n- Implements all required abstract methods\n- Implements 
common optional methods (schema_create, table_create, etc.)\n- Generates attach_id as random UUID bytes\n- Does NOT support transactions (returns None)\n\nData structures:\n- _catalogs: dict[str, CatalogData]\n- _attachments: dict[AttachId, str] # attach_id -\u003e catalog_name\n\nCreate example worker:\n```python\nclass InMemoryCatalogWorker(Worker):\n catalog_interface = InMemoryCatalog\n```\n\nAdd entry point: vgi-example-catalog-worker","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:27.604912-05:00","created_by":"rusty","updated_at":"2026-01-05T20:01:59.38105-05:00","closed_at":"2026-01-05T20:01:59.38105-05:00","close_reason":"PR #29 created","dependencies":[{"issue_id":"vgi-python-eg7","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.87322-05:00","created_by":"rusty"}]} {"id":"vgi-python-f5z","title":"Create vgi/catalog/storage.py - Catalog persistence","description":"Create storage layer for catalog attach_id and transaction_id persistence.\n\nFiles to create:\n- vgi/catalog/storage.py\n\nCatalogStorage protocol (similar to FunctionStorage):\n- attach_put(attach_id, catalog_name, options) -\u003e None\n- attach_get(attach_id) -\u003e tuple[str, dict] | None\n- attach_delete(attach_id) -\u003e None\n- attach_list() -\u003e list[AttachId]\n\n- transaction_put(transaction_id, attach_id, state) -\u003e None\n- transaction_get(transaction_id) -\u003e tuple[AttachId, bytes] | None\n- transaction_delete(transaction_id) -\u003e None\n\nCatalogStorageSqlite implementation:\n- Default location: ~/.state/vgi/vgi_catalog.db\n- WAL mode for concurrent access\n- Schema:\n CREATE TABLE catalog_attachments (\n attach_id BLOB PRIMARY KEY,\n catalog_name TEXT NOT NULL,\n options TEXT, -- JSON\n created_at REAL DEFAULT (julianday('now'))\n )\n CREATE TABLE catalog_transactions (\n transaction_id BLOB PRIMARY KEY,\n attach_id BLOB NOT NULL,\n state BLOB,\n created_at REAL DEFAULT (julianday('now'))\n 
)\n\nInclude cleanup strategies for stale attachments/transactions.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T19:18:04.531387-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.073983-05:00","closed_at":"2026-01-05T19:21:50.073983-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-f5z","depends_on_id":"vgi-python-tw7","type":"blocks","created_at":"2026-01-05T19:18:44.194468-05:00","created_by":"rusty"}]} {"id":"vgi-python-fd2","title":"Create vgi/catalog/serialization.py - Arrow serialization","description":"Create Arrow IPC serialization for all catalog types.\n\nFiles to create:\n- vgi/catalog/serialization.py\n\nArrow schemas for:\n- CatalogAttachResult: attach_id, supports_transactions, supports_time_travel, catalog_version_frozen, catalog_version\n- SchemaInfo: attach_id, name, is_default, comment, tags\n- TableInfo: name, schema_name, columns, primary_key_columns, not_null_constraints, unique_constraints, check_constraints, comment, tags\n- ViewInfo: name, schema_name, definition, comment, tags\n- FunctionInfo: name, schema_name, function_type, arguments, output_schema, comment, tags\n- ScanFunctionResult: function_name, max_processes, invocation_id\n\nFunctions:\n- serialize_\u003ctype\u003e() -\u003e bytes for each type\n- deserialize_\u003ctype\u003e(batch) -\u003e Type for each type\n- Arrow schema constants for each type\n\nSerialization convention:\n- Single-row batches for scalar returns\n- Multi-row batches for streaming (Iterable returns)\n- None = 0-row/0-column batch\n- Empty list = 0-row batch with schema\n\nInclude round-trip serialization tests for all types.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:17:15.404739-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.068663-05:00","closed_at":"2026-01-05T19:21:50.068663-05:00","close_reason":"User requested 
closure","dependencies":[{"issue_id":"vgi-python-fd2","depends_on_id":"vgi-python-tw7","type":"blocks","created_at":"2026-01-05T19:18:36.318762-05:00","created_by":"rusty"}]} {"id":"vgi-python-g1m","title":"Use sentinel type pattern instead of Any for _MISSING in arguments.py","notes":"Line 33: _MISSING: Any = object()\n\nReplace with proper sentinel type pattern:\n```python\nfrom typing import Final\n\nclass _Missing:\n __slots__ = ()\n def __repr__(self) -\u003e str:\n return '\u003cMISSING\u003e'\n\nMISSING: Final = _Missing()\n```\n\nThis removes 1 Any and provides better type safety for default value checking.\nPart of 26.89% imprecision in arguments.py (59 Anys total).","status":"closed","priority":4,"issue_type":"task","created_at":"2026-01-04T22:19:50.079174-05:00","created_by":"rusty","updated_at":"2026-01-04T22:35:56.508153-05:00","closed_at":"2026-01-04T22:35:56.508153-05:00","close_reason":"Replaced _MISSING: Any = object() with proper _MissingType sentinel class. Improves type safety and removes 1 Any."} @@ -56,7 +56,7 @@ {"id":"vgi-python-j9k","title":"Add protocol types for IPC stream writers in cli.py","notes":"Line 53: self._writer: Any = None\n\nCould define a Protocol type for the IPC stream writer interface:\n```python\nclass IPCWriter(Protocol):\n def write_batch(self, batch: pa.RecordBatch) -\u003e None: ...\n def close(self) -\u003e None: ...\n```\n\nPart of 14.17% imprecision in cli.py (34 Anys total).","status":"closed","priority":4,"issue_type":"task","created_at":"2026-01-04T22:19:50.31711-05:00","created_by":"rusty","updated_at":"2026-01-04T22:37:01.488788-05:00","closed_at":"2026-01-04T22:37:01.488788-05:00","close_reason":"Replaced _writer: Any with _writer: pq.ParquetWriter | None. 
Removes 1 Any and provides proper type information."} {"id":"vgi-python-jrf","title":"Add varargs parameter to Arg descriptor","description":"In vgi/arguments.py:\n- Add varargs: bool = False to Arg.__init__ and __slots__\n- Update _resolve() to collect positional[position:] when varargs=True\n- Validate at least 1 value provided\n- Update _validate() to validate each element in tuple\n- Add Arguments.get_varargs(start, type=None) method\n- Update __repr__ to show varargs flag","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T10:49:20.012964-05:00","created_by":"rusty","updated_at":"2026-01-05T10:55:22.479344-05:00","closed_at":"2026-01-05T10:55:22.479344-05:00","close_reason":"Implemented varargs parameter in Arg descriptor with get_varargs() method and _validate_single()"} {"id":"vgi-python-k7x","title":"Use Mapping instead of dict in extract_argument_specs signature","description":"The arg_types parameter in extract_argument_specs() is typed as dict[str, pa.DataType]. 
Using Mapping[str, pa.DataType] from collections.abc would be more flexible, accepting any mapping type.","status":"closed","priority":4,"issue_type":"task","created_at":"2026-01-05T11:51:21.021496-05:00","created_by":"rusty","updated_at":"2026-01-05T12:03:51.771301-05:00","closed_at":"2026-01-05T12:03:51.771301-05:00","close_reason":"Closed"} -{"id":"vgi-python-kgm","title":"Create catalog interface tests","description":"Create comprehensive test suite for catalog interface.\n\nFiles to create:\n- tests/catalog/__init__.py\n- tests/catalog/test_serialization.py\n- tests/catalog/test_catalog_interface.py\n- tests/catalog/test_catalog_client.py\n- tests/catalog/test_integration.py\n\ntest_serialization.py:\n- Round-trip tests for all dataclass types\n- Edge cases: empty strings, empty lists, None values, empty tags\n- Verify Arrow schema correctness\n\ntest_catalog_interface.py:\n- Test abstract method enforcement\n- Test default implementations (schemas(), catalog_version())\n- Test NotImplementedError for optional methods\n- Test ReadOnlyCatalogInterface\n\ntest_catalog_client.py:\n- Test CatalogClient with mock worker\n- Test each client method\n- Test error handling\n- Test streaming responses\n\ntest_integration.py:\n- End-to-end client ↔ worker tests using InMemoryCatalog\n- Catalog lifecycle: attach, query schemas, query tables, detach\n- DDL operations: create/drop schema, create/drop table\n- Error propagation\n\nProtocol conformance tests:\n- Invalid input schemas\n- Missing required columns\n- Wrong column types\n- Multi-row input batches (should 
fail)","status":"open","priority":1,"issue_type":"task","created_at":"2026-01-05T19:27:36.864383-05:00","created_by":"rusty","updated_at":"2026-01-05T19:27:36.864383-05:00","dependencies":[{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-e6o","type":"blocks","created_at":"2026-01-05T19:27:50.987057-05:00","created_by":"rusty"},{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-9j7","type":"blocks","created_at":"2026-01-05T19:27:51.017259-05:00","created_by":"rusty"},{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-eg7","type":"blocks","created_at":"2026-01-05T19:27:51.046187-05:00","created_by":"rusty"}]} +{"id":"vgi-python-kgm","title":"Create catalog interface tests","description":"Create comprehensive test suite for catalog interface.\n\nFiles to create:\n- tests/catalog/__init__.py\n- tests/catalog/test_serialization.py\n- tests/catalog/test_catalog_interface.py\n- tests/catalog/test_catalog_client.py\n- tests/catalog/test_integration.py\n\ntest_serialization.py:\n- Round-trip tests for all dataclass types\n- Edge cases: empty strings, empty lists, None values, empty tags\n- Verify Arrow schema correctness\n\ntest_catalog_interface.py:\n- Test abstract method enforcement\n- Test default implementations (schemas(), catalog_version())\n- Test NotImplementedError for optional methods\n- Test ReadOnlyCatalogInterface\n\ntest_catalog_client.py:\n- Test CatalogClient with mock worker\n- Test each client method\n- Test error handling\n- Test streaming responses\n\ntest_integration.py:\n- End-to-end client ↔ worker tests using InMemoryCatalog\n- Catalog lifecycle: attach, query schemas, query tables, detach\n- DDL operations: create/drop schema, create/drop table\n- Error propagation\n\nProtocol conformance tests:\n- Invalid input schemas\n- Missing required columns\n- Wrong column types\n- Multi-row input batches (should 
fail)","status":"in_progress","priority":1,"issue_type":"task","created_at":"2026-01-05T19:27:36.864383-05:00","created_by":"rusty","updated_at":"2026-01-05T20:02:24.403675-05:00","dependencies":[{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-e6o","type":"blocks","created_at":"2026-01-05T19:27:50.987057-05:00","created_by":"rusty"},{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-9j7","type":"blocks","created_at":"2026-01-05T19:27:51.017259-05:00","created_by":"rusty"},{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-eg7","type":"blocks","created_at":"2026-01-05T19:27:51.046187-05:00","created_by":"rusty"}]} {"id":"vgi-python-kz4","title":"Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency","description":"Naming inconsistency: TableFunctionGenerator uses *Generator suffix, but TableInOutGeneratorFunction uses *GeneratorFunction suffix. Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency. Also consider renaming ScalarFunctionGenerator if needed.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.581028-05:00","created_by":"rusty","updated_at":"2026-01-04T21:43:58.141038-05:00","closed_at":"2026-01-04T21:43:58.141038-05:00","close_reason":"PR #7 created: https://github.com/Query-farm/vgi-python/pull/7"} {"id":"vgi-python-l1u","title":"Consider custom __repr__ for ArgumentSpec","description":"The default dataclass __repr__ includes the full Arrow type repr which can be verbose. 
Consider a custom __repr__ that's more concise for debugging, e.g., 'ArgumentSpec(name=\"count\", pos=0, type=int64)' instead of showing the full pa.DataType object.","status":"closed","priority":4,"issue_type":"task","created_at":"2026-01-05T11:51:21.415976-05:00","created_by":"rusty","updated_at":"2026-01-05T12:15:02.029743-05:00","closed_at":"2026-01-05T12:15:02.029743-05:00","close_reason":"Closed"} {"id":"vgi-python-l5z","title":"Update existing tests that use arg_types parameter","description":"In tests/test_argument_spec.py:\n- Update all calls to extract_argument_specs() that pass arg_types\n- Remove the arg_types parameter from test function calls\n- Ensure tests still pass with auto-inference","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T15:44:56.81929-05:00","created_by":"rusty","updated_at":"2026-01-05T15:56:39.371768-05:00","closed_at":"2026-01-05T15:56:39.371768-05:00","close_reason":"Completed as part of PR #20","dependencies":[{"issue_id":"vgi-python-l5z","depends_on_id":"vgi-python-coi","type":"blocks","created_at":"2026-01-05T15:45:13.980985-05:00","created_by":"rusty"}]} diff --git a/pyproject.toml b/pyproject.toml index 6c39f3b..f06edb8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dev = ["mypy", "pyarrow-stubs", "pytest", "pytest-cov", "pytest-mypy", "pytest-r [project.scripts] vgi-client = "vgi.client.cli:main" vgi-example-worker = "vgi.examples.worker:main" +vgi-example-catalog-worker = "vgi.examples.catalog:main" [build-system] requires = ["hatchling"] diff --git a/tests/catalog/__init__.py b/tests/catalog/__init__.py new file mode 100644 index 0000000..fe66b17 --- /dev/null +++ b/tests/catalog/__init__.py @@ -0,0 +1 @@ +"""Tests for the VGI catalog interface.""" diff --git a/tests/catalog/test_catalog_interface.py b/tests/catalog/test_catalog_interface.py new file mode 100644 index 0000000..76fb900 --- /dev/null +++ b/tests/catalog/test_catalog_interface.py @@ -0,0 +1,425 @@ +"""Tests for 
CatalogInterface ABC and default implementations.""" + +from collections.abc import Iterable +from typing import Any + +import pytest + +from vgi.catalog import ( + AttachId, + CatalogAttachResult, + CatalogInterface, + OnConflict, + SchemaInfo, + SerializedSchema, + TableInfo, + TransactionId, + ViewInfo, +) +from vgi.catalog.catalog_interface import ReadOnlyCatalogInterface +from vgi.exceptions import CatalogReadOnlyError + + +class MinimalCatalog(CatalogInterface): + """Minimal implementation for testing abstract method requirements.""" + + def catalogs(self) -> Iterable[str]: + """Return list of catalogs.""" + return ["test"] + + def catalog_attach( + self, *, name: str, options: dict[str, Any] + ) -> CatalogAttachResult: + """Attach to catalog.""" + return CatalogAttachResult( + attach_id=AttachId(b"test"), + supports_transactions=False, + supports_time_travel=False, + catalog_version_frozen=False, + catalog_version=1, + ) + + def schema_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + name: str, + ) -> SchemaInfo | None: + """Get schema info.""" + if name == "main": + return SchemaInfo( + attach_id=attach_id, + name="main", + is_default=True, + comment=None, + tags={}, + ) + return None + + def table_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ) -> TableInfo | None: + """Get table info.""" + return None + + def view_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ) -> ViewInfo | None: + """Get view info.""" + return None + + +class TestCatalogInterfaceAbstract: + """Test abstract method enforcement.""" + + def test_cannot_instantiate_abstract_class(self) -> None: + """CatalogInterface cannot be instantiated directly.""" + with pytest.raises(TypeError): + CatalogInterface() # type: ignore[abstract] + + def test_minimal_implementation_works(self) -> None: + """A minimal implementation can 
be instantiated.""" + catalog = MinimalCatalog() + assert list(catalog.catalogs()) == ["test"] + + +class TestCatalogInterfaceDefaults: + """Test default method implementations.""" + + def test_schemas_returns_main(self) -> None: + """Default schemas() returns single 'main' schema.""" + catalog = MinimalCatalog() + attach_id = AttachId(b"test") + schemas = list(catalog.schemas(attach_id=attach_id, transaction_id=None)) + + assert len(schemas) == 1 + assert schemas[0].name == "main" + assert schemas[0].is_default is True + assert schemas[0].comment is None + assert schemas[0].tags == {} + + def test_catalog_version_returns_zero(self) -> None: + """Default catalog_version() returns 0.""" + catalog = MinimalCatalog() + version = catalog.catalog_version( + attach_id=AttachId(b"test"), transaction_id=None + ) + assert version == 0 + + def test_catalog_detach_does_nothing(self) -> None: + """Default catalog_detach() does nothing (no exception).""" + catalog = MinimalCatalog() + # Should not raise + catalog.catalog_detach(attach_id=AttachId(b"test")) + + def test_interface_feature_flags_empty(self) -> None: + """Default interface_feature_flags returns empty set.""" + catalog = MinimalCatalog() + assert catalog.interface_feature_flags == set() + + +class TestCatalogInterfaceNotImplemented: + """Test that optional methods raise NotImplementedError by default.""" + + def test_catalog_create_not_implemented(self) -> None: + """catalog_create raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="Catalog create not implemented"): + catalog.catalog_create( + name="test", on_conflict=OnConflict.ERROR, options={} + ) + + def test_catalog_drop_not_implemented(self) -> None: + """catalog_drop raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="Catalog drop not implemented"): + catalog.catalog_drop(name="test") + + def test_transaction_begin_not_implemented(self) -> None: + 
"""catalog_transaction_begin raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises( + NotImplementedError, match="Catalog transactions not implemented" + ): + catalog.catalog_transaction_begin(attach_id=AttachId(b"test")) + + def test_transaction_commit_not_implemented(self) -> None: + """catalog_transaction_commit raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises( + NotImplementedError, match="Catalog transactions not implemented" + ): + catalog.catalog_transaction_commit( + attach_id=AttachId(b"test"), transaction_id=TransactionId(b"tx") + ) + + def test_transaction_rollback_not_implemented(self) -> None: + """catalog_transaction_rollback raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises( + NotImplementedError, match="Catalog transactions not implemented" + ): + catalog.catalog_transaction_rollback( + attach_id=AttachId(b"test"), transaction_id=TransactionId(b"tx") + ) + + def test_schema_create_not_implemented(self) -> None: + """schema_create raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="Schema create not implemented"): + catalog.schema_create( + attach_id=AttachId(b"test"), + transaction_id=None, + name="new_schema", + comment=None, + tags={}, + ) + + def test_schema_drop_not_implemented(self) -> None: + """schema_drop raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="Schema drop not implemented"): + catalog.schema_drop( + attach_id=AttachId(b"test"), + transaction_id=None, + name="schema", + ignore_not_found=False, + cascade=False, + ) + + def test_schema_contents_not_implemented(self) -> None: + """schema_contents raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises( + NotImplementedError, match="Schema contents not implemented" + ): + catalog.schema_contents( + attach_id=AttachId(b"test"), transaction_id=None, name="main" 
+ ) + + def test_table_create_not_implemented(self) -> None: + """table_create raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="Table create not implemented"): + catalog.table_create( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="table", + columns=SerializedSchema(b""), + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + def test_view_create_not_implemented(self) -> None: + """view_create raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="View create not implemented"): + catalog.view_create( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="view", + definition="SELECT 1", + on_conflict=OnConflict.ERROR, + ) + + +class MinimalReadOnlyCatalog(ReadOnlyCatalogInterface): + """Minimal read-only implementation for testing.""" + + def catalogs(self) -> Iterable[str]: + """Return list of catalogs.""" + return ["readonly"] + + def catalog_attach( + self, *, name: str, options: dict[str, Any] + ) -> CatalogAttachResult: + """Attach to catalog.""" + return CatalogAttachResult( + attach_id=AttachId(b"readonly"), + supports_transactions=False, + supports_time_travel=False, + catalog_version_frozen=True, + catalog_version=1, + ) + + def schema_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + name: str, + ) -> SchemaInfo | None: + """Get schema info.""" + return None + + def table_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ) -> TableInfo | None: + """Get table info.""" + return None + + def view_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ) -> ViewInfo | None: + """Get view info.""" + return None + + +class TestReadOnlyCatalogInterface: + """Test 
ReadOnlyCatalogInterface DDL rejection.""" + + def test_catalog_create_raises_readonly_error(self) -> None: + """catalog_create raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.catalog_create( + name="test", on_conflict=OnConflict.ERROR, options={} + ) + + def test_catalog_drop_raises_readonly_error(self) -> None: + """catalog_drop raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.catalog_drop(name="test") + + def test_transaction_begin_raises_readonly_error(self) -> None: + """catalog_transaction_begin raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.catalog_transaction_begin(attach_id=AttachId(b"test")) + + def test_transaction_commit_raises_readonly_error(self) -> None: + """catalog_transaction_commit raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.catalog_transaction_commit( + attach_id=AttachId(b"test"), transaction_id=TransactionId(b"tx") + ) + + def test_transaction_rollback_raises_readonly_error(self) -> None: + """catalog_transaction_rollback raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.catalog_transaction_rollback( + attach_id=AttachId(b"test"), transaction_id=TransactionId(b"tx") + ) + + def test_schema_create_raises_readonly_error(self) -> None: + """schema_create raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.schema_create( + attach_id=AttachId(b"test"), + transaction_id=None, + name="new", + comment=None, + tags={}, + ) + + def test_schema_drop_raises_readonly_error(self) -> None: + """schema_drop 
raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.schema_drop( + attach_id=AttachId(b"test"), + transaction_id=None, + name="main", + ignore_not_found=False, + cascade=False, + ) + + def test_table_create_raises_readonly_error(self) -> None: + """table_create raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.table_create( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="table", + columns=SerializedSchema(b""), + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + def test_table_drop_raises_readonly_error(self) -> None: + """table_drop raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.table_drop( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="table", + ignore_not_found=False, + ) + + def test_table_rename_raises_readonly_error(self) -> None: + """table_rename raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.table_rename( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="old", + new_name="new", + ignore_not_found=False, + ) + + def test_view_create_raises_readonly_error(self) -> None: + """view_create raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.view_create( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="view", + definition="SELECT 1", + on_conflict=OnConflict.ERROR, + ) + + def test_view_drop_raises_readonly_error(self) -> None: + """view_drop raises CatalogReadOnlyError.""" + catalog = 
MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.view_drop( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="view", + ignore_not_found=False, + ) + + def test_class_attributes(self) -> None: + """ReadOnlyCatalogInterface has correct class attributes.""" + assert ReadOnlyCatalogInterface.supports_transactions is False + assert ReadOnlyCatalogInterface.catalog_version_frozen is True diff --git a/tests/catalog/test_integration.py b/tests/catalog/test_integration.py new file mode 100644 index 0000000..fb30d95 --- /dev/null +++ b/tests/catalog/test_integration.py @@ -0,0 +1,621 @@ +"""Integration tests for catalog interface using InMemoryCatalog.""" + +import pyarrow as pa +import pytest + +from vgi import schema +from vgi.catalog import OnConflict, SerializedSchema +from vgi.examples.catalog import InMemoryCatalog + + +class TestInMemoryCatalogBasic: + """Test basic InMemoryCatalog functionality.""" + + def test_default_catalog_exists(self) -> None: + """Default 'memory' catalog exists on creation.""" + catalog = InMemoryCatalog() + catalogs = list(catalog.catalogs()) + assert "memory" in catalogs + + def test_attach_to_default_catalog(self) -> None: + """Can attach to the default memory catalog.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + assert result.attach_id is not None + assert len(result.attach_id) == 16 # UUID bytes + assert result.supports_transactions is False + assert result.supports_time_travel is False + assert result.catalog_version_frozen is False + + def test_attach_to_nonexistent_catalog_raises(self) -> None: + """Attaching to nonexistent catalog raises error.""" + catalog = InMemoryCatalog() + with pytest.raises(ValueError, match="not found"): + catalog.catalog_attach(name="nonexistent", options={}) + + def test_detach(self) -> None: + """Can detach from an attached catalog.""" + catalog = InMemoryCatalog() + result 
= catalog.catalog_attach(name="memory", options={}) + # Should not raise + catalog.catalog_detach(attach_id=result.attach_id) + + +class TestInMemoryCatalogSchemas: + """Test schema operations.""" + + def test_default_main_schema_exists(self) -> None: + """Default 'main' schema exists after attach.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + schemas = list(catalog.schemas(attach_id=result.attach_id, transaction_id=None)) + + assert len(schemas) == 1 + assert schemas[0].name == "main" + assert schemas[0].is_default is True + + def test_schema_get_main(self) -> None: + """Can get the main schema.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + schema = catalog.schema_get( + attach_id=result.attach_id, transaction_id=None, name="main" + ) + + assert schema is not None + assert schema.name == "main" + + def test_schema_get_nonexistent(self) -> None: + """Getting nonexistent schema returns None.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + schema = catalog.schema_get( + attach_id=result.attach_id, transaction_id=None, name="nonexistent" + ) + + assert schema is None + + def test_schema_create(self) -> None: + """Can create a new schema.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + catalog.schema_create( + attach_id=result.attach_id, + transaction_id=None, + name="analytics", + comment="Analytics schema", + tags={"team": "data"}, + ) + + schemas = list(catalog.schemas(attach_id=result.attach_id, transaction_id=None)) + schema_names = [s.name for s in schemas] + assert "analytics" in schema_names + + schema = catalog.schema_get( + attach_id=result.attach_id, transaction_id=None, name="analytics" + ) + assert schema is not None + assert schema.comment == "Analytics schema" + assert schema.tags == {"team": "data"} + + def test_schema_create_duplicate_raises(self) -> None: + 
"""Creating duplicate schema raises error.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + with pytest.raises(ValueError, match="already exists"): + catalog.schema_create( + attach_id=result.attach_id, + transaction_id=None, + name="main", # Already exists + comment=None, + tags={}, + ) + + def test_schema_drop(self) -> None: + """Can drop a schema.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + catalog.schema_create( + attach_id=result.attach_id, + transaction_id=None, + name="to_drop", + comment=None, + tags={}, + ) + + catalog.schema_drop( + attach_id=result.attach_id, + transaction_id=None, + name="to_drop", + ignore_not_found=False, + cascade=False, + ) + + schema = catalog.schema_get( + attach_id=result.attach_id, transaction_id=None, name="to_drop" + ) + assert schema is None + + def test_schema_drop_ignore_not_found(self) -> None: + """Dropping nonexistent schema with ignore_not_found=True succeeds.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + # Should not raise + catalog.schema_drop( + attach_id=result.attach_id, + transaction_id=None, + name="nonexistent", + ignore_not_found=True, + cascade=False, + ) + + +class TestInMemoryCatalogTables: + """Test table operations.""" + + def test_table_create_and_get(self) -> None: + """Can create and retrieve a table.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns_schema = schema(id=pa.int64(), name=pa.string()) + columns = SerializedSchema(columns_schema.serialize().to_pybytes()) + + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="users", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[0], + unique_constraints=[[0]], + check_constraints=[], + ) + + table = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, 
+ schema_name="main", + name="users", + ) + + assert table is not None + assert table.name == "users" + assert table.schema_name == "main" + assert table.columns == columns + assert table.not_null_constraints == [0] + assert table.unique_constraints == [[0]] + + def test_table_get_nonexistent(self) -> None: + """Getting nonexistent table returns None.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + table = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="nonexistent", + ) + assert table is None + + def test_table_drop(self) -> None: + """Can drop a table.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns = SerializedSchema(schema().serialize().to_pybytes()) + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + catalog.table_drop( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + ignore_not_found=False, + ) + + table = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + ) + assert table is None + + def test_table_rename(self) -> None: + """Can rename a table.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns = SerializedSchema(schema().serialize().to_pybytes()) + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_name", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + catalog.table_rename( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_name", + new_name="new_name", + 
ignore_not_found=False, + ) + + old = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_name", + ) + new = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="new_name", + ) + + assert old is None + assert new is not None + assert new.name == "new_name" + + def test_table_comment_set(self) -> None: + """Can set table comment.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns = SerializedSchema(schema().serialize().to_pybytes()) + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="commented", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + catalog.table_comment_set( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="commented", + comment="This is a comment", + ignore_not_found=False, + ) + + table = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="commented", + ) + assert table is not None + assert table.comment == "This is a comment" + + +class TestInMemoryCatalogViews: + """Test view operations.""" + + def test_view_create_and_get(self) -> None: + """Can create and retrieve a view.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + catalog.view_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="user_view", + definition="SELECT * FROM users", + on_conflict=OnConflict.ERROR, + ) + + view = catalog.view_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="user_view", + ) + + assert view is not None + assert view.name == "user_view" + assert view.definition == "SELECT * FROM users" + + def test_view_drop(self) -> None: + """Can drop a view.""" + catalog = 
InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + catalog.view_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + definition="SELECT 1", + on_conflict=OnConflict.ERROR, + ) + + catalog.view_drop( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + ignore_not_found=False, + ) + + view = catalog.view_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + ) + assert view is None + + def test_view_rename(self) -> None: + """Can rename a view.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + catalog.view_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_view", + definition="SELECT 1", + on_conflict=OnConflict.ERROR, + ) + + catalog.view_rename( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_view", + new_name="new_view", + ignore_not_found=False, + ) + + old = catalog.view_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_view", + ) + new = catalog.view_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="new_view", + ) + + assert old is None + assert new is not None + + +class TestInMemoryCatalogVersioning: + """Test catalog versioning.""" + + def test_version_increments_on_schema_create(self) -> None: + """Version increments when schema is created.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + version1 = catalog.catalog_version( + attach_id=result.attach_id, transaction_id=None + ) + + catalog.schema_create( + attach_id=result.attach_id, + transaction_id=None, + name="new_schema", + comment=None, + tags={}, + ) + + version2 = catalog.catalog_version( + attach_id=result.attach_id, transaction_id=None + ) + + assert version2 > version1 + + def 
class TestInMemoryCatalogOnConflict:
    """Test how table_create reacts to each OnConflict mode on a duplicate name."""

    def test_table_create_error_on_duplicate(self) -> None:
        """OnConflict.ERROR raises on duplicate table."""
        catalog = InMemoryCatalog()
        result = catalog.catalog_attach(name="memory", options={})

        columns = SerializedSchema(schema().serialize().to_pybytes())
        catalog.table_create(
            attach_id=result.attach_id,
            transaction_id=None,
            schema_name="main",
            name="table",
            columns=columns,
            on_conflict=OnConflict.ERROR,
            not_null_constraints=[],
            unique_constraints=[],
            check_constraints=[],
        )

        with pytest.raises(ValueError, match="already exists"):
            catalog.table_create(
                attach_id=result.attach_id,
                transaction_id=None,
                schema_name="main",
                name="table",
                columns=columns,
                on_conflict=OnConflict.ERROR,
                not_null_constraints=[],
                unique_constraints=[],
                check_constraints=[],
            )

    def test_table_create_ignore_on_duplicate(self) -> None:
        """OnConflict.IGNORE does nothing on duplicate table.

        The second create uses a different column schema on purpose: if the
        catalog replaced the table instead of ignoring the request, the stored
        columns (and the catalog version) would change and the test would fail.
        """
        catalog = InMemoryCatalog()
        result = catalog.catalog_attach(name="memory", options={})

        original_columns = SerializedSchema(
            schema(a=pa.int32()).serialize().to_pybytes()
        )
        ignored_columns = SerializedSchema(
            schema(b=pa.string()).serialize().to_pybytes()
        )

        catalog.table_create(
            attach_id=result.attach_id,
            transaction_id=None,
            schema_name="main",
            name="table",
            columns=original_columns,
            on_conflict=OnConflict.ERROR,
            not_null_constraints=[],
            unique_constraints=[],
            check_constraints=[],
        )
        version_before = catalog.catalog_version(
            attach_id=result.attach_id, transaction_id=None
        )

        # Should not raise
        catalog.table_create(
            attach_id=result.attach_id,
            transaction_id=None,
            schema_name="main",
            name="table",
            columns=ignored_columns,
            on_conflict=OnConflict.IGNORE,
            not_null_constraints=[],
            unique_constraints=[],
            check_constraints=[],
        )

        table = catalog.table_get(
            attach_id=result.attach_id,
            transaction_id=None,
            schema_name="main",
            name="table",
        )
        assert table is not None
        # The pre-existing definition must survive the ignored create.
        assert table.columns == original_columns
        # An ignored create is a no-op, so the version must not move either.
        version_after = catalog.catalog_version(
            attach_id=result.attach_id, transaction_id=None
        )
        assert version_after == version_before

    def test_table_create_replace_on_duplicate(self) -> None:
        """OnConflict.REPLACE replaces duplicate table."""
        catalog = InMemoryCatalog()
        result = catalog.catalog_attach(name="memory", options={})

        columns1 = SerializedSchema(schema(a=pa.int32()).serialize().to_pybytes())
        columns2 = SerializedSchema(schema(b=pa.string()).serialize().to_pybytes())

        catalog.table_create(
            attach_id=result.attach_id,
            transaction_id=None,
            schema_name="main",
            name="table",
            columns=columns1,
            on_conflict=OnConflict.ERROR,
            not_null_constraints=[],
            unique_constraints=[],
            check_constraints=[],
        )

        catalog.table_create(
            attach_id=result.attach_id,
            transaction_id=None,
            schema_name="main",
            name="table",
            columns=columns2,
            on_conflict=OnConflict.REPLACE,
            not_null_constraints=[],
            unique_constraints=[],
            check_constraints=[],
        )

        table = catalog.table_get(
            attach_id=result.attach_id,
            transaction_id=None,
            schema_name="main",
            name="table",
        )
        assert table is not None
        # The second definition wins under REPLACE.
        assert table.columns == columns2
class TestSchemaInfoSerialization:
    """Round-trip checks for SchemaInfo serialization."""

    @staticmethod
    def _round_trip(info: SchemaInfo) -> SchemaInfo:
        """Serialize ``info`` and rebuild it from the resulting record batch."""
        payload = info.serialize()
        return SchemaInfo.deserialize(deserialize_record_batch(payload))

    def test_basic_round_trip(self) -> None:
        """Every field survives a serialize/deserialize cycle."""
        original = SchemaInfo(
            attach_id=AttachId(b"\x01\x02\x03\x04"),
            name="main",
            is_default=True,
            comment="Test schema",
            tags={"env": "test", "owner": "alice"},
        )
        restored = self._round_trip(original)

        for attr in ("attach_id", "name", "is_default", "comment", "tags"):
            assert getattr(restored, attr) == getattr(original, attr)

    def test_none_comment(self) -> None:
        """A None comment comes back as None."""
        restored = self._round_trip(
            SchemaInfo(
                attach_id=AttachId(b"test"),
                name="schema1",
                is_default=False,
                comment=None,
                tags={},
            )
        )

        assert restored.comment is None

    def test_empty_tags(self) -> None:
        """An empty tags dictionary comes back empty."""
        restored = self._round_trip(
            SchemaInfo(
                attach_id=AttachId(b"test"),
                name="schema1",
                is_default=False,
                comment="Comment",
                tags={},
            )
        )

        assert restored.tags == {}

    def test_empty_string_name(self) -> None:
        """An empty string name comes back as an empty string."""
        restored = self._round_trip(
            SchemaInfo(
                attach_id=AttachId(b"test"),
                name="",
                is_default=False,
                comment=None,
                tags={},
            )
        )

        assert restored.name == ""
class TestViewInfoSerialization:
    """Round-trip checks for ViewInfo serialization."""

    @staticmethod
    def _round_trip(info: ViewInfo) -> ViewInfo:
        """Serialize ``info`` and rebuild it from the resulting record batch."""
        payload = info.serialize()
        return ViewInfo.deserialize(deserialize_record_batch(payload))

    def test_basic_round_trip(self) -> None:
        """Every field survives a serialize/deserialize cycle."""
        original = ViewInfo(
            name="user_summary",
            schema_name="main",
            definition="SELECT id, name FROM users",
            comment="Summary view",
            tags={"type": "summary"},
        )
        restored = self._round_trip(original)

        for attr in ("name", "schema_name", "definition", "comment", "tags"):
            assert getattr(restored, attr) == getattr(original, attr)

    def test_complex_definition(self) -> None:
        """A multi-line SQL definition is preserved verbatim."""
        original = ViewInfo(
            name="complex",
            schema_name="analytics",
            definition="""
                SELECT u.id, u.name, COUNT(o.id) as order_count
                FROM users u
                LEFT JOIN orders o ON u.id = o.user_id
                WHERE u.active = true
                GROUP BY u.id, u.name
                HAVING COUNT(o.id) > 0
            """,
            comment=None,
            tags={},
        )
        restored = self._round_trip(original)

        assert restored.definition == original.definition
class TestArrowSchemaCorrectness:
    """Test that Arrow schemas are correct for each type.

    The local variable is named ``arrow_schema`` rather than ``schema`` so it
    does not shadow the ``schema`` helper imported from ``vgi`` at module level.
    """

    def test_catalog_attach_result_schema(self) -> None:
        """Verify CatalogAttachResult Arrow schema."""
        arrow_schema = CatalogAttachResult.ARROW_SCHEMA
        assert len(arrow_schema) == 5
        assert arrow_schema.field("attach_id").type == pa.binary()
        assert arrow_schema.field("supports_transactions").type == pa.bool_()
        assert arrow_schema.field("supports_time_travel").type == pa.bool_()
        assert arrow_schema.field("catalog_version_frozen").type == pa.bool_()
        assert arrow_schema.field("catalog_version").type == pa.int64()

    def test_schema_info_schema(self) -> None:
        """Verify SchemaInfo Arrow schema."""
        arrow_schema = SchemaInfo.ARROW_SCHEMA
        assert len(arrow_schema) == 5
        assert arrow_schema.field("attach_id").type == pa.binary()
        assert arrow_schema.field("name").type == pa.string()
        assert arrow_schema.field("is_default").type == pa.bool_()
        assert arrow_schema.field("comment").type == pa.string()
        assert arrow_schema.field("comment").nullable is True
        assert arrow_schema.field("tags").type == pa.map_(pa.string(), pa.string())

    def test_table_info_schema(self) -> None:
        """Verify TableInfo Arrow schema."""
        arrow_schema = TableInfo.ARROW_SCHEMA
        assert arrow_schema.field("name").type == pa.string()
        assert arrow_schema.field("schema_name").type == pa.string()
        assert arrow_schema.field("columns").type == pa.binary()
        assert arrow_schema.field("not_null_constraints").type == pa.list_(
            pa.int32()
        )
        assert arrow_schema.field("unique_constraints").type == pa.list_(
            pa.list_(pa.int32())
        )
        assert arrow_schema.field("check_constraints").type == pa.list_(pa.string())

    def test_view_info_schema(self) -> None:
        """Verify ViewInfo Arrow schema."""
        arrow_schema = ViewInfo.ARROW_SCHEMA
        assert arrow_schema.field("name").type == pa.string()
        assert arrow_schema.field("schema_name").type == pa.string()
        assert arrow_schema.field("definition").type == pa.string()

    def test_function_info_schema(self) -> None:
        """Verify FunctionInfo Arrow schema."""
        arrow_schema = FunctionInfo.ARROW_SCHEMA
        assert arrow_schema.field("name").type == pa.string()
        assert arrow_schema.field("function_type").type == pa.string()
        assert arrow_schema.field("arguments").type == pa.binary()
        assert arrow_schema.field("output_schema").type == pa.binary()

    def test_scan_function_result_schema(self) -> None:
        """Verify ScanFunctionResult Arrow schema."""
        arrow_schema = ScanFunctionResult.ARROW_SCHEMA
        assert arrow_schema.field("function_name").type == pa.string()
        assert arrow_schema.field("max_processes").type == pa.int32()
        assert arrow_schema.field("invocation_id").type == pa.binary()
        assert arrow_schema.field("invocation_id").nullable is True
class InMemoryCatalog(CatalogInterface):
    """In-memory catalog implementation for testing.

    All catalog, schema, table, and view metadata lives in plain Python
    dictionaries owned by the instance. Basic DDL operations are supported;
    transactions are not, so every ``transaction_id`` argument is accepted
    and ignored.

    Attach IDs are generated as random UUIDs. Each DDL call made through an
    attachment that actually changes something bumps the version of the
    attached catalog; calls that turn into no-ops (``OnConflict.IGNORE`` on a
    duplicate, ``ignore_not_found`` on a missing object) leave it untouched.
    """

    def __init__(self) -> None:
        """Initialize the in-memory catalog."""
        # Maps catalog name -> CatalogData
        self._catalogs: dict[str, CatalogData] = {}
        # Maps attach_id -> catalog_name
        self._attachments: dict[AttachId, str] = {}
        # Create default "memory" catalog with "main" schema
        self._create_default_catalog()

    # Internal helpers

    @staticmethod
    def _new_catalog(name: str, *, version: int = 1) -> CatalogData:
        """Build a catalog that contains only the default ``main`` schema.

        The stored ``SchemaInfo`` carries an all-zero placeholder attach_id;
        ``schema_get`` and ``schemas`` substitute the caller's attach_id when
        they hand schema information out.
        """
        catalog = CatalogData(name=name, version=version)
        catalog.schemas["main"] = SchemaData(
            info=SchemaInfo(
                attach_id=AttachId(b"\x00" * 16),
                name="main",
                is_default=True,
                comment=None,
                tags={},
            )
        )
        return catalog

    def _create_default_catalog(self) -> None:
        """Create the default memory catalog with main schema."""
        self._catalogs["memory"] = self._new_catalog("memory")

    def _get_catalog(self, attach_id: AttachId) -> CatalogData:
        """Get the catalog for the given attach_id.

        Raises:
            ValueError: If the attach_id is unknown or points at a catalog
                that no longer exists.
        """
        catalog_name = self._attachments.get(attach_id)
        if catalog_name is None:
            msg = f"No catalog attached with id {attach_id!r}"
            raise ValueError(msg)
        catalog = self._catalogs.get(catalog_name)
        if catalog is None:
            msg = f"Catalog {catalog_name!r} not found"
            raise ValueError(msg)
        return catalog

    def _get_schema(self, attach_id: AttachId, schema_name: str) -> SchemaData:
        """Get the schema for the given attach_id and schema name.

        Raises:
            ValueError: If the attachment cannot be resolved or the schema
                does not exist in the attached catalog.
        """
        schema = self._get_catalog(attach_id).schemas.get(schema_name)
        if schema is None:
            msg = f"Schema {schema_name!r} not found in catalog"
            raise ValueError(msg)
        return schema

    def _increment_version(self, attach_id: AttachId) -> None:
        """Increment the catalog version after a modification."""
        self._get_catalog(attach_id).version += 1

    @staticmethod
    def _schema_info_for(attach_id: AttachId, info: SchemaInfo) -> SchemaInfo:
        """Return a copy of ``info`` stamped with the caller's attach_id.

        ``tags`` is copied so callers cannot mutate the catalog's stored
        state through the returned object.
        """
        return SchemaInfo(
            attach_id=attach_id,
            name=info.name,
            is_default=info.is_default,
            comment=info.comment,
            tags=dict(info.tags),
        )

    @staticmethod
    def _copy_table_info(
        info: TableInfo, *, name: str, comment: str | None
    ) -> TableInfo:
        """Return a new TableInfo equal to ``info`` except for name and comment."""
        return TableInfo(
            name=name,
            schema_name=info.schema_name,
            columns=info.columns,
            not_null_constraints=info.not_null_constraints,
            unique_constraints=info.unique_constraints,
            check_constraints=info.check_constraints,
            comment=comment,
            tags=info.tags,
        )

    @staticmethod
    def _copy_view_info(info: ViewInfo, *, name: str, comment: str | None) -> ViewInfo:
        """Return a new ViewInfo equal to ``info`` except for name and comment."""
        return ViewInfo(
            name=name,
            schema_name=info.schema_name,
            definition=info.definition,
            comment=comment,
            tags=info.tags,
        )

    # Required abstract methods

    def catalogs(self) -> Iterable[str]:
        """Get a list of catalog names."""
        return list(self._catalogs)

    def catalog_attach(
        self, *, name: str, options: dict[str, Any]
    ) -> CatalogAttachResult:
        """Attach to a catalog with the given name.

        ``options`` is accepted for interface compatibility and not used.

        Raises:
            ValueError: If no catalog with that name exists.
        """
        catalog = self._catalogs.get(name)
        if catalog is None:
            msg = f"Catalog {name!r} not found"
            raise ValueError(msg)

        # Generate a unique attach_id
        attach_id = AttachId(uuid.uuid4().bytes)
        self._attachments[attach_id] = name

        return CatalogAttachResult(
            attach_id=attach_id,
            supports_transactions=False,
            supports_time_travel=False,
            catalog_version_frozen=False,
            catalog_version=catalog.version,
        )

    def catalog_detach(self, *, attach_id: AttachId) -> None:
        """Detach from the catalog.

        Detaching an unknown attach_id is a silent no-op.
        """
        self._attachments.pop(attach_id, None)

    def schema_get(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        name: str,
    ) -> SchemaInfo | None:
        """Get information about a schema, or None if it does not exist."""
        schema_data = self._get_catalog(attach_id).schemas.get(name)
        if schema_data is None:
            return None
        # Update the attach_id in the returned info
        return self._schema_info_for(attach_id, schema_data.info)

    def table_get(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        schema_name: str,
        name: str,
    ) -> TableInfo | None:
        """Get information about a table.

        Returns None when either the schema or the table is missing.
        """
        schema_data = self._get_catalog(attach_id).schemas.get(schema_name)
        if schema_data is None:
            return None
        table_data = schema_data.tables.get(name)
        return None if table_data is None else table_data.info

    def view_get(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        schema_name: str,
        name: str,
    ) -> ViewInfo | None:
        """Get information about a view.

        Returns None when either the schema or the view is missing.
        """
        schema_data = self._get_catalog(attach_id).schemas.get(schema_name)
        if schema_data is None:
            return None
        view_data = schema_data.views.get(name)
        return None if view_data is None else view_data.info

    # Optional methods with implementations

    def catalog_version(
        self, *, attach_id: AttachId, transaction_id: TransactionId | None
    ) -> int:
        """Get the current catalog version."""
        return self._get_catalog(attach_id).version

    def catalog_create(
        self, *, name: str, on_conflict: OnConflict, options: dict[str, Any]
    ) -> None:
        """Create a new catalog containing only the default ``main`` schema.

        ``options`` is accepted for interface compatibility and not used.

        Raises:
            ValueError: If the catalog exists and ``on_conflict`` is ERROR.
        """
        existing = self._catalogs.get(name)
        if existing is not None:
            if on_conflict == OnConflict.ERROR:
                msg = f"Catalog {name!r} already exists"
                raise ValueError(msg)
            if on_conflict == OnConflict.IGNORE:
                return
            # REPLACE: fall through to create

        # Attachments are keyed by catalog name and therefore survive a
        # REPLACE. Continue the version sequence instead of restarting at 1 so
        # attached clients never see the version go backwards or repeat.
        version = 1 if existing is None else existing.version + 1
        self._catalogs[name] = self._new_catalog(name, version=version)

    def catalog_drop(self, *, name: str) -> None:
        """Drop a catalog and every attachment that points at it.

        Raises:
            ValueError: If no catalog with that name exists.
        """
        if name not in self._catalogs:
            msg = f"Catalog {name!r} not found"
            raise ValueError(msg)
        # Remove any attachments to this catalog. Collect first: the dict
        # must not change size while it is being iterated.
        stale = [aid for aid, cname in self._attachments.items() if cname == name]
        for aid in stale:
            del self._attachments[aid]
        del self._catalogs[name]

    def schemas(
        self, *, attach_id: AttachId, transaction_id: TransactionId | None
    ) -> Iterable[SchemaInfo]:
        """Get a list of schemas in the catalog."""
        catalog = self._get_catalog(attach_id)
        # Update the attach_id in the returned info
        return [
            self._schema_info_for(attach_id, schema_data.info)
            for schema_data in catalog.schemas.values()
        ]

    def schema_create(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        name: str,
        comment: str | None,
        tags: dict[str, str],
    ) -> None:
        """Create a new schema.

        Raises:
            ValueError: If a schema with that name already exists.
        """
        catalog = self._get_catalog(attach_id)
        if name in catalog.schemas:
            msg = f"Schema {name!r} already exists"
            raise ValueError(msg)
        catalog.schemas[name] = SchemaData(
            info=SchemaInfo(
                attach_id=attach_id,
                name=name,
                is_default=False,
                comment=comment,
                # Copy so later mutation of the caller's dict cannot alter
                # what the catalog has stored.
                tags=dict(tags),
            )
        )
        self._increment_version(attach_id)

    def schema_drop(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        name: str,
        ignore_not_found: bool,
        cascade: bool,
    ) -> None:
        """Drop a schema.

        Raises:
            ValueError: If the schema is missing and ``ignore_not_found`` is
                false, or if it still holds tables or views and ``cascade``
                is false.
        """
        catalog = self._get_catalog(attach_id)
        schema_data = catalog.schemas.get(name)
        if schema_data is None:
            if ignore_not_found:
                return
            msg = f"Schema {name!r} not found"
            raise ValueError(msg)
        if not cascade and (schema_data.tables or schema_data.views):
            msg = f"Schema {name!r} is not empty, use CASCADE to drop"
            raise ValueError(msg)
        del catalog.schemas[name]
        self._increment_version(attach_id)

    def schema_contents(
        self, *, attach_id: AttachId, transaction_id: TransactionId | None, name: str
    ) -> Iterable[TableInfo | ViewInfo | FunctionInfo]:
        """Get the contents of a schema: all tables first, then all views.

        Raises:
            ValueError: If the schema does not exist.
        """
        schema_data = self._get_schema(attach_id, name)
        contents: list[TableInfo | ViewInfo | FunctionInfo] = [
            table_data.info for table_data in schema_data.tables.values()
        ]
        contents.extend(view_data.info for view_data in schema_data.views.values())
        return contents

    def table_create(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        schema_name: str,
        name: str,
        columns: SerializedSchema,
        on_conflict: OnConflict,
        not_null_constraints: list[int],
        unique_constraints: list[list[int]],
        check_constraints: list[str],
    ) -> None:
        """Create a new table.

        Raises:
            ValueError: If the schema does not exist, or the table exists and
                ``on_conflict`` is ERROR.
        """
        schema_data = self._get_schema(attach_id, schema_name)
        if name in schema_data.tables:
            if on_conflict == OnConflict.ERROR:
                msg = f"Table {name!r} already exists in schema {schema_name!r}"
                raise ValueError(msg)
            if on_conflict == OnConflict.IGNORE:
                return
            # REPLACE: fall through to create

        schema_data.tables[name] = TableData(
            info=TableInfo(
                name=name,
                schema_name=schema_name,
                columns=columns,
                # Copy the constraint lists so later mutation of the caller's
                # lists cannot alter what the catalog has stored.
                not_null_constraints=list(not_null_constraints),
                unique_constraints=[list(cols) for cols in unique_constraints],
                check_constraints=list(check_constraints),
                comment=None,
                tags={},
            )
        )
        self._increment_version(attach_id)

    def table_drop(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        schema_name: str,
        name: str,
        ignore_not_found: bool,
    ) -> None:
        """Drop a table.

        Raises:
            ValueError: If the schema does not exist, or the table is missing
                and ``ignore_not_found`` is false.
        """
        schema_data = self._get_schema(attach_id, schema_name)
        if name not in schema_data.tables:
            if ignore_not_found:
                return
            msg = f"Table {name!r} not found in schema {schema_name!r}"
            raise ValueError(msg)
        del schema_data.tables[name]
        self._increment_version(attach_id)

    def table_comment_set(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        schema_name: str,
        name: str,
        comment: str | None,
        ignore_not_found: bool,
    ) -> None:
        """Set the comment for a table.

        Raises:
            ValueError: If the schema does not exist, or the table is missing
                and ``ignore_not_found`` is false.
        """
        schema_data = self._get_schema(attach_id, schema_name)
        table_data = schema_data.tables.get(name)
        if table_data is None:
            if ignore_not_found:
                return
            msg = f"Table {name!r} not found in schema {schema_name!r}"
            raise ValueError(msg)
        # Create a new TableInfo with the updated comment
        old_info = table_data.info
        schema_data.tables[name] = TableData(
            info=self._copy_table_info(old_info, name=old_info.name, comment=comment)
        )
        self._increment_version(attach_id)

    def table_rename(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        schema_name: str,
        name: str,
        new_name: str,
        ignore_not_found: bool,
    ) -> None:
        """Rename a table within its schema.

        Raises:
            ValueError: If the schema does not exist, the table is missing
                and ``ignore_not_found`` is false, or ``new_name`` is already
                taken by a table in the same schema.
        """
        schema_data = self._get_schema(attach_id, schema_name)
        if name not in schema_data.tables:
            if ignore_not_found:
                return
            msg = f"Table {name!r} not found in schema {schema_name!r}"
            raise ValueError(msg)
        if new_name in schema_data.tables:
            msg = f"Table {new_name!r} already exists in schema {schema_name!r}"
            raise ValueError(msg)
        # Create new TableInfo with updated name
        old_info = schema_data.tables.pop(name).info
        schema_data.tables[new_name] = TableData(
            info=self._copy_table_info(
                old_info, name=new_name, comment=old_info.comment
            )
        )
        self._increment_version(attach_id)

    def view_create(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        schema_name: str,
        name: str,
        definition: str,
        on_conflict: OnConflict,
    ) -> None:
        """Create a new view.

        Raises:
            ValueError: If the schema does not exist, or the view exists and
                ``on_conflict`` is ERROR.
        """
        schema_data = self._get_schema(attach_id, schema_name)
        if name in schema_data.views:
            if on_conflict == OnConflict.ERROR:
                msg = f"View {name!r} already exists in schema {schema_name!r}"
                raise ValueError(msg)
            if on_conflict == OnConflict.IGNORE:
                return
            # REPLACE: fall through to create

        schema_data.views[name] = ViewData(
            info=ViewInfo(
                name=name,
                schema_name=schema_name,
                definition=definition,
                comment=None,
                tags={},
            )
        )
        self._increment_version(attach_id)

    def view_drop(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        schema_name: str,
        name: str,
        ignore_not_found: bool,
    ) -> None:
        """Drop a view.

        Raises:
            ValueError: If the schema does not exist, or the view is missing
                and ``ignore_not_found`` is false.
        """
        schema_data = self._get_schema(attach_id, schema_name)
        if name not in schema_data.views:
            if ignore_not_found:
                return
            msg = f"View {name!r} not found in schema {schema_name!r}"
            raise ValueError(msg)
        del schema_data.views[name]
        self._increment_version(attach_id)

    def view_rename(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        schema_name: str,
        name: str,
        new_name: str,
        ignore_not_found: bool,
    ) -> None:
        """Rename a view within its schema.

        Raises:
            ValueError: If the schema does not exist, the view is missing and
                ``ignore_not_found`` is false, or ``new_name`` is already
                taken by a view in the same schema.
        """
        schema_data = self._get_schema(attach_id, schema_name)
        if name not in schema_data.views:
            if ignore_not_found:
                return
            msg = f"View {name!r} not found in schema {schema_name!r}"
            raise ValueError(msg)
        if new_name in schema_data.views:
            msg = f"View {new_name!r} already exists in schema {schema_name!r}"
            raise ValueError(msg)
        # Create new ViewInfo with updated name
        old_info = schema_data.views.pop(name).info
        schema_data.views[new_name] = ViewData(
            info=self._copy_view_info(old_info, name=new_name, comment=old_info.comment)
        )
        self._increment_version(attach_id)

    def view_comment_set(
        self,
        *,
        attach_id: AttachId,
        transaction_id: TransactionId | None,
        schema_name: str,
        name: str,
        comment: str | None,
        ignore_not_found: bool,
    ) -> None:
        """Set the comment for a view.

        Raises:
            ValueError: If the schema does not exist, or the view is missing
                and ``ignore_not_found`` is false.
        """
        schema_data = self._get_schema(attach_id, schema_name)
        view_data = schema_data.views.get(name)
        if view_data is None:
            if ignore_not_found:
                return
            msg = f"View {name!r} not found in schema {schema_name!r}"
            raise ValueError(msg)
        # Create a new ViewInfo with the updated comment
        old_info = view_data.info
        schema_data.views[name] = ViewData(
            info=self._copy_view_info(old_info, name=old_info.name, comment=comment)
        )
        self._increment_version(attach_id)
in-memory catalog worker process.""" + InMemoryCatalogWorker().run() + + +if __name__ == "__main__": + main()