From ac2dc549e17db6ed411793a9de113da3aa5ce069 Mon Sep 17 00:00:00 2001 From: Rusty Conover Date: Mon, 5 Jan 2026 20:01:29 -0500 Subject: [PATCH] Add InMemoryCatalog example implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create an in-memory catalog implementation for testing and as a reference: - InMemoryCatalog class implementing CatalogInterface - In-memory storage using Python dictionaries for catalogs, schemas, tables, views - UUID-based attach_id generation - Supports DDL operations: catalog/schema/table/view create/drop/rename - Does not support transactions (returns None) - Catalog versioning for change tracking Also adds: - InMemoryCatalogWorker class with catalog_interface set - Entry point: vgi-example-catalog-worker 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .beads/issues.jsonl | 10 +- pyproject.toml | 1 + vgi/examples/catalog.py | 591 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 597 insertions(+), 5 deletions(-) create mode 100644 vgi/examples/catalog.py diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 13ec848..84ce119 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -8,7 +8,7 @@ {"id":"vgi-python-34l","title":"Create catalog integration tests","description":"Create comprehensive test suite for catalog interface.\n\nFiles to create:\n- tests/catalog/__init__.py\n- tests/catalog/test_types.py\n- tests/catalog/test_serialization.py\n- tests/catalog/test_catalog_interface.py\n- tests/catalog/test_catalog_client.py\n- tests/catalog/test_integration.py\n\ntest_types.py:\n- Test all dataclass instantiation\n- Test frozen immutability\n- Test type alias usage\n\ntest_serialization.py:\n- Round-trip tests for all types\n- Edge cases: empty strings, empty lists, None values\n- Invalid schema rejection\n\ntest_catalog_interface.py:\n- Test abstract method enforcement\n- Test default implementations\n- Test NotImplementedError for optional methods\n- Test ReadOnlyCatalogInterface\n\ntest_catalog_client.py:\n- Mock worker tests for each client method\n- Error handling tests\n- Streaming response tests\n\ntest_integration.py:\n- End-to-end client ↔ worker tests using InMemoryCatalog\n- Catalog lifecycle: attach, query, detach\n- Schema operations\n- Table operations\n- Error propagation\n\nProtocol conformance tests:\n- Invalid schemas (wrong column types)\n- Missing required columns\n- Multi-row input batches (should fail)\n- Extra columns (should be ignored)","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:18:24.163718-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.050722-05:00","closed_at":"2026-01-05T19:21:50.050722-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-34l","depends_on_id":"vgi-python-e46","type":"blocks","created_at":"2026-01-05T19:18:44.818195-05:00","created_by":"rusty"},{"issue_id":"vgi-python-34l","depends_on_id":"vgi-python-vxy","type":"blocks","created_at":"2026-01-05T19:18:48.578947-05:00","created_by":"rusty"},{"issue_id":"vgi-python-34l","depends_on_id":"vgi-python-nju","type":"blocks","created_at":"2026-01-05T19:18:48.686098-05:00","created_by":"rusty"}]} {"id":"vgi-python-35i","title":"Test SchemaValidationError detailed message paths","notes":"Coverage: 67% in vgi/exceptions.py. Missing tests for:\n- Lines 116-123: Type mismatch detection in _build_detailed_message\n- Lines 128-131: Field order difference detection \n- Lines 149-151: Type mismatch reporting\n- Lines 155-157: Field order difference reporting\n\nTest scenarios needed:\n1. Schema with same fields but different types\n2. Schema with nullable vs non-nullable mismatch\n3. Schema with same fields in different order","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T22:15:25.858704-05:00","created_by":"rusty","updated_at":"2026-01-04T22:25:58.697852-05:00","closed_at":"2026-01-04T22:25:58.697852-05:00","close_reason":"Added comprehensive tests for SchemaValidationError. Coverage improved from 67% to 99%."} {"id":"vgi-python-36f","title":"Split metadata.py Arrow serialization into separate module","description":"metadata.py is 932 lines with two distinct concerns: 1) metadata resolution (enums, dataclasses, parameter extraction, resolve_metadata) and 2) Arrow serialization (schema definitions, to_arrow/from_arrow functions). Split Arrow serialization into metadata_serialization.py or metadata_arrow.py for better separation of concerns.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:53.481364-05:00","created_by":"rusty","updated_at":"2026-01-04T22:01:52.100108-05:00","closed_at":"2026-01-04T22:01:52.100108-05:00","close_reason":"Not warranted - metadata.py is well-organized with clear section headers. Arrow serialization (~165 lines) is tightly coupled to data classes (uses their to_dict/from_dict methods). Splitting would add import complexity without significant benefit."} -{"id":"vgi-python-3bn","title":"Fix schema_contents() default implementation","description":"Implement the default schema_contents() method that has a FIXME comment.\n\nFile: vgi/catalog/catalog_interface.py\n\nCurrent state: \n```python\ndef schema_contents(...) -\u003e Iterable[TableInfo | ViewInfo | FunctionInfo]:\n # FIXME: write this implementation for the worker.\n```\n\nThe default implementation should:\n1. Access the Worker's registered functions\n2. Convert each function's metadata to FunctionInfo\n3. Return them as the contents of the 'main' schema\n\nThis requires:\n- Access to Worker.functions registry\n- Converting function metadata to FunctionInfo format\n- May need to pass worker reference to CatalogInterface\n\nAlternative: Just raise NotImplementedError and require subclasses to implement.","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:27.482694-05:00","created_by":"rusty","updated_at":"2026-01-05T19:27:27.482694-05:00"} +{"id":"vgi-python-3bn","title":"Fix schema_contents() default implementation","description":"Implement the default schema_contents() method that has a FIXME comment.\n\nFile: vgi/catalog/catalog_interface.py\n\nCurrent state: \n```python\ndef schema_contents(...) -\u003e Iterable[TableInfo | ViewInfo | FunctionInfo]:\n # FIXME: write this implementation for the worker.\n```\n\nThe default implementation should:\n1. Access the Worker's registered functions\n2. Convert each function's metadata to FunctionInfo\n3. Return them as the contents of the 'main' schema\n\nThis requires:\n- Access to Worker.functions registry\n- Converting function metadata to FunctionInfo format\n- May need to pass worker reference to CatalogInterface\n\nAlternative: Just raise NotImplementedError and require subclasses to implement.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:27.482694-05:00","created_by":"rusty","updated_at":"2026-01-05T19:59:02.922654-05:00","closed_at":"2026-01-05T19:59:02.922654-05:00","close_reason":"Already fixed in PR #24 - now raises NotImplementedError"} {"id":"vgi-python-3fq","title":"Abstract common worker batch processing logic","description":"Worker batch processing methods _process_scalar_batches (377-466), _process_batches (468-550), and _generate_batches (552-593) share significant structure: IPC writer/reader setup, batch counting/logging, main processing loop. Extract common logic to reduce duplication - consider a BatchProcessor helper class or template method pattern.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:53.350497-05:00","created_by":"rusty","updated_at":"2026-01-04T21:20:24.509785-05:00","closed_at":"2026-01-04T21:20:24.509785-05:00","close_reason":"Analysis complete: abstraction not warranted. The three methods have sufficiently different logic (input handling, log message loops, protocol types) that abstracting them would add complexity without meaningful benefit. Current code is already readable at ~70-90 lines each."} {"id":"vgi-python-4mg","title":"Add InvocationType.CATALOG to vgi/invocation.py","description":"Extend InvocationType enum to support catalog invocations.\n\nFiles to modify:\n- vgi/invocation.py\n\nChanges:\n1. Add CATALOG = 'catalog' to InvocationType enum\n2. Update docstrings to document the new type\n3. Ensure serialization/deserialization handles the new value\n\nThe CATALOG invocation type indicates the function_name field contains a CatalogInterface method name (e.g., 'catalog_attach', 'schemas', 'table_get').\n\nTest that CATALOG type serializes and deserializes correctly.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:17:45.649509-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.062449-05:00","closed_at":"2026-01-05T19:21:50.062449-05:00","close_reason":"User requested closure"} {"id":"vgi-python-5er","title":"Extract _should_terminate into shared base class","description":"Identical _should_terminate method is copy-pasted in all three function modules. Implementation is always: check if log_message exists and level is EXCEPTION. Move to shared base class (Function or new ProcessingMixin) to eliminate duplication.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.190482-05:00","created_by":"rusty","updated_at":"2026-01-04T21:49:59.765614-05:00","closed_at":"2026-01-04T21:49:59.765614-05:00","close_reason":"Completed as part of PR #8 - _should_terminate moved to Function base class","dependencies":[{"issue_id":"vgi-python-5er","depends_on_id":"vgi-python-6o0","type":"blocks","created_at":"2026-01-04T20:07:49.283865-05:00","created_by":"rusty"}]} @@ -19,7 +19,7 @@ {"id":"vgi-python-79e","title":"Unify ProtocolInput classes with shared base","description":"ProtocolInput classes in scalar_function.py:151-166 and table_in_out_function.py:109-142 have similar structure with batch and metadata fields. The table_in_out version adds is_finalize logic. Create shared base ProtocolInput in protocol_types.py with table_in_out extending it.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.31917-05:00","created_by":"rusty","updated_at":"2026-01-04T21:53:26.965345-05:00","closed_at":"2026-01-04T21:53:26.965345-05:00","close_reason":"PR #9 created - unified ProtocolInput with shared base in protocol_types.py"} {"id":"vgi-python-8gz","title":"VGI Catalog Interface Implementation","description":"Complete the VGI Catalog Interface implementation to enable DuckDB ATTACH support.\n\nThe CatalogInterface ABC is already implemented in vgi/catalog/catalog_interface.py.\n\nRemaining work:\n- Add serialize/deserialize methods to dataclasses\n- Add InvocationType.CATALOG to protocol\n- Worker integration for catalog dispatch \n- CatalogClient class (new worker per call pattern)\n- Optional SQLite-based catalog storage\n- Example InMemoryCatalog\n- Tests\n\nSee: catalog-plan.md","status":"open","priority":1,"issue_type":"feature","created_at":"2026-01-05T19:26:27.348627-05:00","created_by":"rusty","updated_at":"2026-01-05T19:26:27.348627-05:00"} {"id":"vgi-python-8ra","title":"Implement Arrow-based argument specification serialization","description":"## Overview\n\nImplement serialization and deserialization of function argument specifications using Apache Arrow schemas. This enables functions to describe their argument signatures (types, positions, special markers) in a format that can be transmitted over IPC and understood by DuckDB for function registration.\n\n## Design\n\nUses a **single Arrow schema** where:\n- Positional arguments come first (field order = position index)\n- Named arguments follow (marked with `vgi_arg=named` metadata)\n- Special types (TableInput, AnyArrow, varargs) use field metadata markers\n\n## Key Components\n\n1. `ArgumentSpec` dataclass - represents one argument's specification\n2. `argument_specs_to_schema()` - convert specs to Arrow schema\n3. `schema_to_argument_specs()` - convert schema back to specs\n4. `extract_argument_specs()` - extract specs from function class Arg descriptors\n\n## Metadata Keys\n\n| Key | Value | Meaning |\n|-----|-------|---------|\n| `vgi_arg` | `named` | Named argument (not positional) |\n| `vgi_type` | `table` | Receives table input (Arg[TableInput]) |\n| `vgi_type` | `any` | Accepts any Arrow type (Arg[AnyArrow]) |\n| `vgi_varargs` | `true` | Collects remaining positional args |\n\n## References\n\n- Plan file: `.claude/plans/purrfect-foraging-nygaard.md`\n- Arguments module: `vgi/arguments.py`","status":"closed","priority":2,"issue_type":"feature","created_at":"2026-01-05T11:18:01.05631-05:00","created_by":"rusty","updated_at":"2026-01-05T11:34:12.712096-05:00","closed_at":"2026-01-05T11:34:12.712096-05:00","close_reason":"Implemented Arrow-based argument specification serialization with tests and documentation"} -{"id":"vgi-python-9j7","title":"Add catalog dispatch to Worker class","description":"Integrate CatalogInterface handling into Worker class.\n\nFile: vgi/worker.py\n\nChanges:\n1. Add catalog_interface class attribute: type[CatalogInterface] | None = None\n\n2. In run() method, detect InvocationType.CATALOG and dispatch to _handle_catalog_invocation()\n\n3. Implement _handle_catalog_invocation(invocation: Invocation):\n - Check catalog_interface is not None (raise ValueError if missing)\n - Instantiate catalog_interface class\n - Get method from function_name field (e.g., 'catalog_attach')\n - Deserialize arguments from input batch (column names → kwargs)\n - Call method with keyword arguments\n - Serialize and stream result back\n\n4. Key protocol difference: No bind→init→stream phases, just invoke→stream\n\n5. Handle different return types:\n - None → 0-row/0-column batch\n - Dataclass → serialize to single-row batch\n - Iterable → stream multiple batches\n\n6. Error handling: Return exceptions as EXCEPTION log messages (same as functions)","status":"in_progress","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:57.845071-05:00","created_by":"rusty","updated_at":"2026-01-05T19:41:11.77842-05:00","dependencies":[{"issue_id":"vgi-python-9j7","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.589219-05:00","created_by":"rusty"},{"issue_id":"vgi-python-9j7","depends_on_id":"vgi-python-po3","type":"blocks","created_at":"2026-01-05T19:27:50.620681-05:00","created_by":"rusty"}]} +{"id":"vgi-python-9j7","title":"Add catalog dispatch to Worker class","description":"Integrate CatalogInterface handling into Worker class.\n\nFile: vgi/worker.py\n\nChanges:\n1. Add catalog_interface class attribute: type[CatalogInterface] | None = None\n\n2. In run() method, detect InvocationType.CATALOG and dispatch to _handle_catalog_invocation()\n\n3. Implement _handle_catalog_invocation(invocation: Invocation):\n - Check catalog_interface is not None (raise ValueError if missing)\n - Instantiate catalog_interface class\n - Get method from function_name field (e.g., 'catalog_attach')\n - Deserialize arguments from input batch (column names → kwargs)\n - Call method with keyword arguments\n - Serialize and stream result back\n\n4. Key protocol difference: No bind→init→stream phases, just invoke→stream\n\n5. Handle different return types:\n - None → 0-row/0-column batch\n - Dataclass → serialize to single-row batch\n - Iterable → stream multiple batches\n\n6. Error handling: Return exceptions as EXCEPTION log messages (same as functions)","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:57.845071-05:00","created_by":"rusty","updated_at":"2026-01-05T19:44:05.99412-05:00","closed_at":"2026-01-05T19:44:05.99412-05:00","close_reason":"PR #26 created with Worker catalog dispatch","dependencies":[{"issue_id":"vgi-python-9j7","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.589219-05:00","created_by":"rusty"},{"issue_id":"vgi-python-9j7","depends_on_id":"vgi-python-po3","type":"blocks","created_at":"2026-01-05T19:27:50.620681-05:00","created_by":"rusty"}]} {"id":"vgi-python-9ql","title":"VGI Catalog Interface Implementation","description":"Add CatalogInterface ABC that lets VGI workers expose catalogs (databases), schemas, tables, views, and functions. Enables DuckDB ATTACH command support via VGI workers.\n\nKey components:\n- Type aliases and dataclasses (AttachId, TransactionId, SchemaInfo, TableInfo, etc.)\n- CatalogInterface abstract base class with ~40 methods\n- InvocationType.CATALOG for protocol dispatch\n- CatalogClient for client-side operations\n- Worker integration for catalog invocation handling\n\nSee: catalog-plan.md and ~/.claude/plans/iterative-waddling-adleman.md","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-05T19:16:41.100846-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.071596-05:00","closed_at":"2026-01-05T19:21:50.071596-05:00","close_reason":"User requested closure"} {"id":"vgi-python-a99","title":"Add settings accessor to function base classes","description":"Add a property to access DuckDB settings values in function implementations.\n\nChanges needed:\n- Add 'settings: dict[str, str]' property to Function base class\n- Property should return self.invocation.duckdb_settings or empty dict\n- Add convenience method like 'get_setting(name, default=None)'\n- Update ScalarFunction, TableFunctionGenerator, TableInOutFunction\n\nExample usage in function:\ndef compute(self, batch):\n tz = self.get_setting('timezone', 'UTC')\n # or\n tz = self.settings.get('timezone', 'UTC')","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.221602-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.171991-05:00","closed_at":"2026-01-04T13:20:41.171991-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-a99","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.738212-05:00","created_by":"rusty"}]} {"id":"vgi-python-a9i","title":"Add test coverage for worker error paths and edge cases","notes":"Coverage: 88% in vgi/worker.py. Missing tests for:\n- Lines 161, 310, 320, 322, 325, 333: Function lookup edge cases\n- Lines 394, 408: Validation error paths\n- Lines 442-448: Log message handling loop\n- Lines 618-621, 652, 660, 681: Table function edge cases\n\nThese are error handling and edge case paths in the worker protocol.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T22:15:33.888277-05:00","created_by":"rusty","updated_at":"2026-01-04T22:34:34.795304-05:00","closed_at":"2026-01-04T22:34:34.795304-05:00","close_reason":"Added tests for registry caching and _suggest_similar_names. Coverage improved from 88% to 92%. Remaining uncovered lines are deep protocol paths in subprocess handling."} @@ -38,13 +38,13 @@ {"id":"vgi-python-cvj","title":"Add PYTHON_TO_ARROW type mapping to vgi/arguments.py","description":"Add the Python→Arrow type mapping dict after imports:\n```python\nPYTHON_TO_ARROW: dict[type, pa.DataType] = {\n int: pa.int64(),\n str: pa.utf8(),\n float: pa.float64(),\n bool: pa.bool_(),\n bytes: pa.binary(),\n}\n```\nExport in __all__.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T15:44:37.900421-05:00","created_by":"rusty","updated_at":"2026-01-05T15:48:42.422086-05:00","closed_at":"2026-01-05T15:48:42.422086-05:00","close_reason":"PR #18 created"} {"id":"vgi-python-d73","title":"Create docs/argument-serialization.md","description":"## Overview\n\nCreate LLM-friendly documentation explaining the argument specification serialization format. This document should enable future implementors (human or AI) to understand how function argument signatures are serialized to Arrow schemas.\n\n## File Location\n\n`docs/argument-serialization.md`\n\n## Document Structure\n\n### Title and Purpose\n\nExplain that this document describes how VGI function argument specifications are serialized to Apache Arrow schemas for IPC transmission and DuckDB function registration.\n\n### Quick Reference\n\nA concise summary table showing:\n- Metadata keys and their meanings\n- Special type representations\n\n### Schema Format\n\nExplain the single-schema design:\n1. All arguments are fields in one Arrow schema\n2. Positional arguments come first, in order (field index = position index)\n3. Named arguments follow, marked with metadata\n4. Field name = Python attribute name (or argument key for named)\n5. Field type = exact Arrow type\n\n### Metadata Keys Reference\n\nComplete table of all metadata keys:\n\n| Key | Value | Description |\n|-----|-------|-------------|\n| `vgi_arg` | `named` | Field is a named argument, not positional. The field name is the argument key. |\n| `vgi_type` | `table` | Argument receives streaming table input (Arg[TableInput]). Arrow type is pa.null(). |\n| `vgi_type` | `any` | Argument accepts any Arrow type (Arg[AnyArrow]). Arrow type is pa.null(). |\n| `vgi_varargs` | `true` | Argument collects all remaining positional args. Arrow type is the element type. |\n\n### Special Type Handling\n\nExplain how special argument types are represented:\n\n#### TableInput\n- Arrow type: `pa.null()`\n- Metadata: `{b\"vgi_type\": b\"table\"}`\n- Meaning: This position receives streaming RecordBatches, not a scalar value\n\n#### AnyArrow\n- Arrow type: `pa.null()`\n- Metadata: `{b\"vgi_type\": b\"any\"}`\n- Meaning: Accepts any valid Arrow scalar type at runtime\n\n#### Varargs\n- Arrow type: The element type (e.g., `pa.int64()` for `Arg[int](..., varargs=True)`)\n- Metadata: `{b\"vgi_varargs\": b\"true\"}`\n- Meaning: Collects all remaining positional arguments from this position onwards\n\n### Examples\n\n#### Example 1: Simple Function\n\n```python\nclass MyFunction(TableInOutFunction):\n count = Arg[int](0) # Positional 0\n name = Arg[str](1) # Positional 1\n verbose = Arg[bool](\"verbose\") # Named\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"count\", pa.int64()),\n pa.field(\"name\", pa.utf8()),\n pa.field(\"verbose\", pa.bool_(), metadata={b\"vgi_arg\": b\"named\"}),\n])\n```\n\n#### Example 2: Function with Table Input\n\n```python\nclass TransformFunction(TableInOutFunction):\n multiplier = Arg[float](0)\n data = Arg[TableInput](1)\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"multiplier\", pa.float64()),\n pa.field(\"data\", pa.null(), metadata={b\"vgi_type\": b\"table\"}),\n])\n```\n\n#### Example 3: Function with Varargs\n\n```python\nclass SumFunction(TableInOutFunction):\n columns = Arg[str](0, varargs=True)\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"columns\", pa.utf8(), metadata={b\"vgi_varargs\": b\"true\"}),\n])\n```\n\n#### Example 4: Complex Function\n\n```python\nclass ComplexFunction(TableInOutFunction):\n count = Arg[int](0)\n data = Arg[TableInput](1)\n extra = Arg[float](2, varargs=True)\n format = Arg[str](\"format\")\n threshold = Arg[AnyArrow](\"threshold\")\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"count\", pa.int64()),\n pa.field(\"data\", pa.null(), metadata={b\"vgi_type\": b\"table\"}),\n pa.field(\"extra\", pa.float64(), metadata={b\"vgi_varargs\": b\"true\"}),\n pa.field(\"format\", pa.utf8(), metadata={b\"vgi_arg\": b\"named\"}),\n pa.field(\"threshold\", pa.null(), metadata={b\"vgi_arg\": b\"named\", b\"vgi_type\": b\"any\"}),\n])\n```\n\n### Serialization Code\n\nShow how to serialize and deserialize:\n\n```python\n# Serialize to bytes\nschema_bytes = schema.serialize().to_pybytes()\n\n# Deserialize from bytes\nschema = pa.ipc.read_schema(pa.py_buffer(schema_bytes))\n```\n\n### Parsing Algorithm\n\nExplain how to parse a schema back to argument specs:\n\n1. Initialize position_index = 0\n2. For each field in schema:\n a. Check if field has `vgi_arg=named` metadata\n b. If named: position = field.name (string)\n c. If positional: position = position_index, then increment position_index\n d. Check for `vgi_type` metadata (table or any)\n e. Check for `vgi_varargs` metadata\n f. Create ArgumentSpec with extracted info\n\n### Not Included\n\nExplicitly state what is NOT serialized:\n- Default values\n- Validation constraints (ge, le, choices, pattern)\n- Documentation strings\n\nThese are Python-side concerns handled by the Arg descriptor at runtime.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T11:19:17.488877-05:00","created_by":"rusty","updated_at":"2026-01-05T11:33:29.168007-05:00","closed_at":"2026-01-05T11:33:29.168007-05:00","close_reason":"Created comprehensive LLM-friendly documentation","dependencies":[{"issue_id":"vgi-python-d73","depends_on_id":"vgi-python-8ra","type":"blocks","created_at":"2026-01-05T11:19:30.820384-05:00","created_by":"rusty"}]} {"id":"vgi-python-dv0","title":"Add arrow_type parameter to Arg class","description":"In vgi/arguments.py:\n1. Add 'arrow_type' to __slots__\n2. Add parameter: arrow_type: pa.DataType | None = None\n3. Store: self.arrow_type = arrow_type\n4. Update __repr__ to include arrow_type if set","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T15:44:38.020395-05:00","created_by":"rusty","updated_at":"2026-01-05T15:50:51.513273-05:00","closed_at":"2026-01-05T15:50:51.513273-05:00","close_reason":"PR #19 created","dependencies":[{"issue_id":"vgi-python-dv0","depends_on_id":"vgi-python-cvj","type":"blocks","created_at":"2026-01-05T15:45:13.696822-05:00","created_by":"rusty"}]} -{"id":"vgi-python-dvd","title":"Complete ReadOnlyCatalogInterface implementation","description":"Complete ReadOnlyCatalogInterface that prevents all DDL operations.\n\nFile: vgi/catalog/catalog_interface.py (already exists)\n\nCurrent state: Only has class attributes, no method overrides.\n\nChanges needed:\n1. Override all DDL methods to raise CatalogReadOnlyError:\n - catalog_create, catalog_drop\n - schema_create, schema_drop\n - table_create, table_drop, table_rename, table_comment_set\n - All table_column_* methods\n - table_not_null_set, table_not_null_drop\n - view_create, view_drop, view_rename, view_comment_set\n - Transaction methods (begin/commit/rollback)\n\n2. Create CatalogReadOnlyError exception in vgi/exceptions.py\n\n3. Ensure catalog_attach returns supports_transactions=False\n\nInclude tests verifying all DDL operations raise CatalogReadOnlyError.","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:15.199453-05:00","created_by":"rusty","updated_at":"2026-01-05T19:27:15.199453-05:00"} +{"id":"vgi-python-dvd","title":"Complete ReadOnlyCatalogInterface implementation","description":"Complete ReadOnlyCatalogInterface that prevents all DDL operations.\n\nFile: vgi/catalog/catalog_interface.py (already exists)\n\nCurrent state: Only has class attributes, no method overrides.\n\nChanges needed:\n1. Override all DDL methods to raise CatalogReadOnlyError:\n - catalog_create, catalog_drop\n - schema_create, schema_drop\n - table_create, table_drop, table_rename, table_comment_set\n - All table_column_* methods\n - table_not_null_set, table_not_null_drop\n - view_create, view_drop, view_rename, view_comment_set\n - Transaction methods (begin/commit/rollback)\n\n2. Create CatalogReadOnlyError exception in vgi/exceptions.py\n\n3. Ensure catalog_attach returns supports_transactions=False\n\nInclude tests verifying all DDL operations raise CatalogReadOnlyError.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:15.199453-05:00","created_by":"rusty","updated_at":"2026-01-05T19:57:59.2381-05:00","closed_at":"2026-01-05T19:57:59.2381-05:00","close_reason":"PR #28 created"} {"id":"vgi-python-dvo","title":"Export AnyValue in vgi/__init__.py","description":"Import and add AnyValue to __all__ exports","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T10:41:41.65732-05:00","created_by":"rusty","updated_at":"2026-01-05T11:07:09.187969-05:00","closed_at":"2026-01-05T11:07:09.187969-05:00","close_reason":"Exported AnyArrow in vgi/__init__.py","dependencies":[{"issue_id":"vgi-python-dvo","depends_on_id":"vgi-python-ckg","type":"blocks","created_at":"2026-01-05T10:41:48.715634-05:00","created_by":"rusty"}]} {"id":"vgi-python-e37","title":"move Invocation from function.py out to own file","description":"The Invocation clas is kind of seperate from functions, so it should be in its own file. Move it and all of its other associated classes like InvocationType to its own file","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T09:18:46.605941-05:00","created_by":"rusty","updated_at":"2026-01-04T09:24:37.922675-05:00","closed_at":"2026-01-04T09:24:37.922675-05:00","close_reason":"Closed"} {"id":"vgi-python-e46","title":"Create vgi/catalog/client.py - CatalogClient class","description":"Create CatalogClient for client-side catalog operations.\n\nFiles to create:\n- vgi/catalog/client.py\n\nCatalogClient class:\n- __init__(worker_command: str)\n- Context manager support (__enter__, __exit__)\n- start() / stop() methods\n\nCore methods (mirroring CatalogInterface):\n- catalogs() -\u003e list[str]\n- attach(name, options) -\u003e CatalogAttachResult\n- detach(attach_id) -\u003e None\n- schemas(attach_id, transaction_id) -\u003e Iterator[SchemaInfo]\n- schema_get(attach_id, transaction_id, name) -\u003e SchemaInfo | None\n- schema_contents(attach_id, transaction_id, name) -\u003e Iterator[TableInfo | ViewInfo | FunctionInfo]\n- table_get(...) -\u003e TableInfo | None\n- view_get(...) -\u003e ViewInfo | None\n- function_get(...) -\u003e FunctionInfo | None\n- table_scan_function_get(...) -\u003e ScanFunctionResult\n\nDDL methods (optional, may raise NotImplementedError from worker):\n- catalog_create, catalog_drop\n- schema_create, schema_drop\n- table_create, table_drop, table_rename, etc.\n- view_create, view_drop, view_rename, etc.\n\nTransaction methods:\n- transaction_begin(attach_id) -\u003e TransactionId | None\n- transaction_commit(attach_id, transaction_id)\n- transaction_rollback(attach_id, transaction_id)\n\nInternal methods:\n- _invoke(method_name, **kwargs) -\u003e pa.RecordBatch | Iterator[pa.RecordBatch]\n- _create_invocation(method_name, kwargs) -\u003e Invocation\n- _deserialize_result(batch, return_type) -\u003e Any\n\nHandle:\n- Streaming responses for Iterable returns\n- Exception propagation from worker\n- None returns (0-row/0-column batches)","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:18:04.65125-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.055794-05:00","closed_at":"2026-01-05T19:21:50.055794-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-tw7","type":"blocks","created_at":"2026-01-05T19:18:44.316642-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-fd2","type":"blocks","created_at":"2026-01-05T19:18:44.440065-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-4mg","type":"blocks","created_at":"2026-01-05T19:18:44.559963-05:00","created_by":"rusty"}]} -{"id":"vgi-python-e6o","title":"Implement CatalogClient class","description":"Create CatalogClient for client-side catalog operations.\n\nFile: vgi/client/catalog_client.py\n\nCatalogClient class:\n- __init__(worker_command: str)\n- Each method call spawns new worker (matches VGI short-lived pattern)\n\nCore methods mirroring CatalogInterface:\n- catalogs() -\u003e list[str]\n- catalog_attach(name, options) -\u003e CatalogAttachResult\n- catalog_detach(attach_id) -\u003e None\n- schemas(attach_id, transaction_id) -\u003e Iterator[SchemaInfo]\n- schema_get(...) -\u003e SchemaInfo | None\n- schema_contents(...) -\u003e Iterator[TableInfo | ViewInfo | FunctionInfo]\n- table_get(...) -\u003e TableInfo | None\n- view_get(...) -\u003e ViewInfo | None\n- table_scan_function_get(...) -\u003e ScanFunctionResult\n\nDDL methods (may raise NotImplementedError from worker):\n- catalog_create, catalog_drop\n- schema_create, schema_drop\n- table_* methods, view_* methods\n\nTransaction methods:\n- catalog_transaction_begin/commit/rollback\n\nInternal:\n- _invoke(method_name, **kwargs) -\u003e pa.RecordBatch | Iterator[pa.RecordBatch]\n- _create_invocation(method_name, kwargs) -\u003e Invocation (with InvocationType.CATALOG)\n- Uses existing IPC utilities for communication","status":"in_progress","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:57.975309-05:00","created_by":"rusty","updated_at":"2026-01-05T19:44:28.358853-05:00","dependencies":[{"issue_id":"vgi-python-e6o","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.730122-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e6o","depends_on_id":"vgi-python-po3","type":"blocks","created_at":"2026-01-05T19:27:50.762036-05:00","created_by":"rusty"}]} +{"id":"vgi-python-e6o","title":"Implement CatalogClient class","description":"Create CatalogClient for client-side catalog operations.\n\nFile: vgi/client/catalog_client.py\n\nCatalogClient class:\n- __init__(worker_command: str)\n- Each method call spawns new worker (matches VGI short-lived pattern)\n\nCore methods mirroring CatalogInterface:\n- catalogs() -\u003e list[str]\n- catalog_attach(name, options) -\u003e CatalogAttachResult\n- catalog_detach(attach_id) -\u003e None\n- schemas(attach_id, transaction_id) -\u003e Iterator[SchemaInfo]\n- schema_get(...) -\u003e SchemaInfo | None\n- schema_contents(...) -\u003e Iterator[TableInfo | ViewInfo | FunctionInfo]\n- table_get(...) -\u003e TableInfo | None\n- view_get(...) -\u003e ViewInfo | None\n- table_scan_function_get(...) -\u003e ScanFunctionResult\n\nDDL methods (may raise NotImplementedError from worker):\n- catalog_create, catalog_drop\n- schema_create, schema_drop\n- table_* methods, view_* methods\n\nTransaction methods:\n- catalog_transaction_begin/commit/rollback\n\nInternal:\n- _invoke(method_name, **kwargs) -\u003e pa.RecordBatch | Iterator[pa.RecordBatch]\n- _create_invocation(method_name, kwargs) -\u003e Invocation (with InvocationType.CATALOG)\n- Uses existing IPC utilities for communication","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:57.975309-05:00","created_by":"rusty","updated_at":"2026-01-05T19:48:33.366915-05:00","closed_at":"2026-01-05T19:48:33.366915-05:00","close_reason":"PR #27 created with CatalogClient implementation","dependencies":[{"issue_id":"vgi-python-e6o","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.730122-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e6o","depends_on_id":"vgi-python-po3","type":"blocks","created_at":"2026-01-05T19:27:50.762036-05:00","created_by":"rusty"}]} {"id":"vgi-python-e9q","title":"Unify ProtocolOutput classes with shared base","description":"ProtocolOutput classes in table_function.py:177-224 and table_in_out_function.py:144-207 share similar metadata() method and from_process_result() classmethod. The table_in_out version adds status field. Create shared base with table_in_out extending it for status support.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.45014-05:00","created_by":"rusty","updated_at":"2026-01-04T21:54:55.871986-05:00","closed_at":"2026-01-04T21:54:55.871986-05:00","close_reason":"Not warranted - dataclass inheritance with slots=True doesn't allow adding required field (status) between inherited fields. The classes have different semantics (table_in_out requires status for generator state tracking) making inheritance impractical."} -{"id":"vgi-python-eg7","title":"Create InMemoryCatalog example implementation","description":"Create an in-memory catalog implementation for testing and as an example.\n\nFile: vgi/examples/catalog.py\n\nInMemoryCatalog(CatalogInterface):\n- In-memory storage using dicts\n- Implements all required abstract methods\n- Implements common optional methods (schema_create, table_create, etc.)\n- Generates attach_id as random UUID bytes\n- Does NOT support transactions (returns None)\n\nData structures:\n- _catalogs: dict[str, CatalogData]\n- _attachments: dict[AttachId, str] # attach_id -\u003e catalog_name\n\nCreate example worker:\n```python\nclass InMemoryCatalogWorker(Worker):\n catalog_interface = InMemoryCatalog\n```\n\nAdd entry point: vgi-example-catalog-worker","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:27.604912-05:00","created_by":"rusty","updated_at":"2026-01-05T19:27:27.604912-05:00","dependencies":[{"issue_id":"vgi-python-eg7","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.87322-05:00","created_by":"rusty"}]} +{"id":"vgi-python-eg7","title":"Create InMemoryCatalog example implementation","description":"Create an in-memory catalog implementation for testing and as an example.\n\nFile: vgi/examples/catalog.py\n\nInMemoryCatalog(CatalogInterface):\n- In-memory storage using dicts\n- Implements all required abstract methods\n- Implements common optional methods (schema_create, table_create, etc.)\n- Generates attach_id as random UUID bytes\n- Does NOT support transactions (returns None)\n\nData structures:\n- _catalogs: dict[str, CatalogData]\n- _attachments: dict[AttachId, str] # attach_id -\u003e catalog_name\n\nCreate example worker:\n```python\nclass InMemoryCatalogWorker(Worker):\n catalog_interface = InMemoryCatalog\n```\n\nAdd entry point: vgi-example-catalog-worker","status":"in_progress","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:27.604912-05:00","created_by":"rusty","updated_at":"2026-01-05T19:59:13.842949-05:00","dependencies":[{"issue_id":"vgi-python-eg7","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.87322-05:00","created_by":"rusty"}]} {"id":"vgi-python-f5z","title":"Create vgi/catalog/storage.py - Catalog persistence","description":"Create storage layer for catalog attach_id and transaction_id persistence.\n\nFiles to create:\n- vgi/catalog/storage.py\n\nCatalogStorage protocol (similar to FunctionStorage):\n- attach_put(attach_id, catalog_name, options) -\u003e None\n- attach_get(attach_id) -\u003e tuple[str, dict] | None\n- attach_delete(attach_id) -\u003e None\n- attach_list() -\u003e list[AttachId]\n\n- transaction_put(transaction_id, attach_id, state) -\u003e None\n- transaction_get(transaction_id) -\u003e tuple[AttachId, bytes] | None\n- transaction_delete(transaction_id) -\u003e None\n\nCatalogStorageSqlite implementation:\n- Default location: ~/.state/vgi/vgi_catalog.db\n- WAL mode for concurrent access\n- Schema:\n CREATE TABLE catalog_attachments (\n attach_id BLOB PRIMARY KEY,\n catalog_name TEXT NOT NULL,\n options TEXT, -- JSON\n created_at REAL DEFAULT (julianday('now'))\n )\n CREATE TABLE catalog_transactions (\n transaction_id BLOB PRIMARY KEY,\n attach_id BLOB NOT NULL,\n state BLOB,\n created_at REAL DEFAULT (julianday('now'))\n )\n\nInclude cleanup strategies for stale attachments/transactions.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T19:18:04.531387-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.073983-05:00","closed_at":"2026-01-05T19:21:50.073983-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-f5z","depends_on_id":"vgi-python-tw7","type":"blocks","created_at":"2026-01-05T19:18:44.194468-05:00","created_by":"rusty"}]} {"id":"vgi-python-fd2","title":"Create vgi/catalog/serialization.py - Arrow serialization","description":"Create Arrow IPC serialization for all catalog types.\n\nFiles to create:\n- vgi/catalog/serialization.py\n\nArrow schemas for:\n- CatalogAttachResult: attach_id, supports_transactions, supports_time_travel, catalog_version_frozen, catalog_version\n- SchemaInfo: attach_id, name, is_default, comment, tags\n- TableInfo: name, schema_name, columns, primary_key_columns, not_null_constraints, unique_constraints, check_constraints, comment, tags\n- ViewInfo: name, schema_name, definition, comment, tags\n- FunctionInfo: name, schema_name, function_type, arguments, output_schema, comment, tags\n- ScanFunctionResult: function_name, max_processes, invocation_id\n\nFunctions:\n- serialize_\u003ctype\u003e() -\u003e bytes for each type\n- deserialize_\u003ctype\u003e(batch) -\u003e Type for each type\n- Arrow schema constants for each type\n\nSerialization convention:\n- Single-row batches for scalar returns\n- Multi-row batches for streaming (Iterable returns)\n- None = 0-row/0-column batch\n- Empty list = 0-row batch with schema\n\nInclude round-trip serialization tests for all types.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:17:15.404739-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.068663-05:00","closed_at":"2026-01-05T19:21:50.068663-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-fd2","depends_on_id":"vgi-python-tw7","type":"blocks","created_at":"2026-01-05T19:18:36.318762-05:00","created_by":"rusty"}]} {"id":"vgi-python-g1m","title":"Use sentinel type pattern instead of Any for _MISSING in arguments.py","notes":"Line 33: _MISSING: Any = object()\n\nReplace with proper sentinel type pattern:\n```python\nfrom typing import Final\n\nclass _Missing:\n __slots__ = ()\n def __repr__(self) -\u003e str:\n return '\u003cMISSING\u003e'\n\nMISSING: Final = _Missing()\n```\n\nThis removes 1 Any and provides better type safety for default value checking.\nPart of 26.89% imprecision in arguments.py (59 Anys total).","status":"closed","priority":4,"issue_type":"task","created_at":"2026-01-04T22:19:50.079174-05:00","created_by":"rusty","updated_at":"2026-01-04T22:35:56.508153-05:00","closed_at":"2026-01-04T22:35:56.508153-05:00","close_reason":"Replaced _MISSING: Any = object() with proper _MissingType sentinel class. Improves type safety and removes 1 Any."} diff --git a/pyproject.toml b/pyproject.toml index 6c39f3b..f06edb8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dev = ["mypy", "pyarrow-stubs", "pytest", "pytest-cov", "pytest-mypy", "pytest-r [project.scripts] vgi-client = "vgi.client.cli:main" vgi-example-worker = "vgi.examples.worker:main" +vgi-example-catalog-worker = "vgi.examples.catalog:main" [build-system] requires = ["hatchling"] diff --git a/vgi/examples/catalog.py b/vgi/examples/catalog.py new file mode 100644 index 0000000..371b51e --- /dev/null +++ b/vgi/examples/catalog.py @@ -0,0 +1,591 @@ +"""In-memory catalog implementation for testing and examples. + +This module provides an in-memory implementation of CatalogInterface that can +be used for testing and as a reference implementation. +""" + +from __future__ import annotations + +import uuid +from collections.abc import Iterable +from dataclasses import dataclass, field +from typing import Any + +from vgi.catalog import ( + AttachId, + CatalogAttachResult, + CatalogInterface, + FunctionInfo, + OnConflict, + SchemaInfo, + SerializedSchema, + TableInfo, + TransactionId, + ViewInfo, +) +from vgi.worker import Worker + + +@dataclass +class TableData: + """In-memory storage for table metadata.""" + + info: TableInfo + + +@dataclass +class ViewData: + """In-memory storage for view metadata.""" + + info: ViewInfo + + +@dataclass +class SchemaData: + """In-memory storage for schema metadata.""" + + info: SchemaInfo + tables: dict[str, TableData] = field(default_factory=dict) + views: dict[str, ViewData] = field(default_factory=dict) + + +@dataclass +class CatalogData: + """In-memory storage for catalog metadata.""" + + name: str + schemas: dict[str, SchemaData] = field(default_factory=dict) + version: int = 1 + + +class InMemoryCatalog(CatalogInterface): + """In-memory catalog implementation for testing. + + This implementation stores all catalog, schema, table, and view data + in memory using Python dictionaries. It supports basic DDL operations + but does not support transactions. + + Attach IDs are generated as random UUIDs. + """ + + def __init__(self) -> None: + """Initialize the in-memory catalog.""" + # Maps catalog name -> CatalogData + self._catalogs: dict[str, CatalogData] = {} + # Maps attach_id -> catalog_name + self._attachments: dict[AttachId, str] = {} + # Create default "memory" catalog with "main" schema + self._create_default_catalog() + + def _create_default_catalog(self) -> None: + """Create the default memory catalog with main schema.""" + catalog = CatalogData(name="memory") + # Create a placeholder attach_id for internal use + placeholder_attach_id = AttachId(b"\x00" * 16) + catalog.schemas["main"] = SchemaData( + info=SchemaInfo( + attach_id=placeholder_attach_id, + name="main", + is_default=True, + comment=None, + tags={}, + ) + ) + self._catalogs["memory"] = catalog + + def _get_catalog(self, attach_id: AttachId) -> CatalogData: + """Get the catalog for the given attach_id.""" + catalog_name = self._attachments.get(attach_id) + if catalog_name is None: + msg = f"No catalog attached with id {attach_id!r}" + raise ValueError(msg) + catalog = self._catalogs.get(catalog_name) + if catalog is None: + msg = f"Catalog {catalog_name!r} not found" + raise ValueError(msg) + return catalog + + def _get_schema(self, attach_id: AttachId, schema_name: str) -> SchemaData: + """Get the schema for the given attach_id and schema name.""" + catalog = self._get_catalog(attach_id) + schema = catalog.schemas.get(schema_name) + if schema is None: + msg = f"Schema {schema_name!r} not found in catalog" + raise ValueError(msg) + return schema + + def _increment_version(self, attach_id: AttachId) -> None: + """Increment the catalog version after a modification.""" + catalog = self._get_catalog(attach_id) + catalog.version += 1 + + # Required abstract methods + + def catalogs(self) -> Iterable[str]: + """Get a list of catalog names.""" + return list(self._catalogs.keys()) + + def catalog_attach( + self, *, name: str, options: dict[str, Any] + ) -> CatalogAttachResult: + """Attach to a catalog with the given name.""" + if name not in self._catalogs: + msg = f"Catalog {name!r} not found" + raise ValueError(msg) + + # Generate a unique attach_id + attach_id = AttachId(uuid.uuid4().bytes) + self._attachments[attach_id] = name + + catalog = self._catalogs[name] + return CatalogAttachResult( + attach_id=attach_id, + supports_transactions=False, + supports_time_travel=False, + catalog_version_frozen=False, + catalog_version=catalog.version, + ) + + def catalog_detach(self, *, attach_id: AttachId) -> None: + """Detach from the catalog.""" + self._attachments.pop(attach_id, None) + + def schema_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + name: str, + ) -> SchemaInfo | None: + """Get information about a schema.""" + catalog = self._get_catalog(attach_id) + schema_data = catalog.schemas.get(name) + if schema_data is None: + return None + # Update the attach_id in the returned info + return SchemaInfo( + attach_id=attach_id, + name=schema_data.info.name, + is_default=schema_data.info.is_default, + comment=schema_data.info.comment, + tags=schema_data.info.tags, + ) + + def table_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ) -> TableInfo | None: + """Get information about a table.""" + catalog = self._get_catalog(attach_id) + schema_data = catalog.schemas.get(schema_name) + if schema_data is None: + return None + table_data = schema_data.tables.get(name) + if table_data is None: + return None + return table_data.info + + def view_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ) -> ViewInfo | None: + """Get information about a view.""" + catalog = self._get_catalog(attach_id) + schema_data = catalog.schemas.get(schema_name) + if schema_data is None: + return None + view_data = schema_data.views.get(name) + if view_data is None: + return None + return view_data.info + + # Optional methods with implementations + + def catalog_version( + self, *, attach_id: AttachId, transaction_id: TransactionId | None + ) -> int: + """Get the current catalog version.""" + catalog = self._get_catalog(attach_id) + return catalog.version + + def catalog_create( + self, *, name: str, on_conflict: OnConflict, options: dict[str, Any] + ) -> None: + """Create a new catalog.""" + if name in self._catalogs: + if on_conflict == OnConflict.ERROR: + msg = f"Catalog {name!r} already exists" + raise ValueError(msg) + if on_conflict == OnConflict.IGNORE: + return + # REPLACE: fall through to create + + catalog = CatalogData(name=name) + # Create a placeholder attach_id for internal use + placeholder_attach_id = AttachId(b"\x00" * 16) + catalog.schemas["main"] = SchemaData( + info=SchemaInfo( + attach_id=placeholder_attach_id, + name="main", + is_default=True, + comment=None, + tags={}, + ) + ) + self._catalogs[name] = catalog + + def catalog_drop(self, *, name: str) -> None: + """Drop a catalog.""" + if name not in self._catalogs: + msg = f"Catalog {name!r} not found" + raise ValueError(msg) + # Remove any attachments to this catalog + to_remove = [aid for aid, cname in self._attachments.items() if cname == name] + for aid in to_remove: + del self._attachments[aid] + del self._catalogs[name] + + def schemas( + self, *, attach_id: AttachId, transaction_id: TransactionId | None + ) -> Iterable[SchemaInfo]: + """Get a list of schemas in the catalog.""" + catalog = self._get_catalog(attach_id) + result = [] + for schema_data in catalog.schemas.values(): + # Update the attach_id in the returned info + result.append( + SchemaInfo( + attach_id=attach_id, + name=schema_data.info.name, + is_default=schema_data.info.is_default, + comment=schema_data.info.comment, + tags=schema_data.info.tags, + ) + ) + return result + + def schema_create( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + name: str, + comment: str | None, + tags: dict[str, str], + ) -> None: + """Create a new schema.""" + catalog = self._get_catalog(attach_id) + if name in catalog.schemas: + msg = f"Schema {name!r} already exists" + raise ValueError(msg) + catalog.schemas[name] = SchemaData( + info=SchemaInfo( + attach_id=attach_id, + name=name, + is_default=False, + comment=comment, + tags=tags, + ) + ) + self._increment_version(attach_id) + + def schema_drop( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + name: str, + ignore_not_found: bool, + cascade: bool, + ) -> None: + """Drop a schema.""" + catalog = self._get_catalog(attach_id) + if name not in catalog.schemas: + if ignore_not_found: + return + msg = f"Schema {name!r} not found" + raise ValueError(msg) + schema_data = catalog.schemas[name] + if not cascade and (schema_data.tables or schema_data.views): + msg = f"Schema {name!r} is not empty, use CASCADE to drop" + raise ValueError(msg) + del catalog.schemas[name] + self._increment_version(attach_id) + + def schema_contents( + self, *, attach_id: AttachId, transaction_id: TransactionId | None, name: str + ) -> Iterable[TableInfo | ViewInfo | FunctionInfo]: + """Get the contents of a schema.""" + schema_data = self._get_schema(attach_id, name) + result: list[TableInfo | ViewInfo | FunctionInfo] = [] + for table_data in schema_data.tables.values(): + result.append(table_data.info) + for view_data in schema_data.views.values(): + result.append(view_data.info) + return result + + def table_create( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + columns: SerializedSchema, + on_conflict: OnConflict, + not_null_constraints: list[int], + unique_constraints: list[list[int]], + check_constraints: list[str], + ) -> None: + """Create a new table.""" + schema_data = self._get_schema(attach_id, schema_name) + if name in schema_data.tables: + if on_conflict == OnConflict.ERROR: + msg = f"Table {name!r} already exists in schema {schema_name!r}" + raise ValueError(msg) + if on_conflict == OnConflict.IGNORE: + return + # REPLACE: fall through to create + + schema_data.tables[name] = TableData( + info=TableInfo( + name=name, + schema_name=schema_name, + columns=columns, + not_null_constraints=not_null_constraints, + unique_constraints=unique_constraints, + check_constraints=check_constraints, + comment=None, + tags={}, + ) + ) + self._increment_version(attach_id) + + def table_drop( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ignore_not_found: bool, + ) -> None: + """Drop a table.""" + schema_data = self._get_schema(attach_id, schema_name) + if name not in schema_data.tables: + if ignore_not_found: + return + msg = f"Table {name!r} not found in schema {schema_name!r}" + raise ValueError(msg) + del schema_data.tables[name] + self._increment_version(attach_id) + + def table_comment_set( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + comment: str | None, + ignore_not_found: bool, + ) -> None: + """Set the comment for a table.""" + schema_data = self._get_schema(attach_id, schema_name) + table_data = schema_data.tables.get(name) + if table_data is None: + if ignore_not_found: + return + msg = f"Table {name!r} not found in schema {schema_name!r}" + raise ValueError(msg) + # Create a new TableInfo with the updated comment + old_info = table_data.info + schema_data.tables[name] = TableData( + info=TableInfo( + name=old_info.name, + schema_name=old_info.schema_name, + columns=old_info.columns, + not_null_constraints=old_info.not_null_constraints, + unique_constraints=old_info.unique_constraints, + check_constraints=old_info.check_constraints, + comment=comment, + tags=old_info.tags, + ) + ) + self._increment_version(attach_id) + + def table_rename( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + new_name: str, + ignore_not_found: bool, + ) -> None: + """Rename a table.""" + schema_data = self._get_schema(attach_id, schema_name) + if name not in schema_data.tables: + if ignore_not_found: + return + msg = f"Table {name!r} not found in schema {schema_name!r}" + raise ValueError(msg) + if new_name in schema_data.tables: + msg = f"Table {new_name!r} already exists in schema {schema_name!r}" + raise ValueError(msg) + table_data = schema_data.tables.pop(name) + # Create new TableInfo with updated name + old_info = table_data.info + schema_data.tables[new_name] = TableData( + info=TableInfo( + name=new_name, + schema_name=old_info.schema_name, + columns=old_info.columns, + not_null_constraints=old_info.not_null_constraints, + unique_constraints=old_info.unique_constraints, + check_constraints=old_info.check_constraints, + comment=old_info.comment, + tags=old_info.tags, + ) + ) + self._increment_version(attach_id) + + def view_create( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + definition: str, + on_conflict: OnConflict, + ) -> None: + """Create a new view.""" + schema_data = self._get_schema(attach_id, schema_name) + if name in schema_data.views: + if on_conflict == OnConflict.ERROR: + msg = f"View {name!r} already exists in schema {schema_name!r}" + raise ValueError(msg) + if on_conflict == OnConflict.IGNORE: + return + # REPLACE: fall through to create + + schema_data.views[name] = ViewData( + info=ViewInfo( + name=name, + schema_name=schema_name, + definition=definition, + comment=None, + tags={}, + ) + ) + self._increment_version(attach_id) + + def view_drop( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ignore_not_found: bool, + ) -> None: + """Drop a view.""" + schema_data = self._get_schema(attach_id, schema_name) + if name not in schema_data.views: + if ignore_not_found: + return + msg = f"View {name!r} not found in schema {schema_name!r}" + raise ValueError(msg) + del schema_data.views[name] + self._increment_version(attach_id) + + def view_rename( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + new_name: str, + ignore_not_found: bool, + ) -> None: + """Rename a view.""" + schema_data = self._get_schema(attach_id, schema_name) + if name not in schema_data.views: + if ignore_not_found: + return + msg = f"View {name!r} not found in schema {schema_name!r}" + raise ValueError(msg) + if new_name in schema_data.views: + msg = f"View {new_name!r} already exists in schema {schema_name!r}" + raise ValueError(msg) + view_data = schema_data.views.pop(name) + # Create new ViewInfo with updated name + old_info = view_data.info + schema_data.views[new_name] = ViewData( + info=ViewInfo( + name=new_name, + schema_name=old_info.schema_name, + definition=old_info.definition, + comment=old_info.comment, + tags=old_info.tags, + ) + ) + self._increment_version(attach_id) + + def view_comment_set( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + comment: str | None, + ignore_not_found: bool, + ) -> None: + """Set the comment for a view.""" + schema_data = self._get_schema(attach_id, schema_name) + view_data = schema_data.views.get(name) + if view_data is None: + if ignore_not_found: + return + msg = f"View {name!r} not found in schema {schema_name!r}" + raise ValueError(msg) + # Create a new ViewInfo with the updated comment + old_info = view_data.info + schema_data.views[name] = ViewData( + info=ViewInfo( + name=old_info.name, + schema_name=old_info.schema_name, + definition=old_info.definition, + comment=comment, + tags=old_info.tags, + ) + ) + self._increment_version(attach_id) + + +class InMemoryCatalogWorker(Worker): + """Example worker with InMemoryCatalog support.""" + + catalog_interface = InMemoryCatalog + functions = [] # No functions, just catalog support + + +def main() -> None: + """Run the in-memory catalog worker process.""" + InMemoryCatalogWorker().run() + + +if __name__ == "__main__": + main()