From b415679b059b276980e6e0f97f58688d063a8def Mon Sep 17 00:00:00 2001 From: Rusty Conover Date: Mon, 5 Jan 2026 19:43:46 -0500 Subject: [PATCH] Add catalog dispatch to Worker class MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Integrate CatalogInterface handling into Worker: - Add catalog_interface class attribute for configuring catalog support - Add dispatch for InvocationType.CATALOG in run() method - Implement _handle_catalog_invocation() for simplified catalog protocol Key features: - Catalog invocations use simplified protocol: invoke → stream (no bind/init phases) - function_name field contains CatalogInterface method name - Input batch has 1 row with columns matching method parameters - Handles None, serializable dataclasses, and iterables as return types 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .beads/issues.jsonl | 6 +-- vgi/worker.py | 104 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 5 deletions(-) diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index b1817fa..b5000a2 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -1,4 +1,4 @@ -{"id":"vgi-python-085","title":"Add serialize/deserialize methods to catalog dataclasses","description":"Add Arrow IPC serialization directly to the dataclasses in vgi/catalog/catalog_interface.py.\n\nAdd to each dataclass:\n- serialize() -\u003e bytes method\n- @classmethod deserialize(batch: pa.RecordBatch) -\u003e Self method\n- Arrow schema class variable for each type\n\nDataclasses to update:\n- CatalogAttachResult\n- SchemaInfo \n- TableInfo\n- ViewInfo\n- FunctionInfo\n- ScanFunctionResult\n\nSerialization rules from plan:\n- Single-row batches for scalar returns\n- Multi-row batches for streaming (Iterable returns)\n- None = 0-row/0-column batch\n- Column names match field names exactly\n- SerializedSchema fields use pa.binary()\n- tags fields use pa.map_(pa.string(), pa.string())\n\nAlso create vgi/catalog/__init__.py with package exports.","status":"in_progress","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:40.362177-05:00","created_by":"rusty","updated_at":"2026-01-05T19:29:02.815022-05:00"} +{"id":"vgi-python-085","title":"Add serialize/deserialize methods to catalog dataclasses","description":"Add Arrow IPC serialization directly to the dataclasses in vgi/catalog/catalog_interface.py.\n\nAdd to each dataclass:\n- serialize() -\u003e bytes method\n- @classmethod deserialize(batch: pa.RecordBatch) -\u003e Self method\n- Arrow schema class variable for each type\n\nDataclasses to update:\n- CatalogAttachResult\n- SchemaInfo \n- TableInfo\n- ViewInfo\n- FunctionInfo\n- ScanFunctionResult\n\nSerialization rules from plan:\n- Single-row batches for scalar returns\n- Multi-row batches for streaming (Iterable returns)\n- None = 0-row/0-column batch\n- Column names match field names exactly\n- SerializedSchema fields use pa.binary()\n- tags fields use pa.map_(pa.string(), pa.string())\n\nAlso create vgi/catalog/__init__.py with package exports.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:40.362177-05:00","created_by":"rusty","updated_at":"2026-01-05T19:39:06.385062-05:00","closed_at":"2026-01-05T19:39:06.385062-05:00","close_reason":"PR #24 created with serialize/deserialize methods"} {"id":"vgi-python-0fe","title":"Add is_varargs to ParameterInfo and metadata extraction","description":"In vgi/metadata.py:\n- Add is_varargs: bool = False to ParameterInfo\n- Update to_dict() and from_dict()\n- Add is_varargs field to _PARAMETER_STRUCT for Arrow serialization\n- Extract varargs flag in extract_parameters()\n- Add _validate_varargs() with rules:\n - Only one varargs parameter allowed\n - Must be positional (not named)\n - Must be last positional (before TableInput if present)\n - Cannot have default value","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T10:49:20.141375-05:00","created_by":"rusty","updated_at":"2026-01-05T10:58:21.242603-05:00","closed_at":"2026-01-05T10:58:21.242603-05:00","close_reason":"Added is_varargs to ParameterInfo, _PARAMETER_STRUCT, extract_parameters(), and _validate_varargs()","dependencies":[{"issue_id":"vgi-python-0fe","depends_on_id":"vgi-python-jrf","type":"blocks","created_at":"2026-01-05T10:49:26.421664-05:00","created_by":"rusty"}]} {"id":"vgi-python-0hr","title":"Remove redundant InitInputType class attribute","description":"InitInputType class attribute duplicates the generic type parameter: 'class ScalarFunctionGenerator(Function[FunctionInitInput])' already specifies the type, but 'InitInputType = FunctionInitInput' repeats it. Investigate using get_type_hints or __orig_bases__ to infer the type and remove the redundant attribute.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:53.780529-05:00","created_by":"rusty","updated_at":"2026-01-04T22:00:40.221423-05:00","closed_at":"2026-01-04T22:00:40.221423-05:00","close_reason":"PR #10 created - uses _get_init_input_type() to infer type from generic parameter"} {"id":"vgi-python-1s5","title":"Move distributed state management to optional mixin","description":"The Function base class in function.py includes ~200 lines for distributed state management (store_state, collect_states, enqueue_work, dequeue_work, work queue storage). Not all functions need this. Extract to DistributedStateMixin that functions can opt into, keeping Function base class simpler for basic use cases.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:53.606614-05:00","created_by":"rusty","updated_at":"2026-01-04T21:22:09.772825-05:00","closed_at":"2026-01-04T21:22:09.772825-05:00","close_reason":"Analysis complete: extraction not recommended. The distributed state methods are tightly coupled with execution_identifier and storage, which are used by core initialization methods. Extraction would require moving initialize_global_state/load_global_state to the mixin, breaking the protocol and requiring multiple inheritance. Current API is already opt-in (just don't call the methods) and well-documented."} @@ -19,7 +19,7 @@ {"id":"vgi-python-79e","title":"Unify ProtocolInput classes with shared base","description":"ProtocolInput classes in scalar_function.py:151-166 and table_in_out_function.py:109-142 have similar structure with batch and metadata fields. The table_in_out version adds is_finalize logic. Create shared base ProtocolInput in protocol_types.py with table_in_out extending it.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.31917-05:00","created_by":"rusty","updated_at":"2026-01-04T21:53:26.965345-05:00","closed_at":"2026-01-04T21:53:26.965345-05:00","close_reason":"PR #9 created - unified ProtocolInput with shared base in protocol_types.py"} {"id":"vgi-python-8gz","title":"VGI Catalog Interface Implementation","description":"Complete the VGI Catalog Interface implementation to enable DuckDB ATTACH support.\n\nThe CatalogInterface ABC is already implemented in vgi/catalog/catalog_interface.py.\n\nRemaining work:\n- Add serialize/deserialize methods to dataclasses\n- Add InvocationType.CATALOG to protocol\n- Worker integration for catalog dispatch \n- CatalogClient class (new worker per call pattern)\n- Optional SQLite-based catalog storage\n- Example InMemoryCatalog\n- Tests\n\nSee: catalog-plan.md","status":"open","priority":1,"issue_type":"feature","created_at":"2026-01-05T19:26:27.348627-05:00","created_by":"rusty","updated_at":"2026-01-05T19:26:27.348627-05:00"} {"id":"vgi-python-8ra","title":"Implement Arrow-based argument specification serialization","description":"## Overview\n\nImplement serialization and deserialization of function argument specifications using Apache Arrow schemas. This enables functions to describe their argument signatures (types, positions, special markers) in a format that can be transmitted over IPC and understood by DuckDB for function registration.\n\n## Design\n\nUses a **single Arrow schema** where:\n- Positional arguments come first (field order = position index)\n- Named arguments follow (marked with `vgi_arg=named` metadata)\n- Special types (TableInput, AnyArrow, varargs) use field metadata markers\n\n## Key Components\n\n1. `ArgumentSpec` dataclass - represents one argument's specification\n2. `argument_specs_to_schema()` - convert specs to Arrow schema\n3. `schema_to_argument_specs()` - convert schema back to specs\n4. `extract_argument_specs()` - extract specs from function class Arg descriptors\n\n## Metadata Keys\n\n| Key | Value | Meaning |\n|-----|-------|---------|\n| `vgi_arg` | `named` | Named argument (not positional) |\n| `vgi_type` | `table` | Receives table input (Arg[TableInput]) |\n| `vgi_type` | `any` | Accepts any Arrow type (Arg[AnyArrow]) |\n| `vgi_varargs` | `true` | Collects remaining positional args |\n\n## References\n\n- Plan file: `.claude/plans/purrfect-foraging-nygaard.md`\n- Arguments module: `vgi/arguments.py`","status":"closed","priority":2,"issue_type":"feature","created_at":"2026-01-05T11:18:01.05631-05:00","created_by":"rusty","updated_at":"2026-01-05T11:34:12.712096-05:00","closed_at":"2026-01-05T11:34:12.712096-05:00","close_reason":"Implemented Arrow-based argument specification serialization with tests and documentation"} -{"id":"vgi-python-9j7","title":"Add catalog dispatch to Worker class","description":"Integrate CatalogInterface handling into Worker class.\n\nFile: vgi/worker.py\n\nChanges:\n1. Add catalog_interface class attribute: type[CatalogInterface] | None = None\n\n2. In run() method, detect InvocationType.CATALOG and dispatch to _handle_catalog_invocation()\n\n3. Implement _handle_catalog_invocation(invocation: Invocation):\n - Check catalog_interface is not None (raise ValueError if missing)\n - Instantiate catalog_interface class\n - Get method from function_name field (e.g., 'catalog_attach')\n - Deserialize arguments from input batch (column names → kwargs)\n - Call method with keyword arguments\n - Serialize and stream result back\n\n4. Key protocol difference: No bind→init→stream phases, just invoke→stream\n\n5. Handle different return types:\n - None → 0-row/0-column batch\n - Dataclass → serialize to single-row batch\n - Iterable → stream multiple batches\n\n6. Error handling: Return exceptions as EXCEPTION log messages (same as functions)","status":"open","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:57.845071-05:00","created_by":"rusty","updated_at":"2026-01-05T19:26:57.845071-05:00","dependencies":[{"issue_id":"vgi-python-9j7","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.589219-05:00","created_by":"rusty"},{"issue_id":"vgi-python-9j7","depends_on_id":"vgi-python-po3","type":"blocks","created_at":"2026-01-05T19:27:50.620681-05:00","created_by":"rusty"}]} +{"id":"vgi-python-9j7","title":"Add catalog dispatch to Worker class","description":"Integrate CatalogInterface handling into Worker class.\n\nFile: vgi/worker.py\n\nChanges:\n1. Add catalog_interface class attribute: type[CatalogInterface] | None = None\n\n2. In run() method, detect InvocationType.CATALOG and dispatch to _handle_catalog_invocation()\n\n3. Implement _handle_catalog_invocation(invocation: Invocation):\n - Check catalog_interface is not None (raise ValueError if missing)\n - Instantiate catalog_interface class\n - Get method from function_name field (e.g., 'catalog_attach')\n - Deserialize arguments from input batch (column names → kwargs)\n - Call method with keyword arguments\n - Serialize and stream result back\n\n4. Key protocol difference: No bind→init→stream phases, just invoke→stream\n\n5. Handle different return types:\n - None → 0-row/0-column batch\n - Dataclass → serialize to single-row batch\n - Iterable → stream multiple batches\n\n6. Error handling: Return exceptions as EXCEPTION log messages (same as functions)","status":"in_progress","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:57.845071-05:00","created_by":"rusty","updated_at":"2026-01-05T19:41:11.77842-05:00","dependencies":[{"issue_id":"vgi-python-9j7","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.589219-05:00","created_by":"rusty"},{"issue_id":"vgi-python-9j7","depends_on_id":"vgi-python-po3","type":"blocks","created_at":"2026-01-05T19:27:50.620681-05:00","created_by":"rusty"}]} {"id":"vgi-python-9ql","title":"VGI Catalog Interface Implementation","description":"Add CatalogInterface ABC that lets VGI workers expose catalogs (databases), schemas, tables, views, and functions. Enables DuckDB ATTACH command support via VGI workers.\n\nKey components:\n- Type aliases and dataclasses (AttachId, TransactionId, SchemaInfo, TableInfo, etc.)\n- CatalogInterface abstract base class with ~40 methods\n- InvocationType.CATALOG for protocol dispatch\n- CatalogClient for client-side operations\n- Worker integration for catalog invocation handling\n\nSee: catalog-plan.md and ~/.claude/plans/iterative-waddling-adleman.md","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-05T19:16:41.100846-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.071596-05:00","closed_at":"2026-01-05T19:21:50.071596-05:00","close_reason":"User requested closure"} {"id":"vgi-python-a99","title":"Add settings accessor to function base classes","description":"Add a property to access DuckDB settings values in function implementations.\n\nChanges needed:\n- Add 'settings: dict[str, str]' property to Function base class\n- Property should return self.invocation.duckdb_settings or empty dict\n- Add convenience method like 'get_setting(name, default=None)'\n- Update ScalarFunction, TableFunctionGenerator, TableInOutFunction\n\nExample usage in function:\ndef compute(self, batch):\n tz = self.get_setting('timezone', 'UTC')\n # or\n tz = self.settings.get('timezone', 'UTC')","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.221602-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.171991-05:00","closed_at":"2026-01-04T13:20:41.171991-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-a99","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.738212-05:00","created_by":"rusty"}]} {"id":"vgi-python-a9i","title":"Add test coverage for worker error paths and edge cases","notes":"Coverage: 88% in vgi/worker.py. Missing tests for:\n- Lines 161, 310, 320, 322, 325, 333: Function lookup edge cases\n- Lines 394, 408: Validation error paths\n- Lines 442-448: Log message handling loop\n- Lines 618-621, 652, 660, 681: Table function edge cases\n\nThese are error handling and edge case paths in the worker protocol.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T22:15:33.888277-05:00","created_by":"rusty","updated_at":"2026-01-04T22:34:34.795304-05:00","closed_at":"2026-01-04T22:34:34.795304-05:00","close_reason":"Added tests for registry caching and _suggest_similar_names. Coverage improved from 88% to 92%. Remaining uncovered lines are deep protocol paths in subprocess handling."} @@ -68,7 +68,7 @@ {"id":"vgi-python-odi","title":"Change max_processes from method to property in Function hierarchy","description":"Refactor max_processes from a method to a property across the Function class hierarchy (Function, ScalarFunction, TableFunctionGenerator, TableInOutFunction, etc.). This makes the API more consistent since max_processes is effectively a constant per function class and properties are more idiomatic for such values.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T11:25:29.750648-05:00","created_by":"rusty","updated_at":"2026-01-04T11:50:57.566545-05:00","closed_at":"2026-01-04T11:50:57.566545-05:00","close_reason":"Closed"} {"id":"vgi-python-p91","title":"Move exception classes from function.py to own file","description":"Move InitIdentifierError and SchemaValidationError from vgi/function.py to a new vgi/exceptions.py file. Update imports in function.py and any other files that reference these exceptions.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T09:12:28.058227-05:00","created_by":"rusty","updated_at":"2026-01-04T09:17:52.477661-05:00","closed_at":"2026-01-04T09:17:52.477661-05:00","close_reason":"Closed"} {"id":"vgi-python-pnm","title":"Create vgi/catalog/read_only_catalog.py - ReadOnlyCatalogInterface","description":"Create ReadOnlyCatalogInterface that prevents all DDL operations.\n\nFiles to create:\n- vgi/catalog/read_only_catalog.py\n\nReadOnlyCatalogInterface(CatalogInterface):\n- Override all DDL methods to raise ReadOnlyError\n- catalog_create, catalog_drop\n- schema_create, schema_drop\n- All table_* DDL methods\n- All view_* DDL methods\n- Transaction methods (optional - could allow read-only transactions)\n\nProperties:\n- supports_transactions = False (class attribute)\n- catalog_version_frozen = True (class attribute)\n\nCreate ReadOnlyError exception class in vgi/exceptions.py.\n\nInclude tests that verify all DDL operations raise ReadOnlyError.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T19:17:30.998165-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.075345-05:00","closed_at":"2026-01-05T19:21:50.075345-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-pnm","depends_on_id":"vgi-python-ik9","type":"blocks","created_at":"2026-01-05T19:18:36.574236-05:00","created_by":"rusty"}]} -{"id":"vgi-python-po3","title":"Add InvocationType.CATALOG to protocol","description":"Extend InvocationType enum to support catalog invocations.\n\nFile: vgi/invocation.py\n\nChanges:\n1. Add CATALOG = 'catalog' to InvocationType enum\n2. Update docstring to document the new type\n\nThe CATALOG invocation type indicates:\n- function_name field contains a CatalogInterface method name (e.g., 'catalog_attach', 'schemas', 'table_get')\n- Simplified protocol: invoke → stream (no bind→init→stream phases)\n- Input batch has exactly 1 row with column names matching method parameters\n\nEnsure existing serialization/deserialization handles the new value.","status":"in_progress","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:40.477214-05:00","created_by":"rusty","updated_at":"2026-01-05T19:39:29.693569-05:00"} +{"id":"vgi-python-po3","title":"Add InvocationType.CATALOG to protocol","description":"Extend InvocationType enum to support catalog invocations.\n\nFile: vgi/invocation.py\n\nChanges:\n1. Add CATALOG = 'catalog' to InvocationType enum\n2. Update docstring to document the new type\n\nThe CATALOG invocation type indicates:\n- function_name field contains a CatalogInterface method name (e.g., 'catalog_attach', 'schemas', 'table_get')\n- Simplified protocol: invoke → stream (no bind→init→stream phases)\n- Input batch has exactly 1 row with column names matching method parameters\n\nEnsure existing serialization/deserialization handles the new value.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:40.477214-05:00","created_by":"rusty","updated_at":"2026-01-05T19:40:56.341493-05:00","closed_at":"2026-01-05T19:40:56.341493-05:00","close_reason":"PR #25 created with InvocationType.CATALOG"} {"id":"vgi-python-q1w","title":"Implement optional CatalogStorage with SQLite default","description":"Create optional storage layer for catalog attach_id and transaction_id persistence.\n\nFile: vgi/catalog/storage.py\n\nCatalogStorage protocol:\n- attach_put(attach_id, catalog_name, options) -\u003e None\n- attach_get(attach_id) -\u003e tuple[str, dict] | None\n- attach_delete(attach_id) -\u003e None\n- attach_list() -\u003e list[AttachId]\n- transaction_put(transaction_id, attach_id, state) -\u003e None\n- transaction_get(transaction_id) -\u003e tuple[AttachId, bytes] | None\n- transaction_delete(transaction_id) -\u003e None\n\nCatalogStorageSqlite implementation:\n- Default location: ~/.state/vgi/vgi_catalog.db\n- WAL mode for concurrent access\n- Similar pattern to FunctionStorageSqlite\n\nUsage:\n- CatalogInterface subclasses can optionally use storage\n- Simple catalogs can ignore (return empty attach_id bytes)\n- Catalogs needing persistence override storage attribute\n\nAdd storage class attribute to CatalogInterface with None default.","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:15.084387-05:00","created_by":"rusty","updated_at":"2026-01-05T19:27:15.084387-05:00"} {"id":"vgi-python-qud","title":"Test FunctionStorageSqlite: global_delete, global_exists, queue_clear","notes":"Coverage: 83% in vgi/function_storage.py. Missing tests for:\n- Line 266: KeyError path in global_get (key not found)\n- Lines 273-278: global_delete method\n- Lines 282-290: global_exists method \n- Line 337: queue_push with empty list\n- Lines 376-385: queue_clear method\n\nThese storage operations need direct unit tests to ensure correctness.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T22:15:25.982124-05:00","created_by":"rusty","updated_at":"2026-01-04T22:30:05.625934-05:00","closed_at":"2026-01-04T22:30:05.625934-05:00","close_reason":"Added comprehensive tests for FunctionStorageSqlite. Coverage improved from 83% to 98%."} {"id":"vgi-python-r3t","title":"Consolidate test client infrastructure in testing.py","description":"testing.py has three test client classes (FunctionTestClient, TableFunctionTestClient, ScalarFunctionTestClient) with shared infrastructure patterns. Extend _BaseTestClient pattern to reduce code duplication. Consider using a single unified client with method dispatch based on function type.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:53.913912-05:00","created_by":"rusty","updated_at":"2026-01-04T22:02:51.368907-05:00","closed_at":"2026-01-04T22:02:51.368907-05:00","close_reason":"Not warranted - _BaseTestClient already provides shared infrastructure (context manager, log capture, logging). The three clients handle genuinely different protocols (TableInOut with finalize, TableFunction with no input, Scalar with different protocol). Unifying would add type detection complexity without real benefit."} diff --git a/vgi/worker.py b/vgi/worker.py index 9c4883c..2cf2f99 100644 --- a/vgi/worker.py +++ b/vgi/worker.py @@ -1,4 +1,4 @@ -"""VGI Worker base class for hosting user-defined functions. +"""VGI Worker base class for hosting user-defined functions and catalogs. A worker is a subprocess that communicates via stdin/stdout using Arrow IPC. Workers are spawned by Client for each function invocation. @@ -94,12 +94,13 @@ class MyWorker(Worker): import structlog.stdlib from pyarrow import ipc +from vgi.catalog import CatalogInterface from vgi.exceptions import SchemaValidationError from vgi.function import ( Function, OutputSpec, ) -from vgi.invocation import Invocation +from vgi.invocation import Invocation, InvocationType from vgi.ipc_utils import read_ipc_batch from vgi.scalar_function import ProtocolInput as ScalarProtocolInput from vgi.scalar_function import ScalarFunctionGenerator @@ -148,6 +149,7 @@ class MyWorker(Worker): """ functions: Sequence[type[Function[Any]]] = [] + catalog_interface: type[CatalogInterface] | None = None _registry: dict[str, list[type[Function[Any]]]] | None = None @classmethod @@ -592,6 +594,99 @@ def _generate_batches( total_output_rows=total_output_rows, ) + def _handle_catalog_invocation( + self, + invocation: Invocation, + fn_log: structlog.stdlib.BoundLogger, + ) -> None: + """Handle a CatalogInterface method invocation. + + Catalog invocations use a simplified protocol without bind→init→stream + phases. The function_name field contains the method name to call, and + the input batch (read from stdin) contains method parameters as columns. + + Args: + invocation: The catalog invocation with method name and parameters. + fn_log: Logger bound to the function context. + + Raises: + ValueError: If catalog_interface is not configured. + + """ + if self.catalog_interface is None: + raise ValueError( + "CatalogInterface invocation received but Worker.catalog_interface " + "is not configured. Set catalog_interface class attribute to a " + "CatalogInterface subclass." + ) + + # Instantiate the catalog interface + catalog = self.catalog_interface() + method_name = invocation.function_name + + # Get the method + if not hasattr(catalog, method_name): + raise ValueError( + f"Unknown catalog method: '{method_name}'. " + f"CatalogInterface does not have a method named '{method_name}'." + ) + method = getattr(catalog, method_name) + + # Read arguments from input batch (1 row with columns matching parameters) + args_batch = self._read_ipc_batch("catalog_args") + if args_batch.num_rows != 1: + raise ValueError( + f"Catalog invocation expects exactly 1 row in argument batch, " + f"got {args_batch.num_rows}" + ) + + # Convert batch columns to kwargs + kwargs: dict[str, Any] = {} + row = args_batch.to_pylist()[0] + for name, value in row.items(): + kwargs[name] = value + + fn_log.debug("catalog_method_call", method=method_name, kwargs=kwargs) + + # Call the method + result = method(**kwargs) + + fn_log.debug("catalog_method_result", result=result) + + # Serialize and stream result + # Result types: + # - None → empty batch (0 rows, 0 columns) + # - Dataclass with serialize() → serialize to bytes, write + # - Iterable → stream multiple serialized items + with ipc.new_stream(cast(IOBase, sys.stdout), pa.schema([])) as writer: + if result is None: + # Write empty batch to signal completion + writer.write_batch(pa.RecordBatch.from_pydict({})) + elif hasattr(result, "serialize"): + # Single dataclass result - write serialized bytes directly + result_bytes = result.serialize() + sys.stdout.write(result_bytes) + else: + # Try to iterate (for schema_contents, schemas, etc.) + try: + for item in result: + if hasattr(item, "serialize"): + item_bytes = item.serialize() + sys.stdout.write(item_bytes) + else: + raise TypeError( + f"Catalog result item has no serialize method: " + f"{type(item).__name__}" + ) + except TypeError: + raise TypeError( + f"Catalog method returned unsupported type: " + f"{type(result).__name__}. Expected None, a dataclass " + f"with serialize(), or an iterable of such dataclasses." + ) from None + + fn_log.info("catalog_invocation_complete", method=method_name) + def run(self) -> None: """Run the worker, reading from stdin and writing to stdout.""" self.log.info("worker_starting") @@ -604,6 +699,11 @@ def run(self) -> None: fn_log.info("init_received", arguments=invocation.arguments) fn_log.debug("input_schema_parsed", schema=str(invocation.input_schema)) + # Dispatch catalog invocations separately (simplified protocol) + if invocation.function_type == InvocationType.CATALOG: + self._handle_catalog_invocation(invocation, fn_log) + return + registry = self._build_registry() if invocation.function_name not in registry: available = sorted(registry.keys())