diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index b5000a2..13ec848 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -42,7 +42,7 @@ {"id":"vgi-python-dvo","title":"Export AnyValue in vgi/__init__.py","description":"Import and add AnyValue to __all__ exports","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T10:41:41.65732-05:00","created_by":"rusty","updated_at":"2026-01-05T11:07:09.187969-05:00","closed_at":"2026-01-05T11:07:09.187969-05:00","close_reason":"Exported AnyArrow in vgi/__init__.py","dependencies":[{"issue_id":"vgi-python-dvo","depends_on_id":"vgi-python-ckg","type":"blocks","created_at":"2026-01-05T10:41:48.715634-05:00","created_by":"rusty"}]} {"id":"vgi-python-e37","title":"move Invocation from function.py out to own file","description":"The Invocation clas is kind of seperate from functions, so it should be in its own file. Move it and all of its other associated classes like InvocationType to its own file","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T09:18:46.605941-05:00","created_by":"rusty","updated_at":"2026-01-04T09:24:37.922675-05:00","closed_at":"2026-01-04T09:24:37.922675-05:00","close_reason":"Closed"} {"id":"vgi-python-e46","title":"Create vgi/catalog/client.py - CatalogClient class","description":"Create CatalogClient for client-side catalog operations.\n\nFiles to create:\n- vgi/catalog/client.py\n\nCatalogClient class:\n- __init__(worker_command: str)\n- Context manager support (__enter__, __exit__)\n- start() / stop() methods\n\nCore methods (mirroring CatalogInterface):\n- catalogs() -\u003e list[str]\n- attach(name, options) -\u003e CatalogAttachResult\n- detach(attach_id) -\u003e None\n- schemas(attach_id, transaction_id) -\u003e Iterator[SchemaInfo]\n- schema_get(attach_id, transaction_id, name) -\u003e SchemaInfo | None\n- schema_contents(attach_id, transaction_id, name) -\u003e Iterator[TableInfo | ViewInfo | FunctionInfo]\n- table_get(...) -\u003e TableInfo | None\n- view_get(...) -\u003e ViewInfo | None\n- function_get(...) -\u003e FunctionInfo | None\n- table_scan_function_get(...) -\u003e ScanFunctionResult\n\nDDL methods (optional, may raise NotImplementedError from worker):\n- catalog_create, catalog_drop\n- schema_create, schema_drop\n- table_create, table_drop, table_rename, etc.\n- view_create, view_drop, view_rename, etc.\n\nTransaction methods:\n- transaction_begin(attach_id) -\u003e TransactionId | None\n- transaction_commit(attach_id, transaction_id)\n- transaction_rollback(attach_id, transaction_id)\n\nInternal methods:\n- _invoke(method_name, **kwargs) -\u003e pa.RecordBatch | Iterator[pa.RecordBatch]\n- _create_invocation(method_name, kwargs) -\u003e Invocation\n- _deserialize_result(batch, return_type) -\u003e Any\n\nHandle:\n- Streaming responses for Iterable returns\n- Exception propagation from worker\n- None returns (0-row/0-column batches)","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:18:04.65125-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.055794-05:00","closed_at":"2026-01-05T19:21:50.055794-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-tw7","type":"blocks","created_at":"2026-01-05T19:18:44.316642-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-fd2","type":"blocks","created_at":"2026-01-05T19:18:44.440065-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-4mg","type":"blocks","created_at":"2026-01-05T19:18:44.559963-05:00","created_by":"rusty"}]} -{"id":"vgi-python-e6o","title":"Implement CatalogClient class","description":"Create CatalogClient for client-side catalog operations.\n\nFile: vgi/client/catalog_client.py\n\nCatalogClient class:\n- __init__(worker_command: str)\n- Each method call spawns new worker (matches VGI short-lived pattern)\n\nCore methods mirroring CatalogInterface:\n- catalogs() -\u003e list[str]\n- catalog_attach(name, options) -\u003e CatalogAttachResult\n- catalog_detach(attach_id) -\u003e None\n- schemas(attach_id, transaction_id) -\u003e Iterator[SchemaInfo]\n- schema_get(...) -\u003e SchemaInfo | None\n- schema_contents(...) -\u003e Iterator[TableInfo | ViewInfo | FunctionInfo]\n- table_get(...) -\u003e TableInfo | None\n- view_get(...) -\u003e ViewInfo | None\n- table_scan_function_get(...) -\u003e ScanFunctionResult\n\nDDL methods (may raise NotImplementedError from worker):\n- catalog_create, catalog_drop\n- schema_create, schema_drop\n- table_* methods, view_* methods\n\nTransaction methods:\n- catalog_transaction_begin/commit/rollback\n\nInternal:\n- _invoke(method_name, **kwargs) -\u003e pa.RecordBatch | Iterator[pa.RecordBatch]\n- _create_invocation(method_name, kwargs) -\u003e Invocation (with InvocationType.CATALOG)\n- Uses existing IPC utilities for communication","status":"open","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:57.975309-05:00","created_by":"rusty","updated_at":"2026-01-05T19:26:57.975309-05:00","dependencies":[{"issue_id":"vgi-python-e6o","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.730122-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e6o","depends_on_id":"vgi-python-po3","type":"blocks","created_at":"2026-01-05T19:27:50.762036-05:00","created_by":"rusty"}]} +{"id":"vgi-python-e6o","title":"Implement CatalogClient class","description":"Create CatalogClient for client-side catalog operations.\n\nFile: vgi/client/catalog_client.py\n\nCatalogClient class:\n- __init__(worker_command: str)\n- Each method call spawns new worker (matches VGI short-lived pattern)\n\nCore methods mirroring CatalogInterface:\n- catalogs() -\u003e list[str]\n- catalog_attach(name, options) -\u003e CatalogAttachResult\n- catalog_detach(attach_id) -\u003e None\n- schemas(attach_id, transaction_id) -\u003e Iterator[SchemaInfo]\n- schema_get(...) -\u003e SchemaInfo | None\n- schema_contents(...) -\u003e Iterator[TableInfo | ViewInfo | FunctionInfo]\n- table_get(...) -\u003e TableInfo | None\n- view_get(...) -\u003e ViewInfo | None\n- table_scan_function_get(...) -\u003e ScanFunctionResult\n\nDDL methods (may raise NotImplementedError from worker):\n- catalog_create, catalog_drop\n- schema_create, schema_drop\n- table_* methods, view_* methods\n\nTransaction methods:\n- catalog_transaction_begin/commit/rollback\n\nInternal:\n- _invoke(method_name, **kwargs) -\u003e pa.RecordBatch | Iterator[pa.RecordBatch]\n- _create_invocation(method_name, kwargs) -\u003e Invocation (with InvocationType.CATALOG)\n- Uses existing IPC utilities for communication","status":"in_progress","priority":1,"issue_type":"task","created_at":"2026-01-05T19:26:57.975309-05:00","created_by":"rusty","updated_at":"2026-01-05T19:44:28.358853-05:00","dependencies":[{"issue_id":"vgi-python-e6o","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.730122-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e6o","depends_on_id":"vgi-python-po3","type":"blocks","created_at":"2026-01-05T19:27:50.762036-05:00","created_by":"rusty"}]} {"id":"vgi-python-e9q","title":"Unify ProtocolOutput classes with shared base","description":"ProtocolOutput classes in table_function.py:177-224 and table_in_out_function.py:144-207 share similar metadata() method and from_process_result() classmethod. The table_in_out version adds status field. Create shared base with table_in_out extending it for status support.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.45014-05:00","created_by":"rusty","updated_at":"2026-01-04T21:54:55.871986-05:00","closed_at":"2026-01-04T21:54:55.871986-05:00","close_reason":"Not warranted - dataclass inheritance with slots=True doesn't allow adding required field (status) between inherited fields. The classes have different semantics (table_in_out requires status for generator state tracking) making inheritance impractical."} {"id":"vgi-python-eg7","title":"Create InMemoryCatalog example implementation","description":"Create an in-memory catalog implementation for testing and as an example.\n\nFile: vgi/examples/catalog.py\n\nInMemoryCatalog(CatalogInterface):\n- In-memory storage using dicts\n- Implements all required abstract methods\n- Implements common optional methods (schema_create, table_create, etc.)\n- Generates attach_id as random UUID bytes\n- Does NOT support transactions (returns None)\n\nData structures:\n- _catalogs: dict[str, CatalogData]\n- _attachments: dict[AttachId, str] # attach_id -\u003e catalog_name\n\nCreate example worker:\n```python\nclass InMemoryCatalogWorker(Worker):\n catalog_interface = InMemoryCatalog\n```\n\nAdd entry point: vgi-example-catalog-worker","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:27.604912-05:00","created_by":"rusty","updated_at":"2026-01-05T19:27:27.604912-05:00","dependencies":[{"issue_id":"vgi-python-eg7","depends_on_id":"vgi-python-085","type":"blocks","created_at":"2026-01-05T19:27:50.87322-05:00","created_by":"rusty"}]} {"id":"vgi-python-f5z","title":"Create vgi/catalog/storage.py - Catalog persistence","description":"Create storage layer for catalog attach_id and transaction_id persistence.\n\nFiles to create:\n- vgi/catalog/storage.py\n\nCatalogStorage protocol (similar to FunctionStorage):\n- attach_put(attach_id, catalog_name, options) -\u003e None\n- attach_get(attach_id) -\u003e tuple[str, dict] | None\n- attach_delete(attach_id) -\u003e None\n- attach_list() -\u003e list[AttachId]\n\n- transaction_put(transaction_id, attach_id, state) -\u003e None\n- transaction_get(transaction_id) -\u003e tuple[AttachId, bytes] | None\n- transaction_delete(transaction_id) -\u003e None\n\nCatalogStorageSqlite implementation:\n- Default location: ~/.state/vgi/vgi_catalog.db\n- WAL mode for concurrent access\n- Schema:\n CREATE TABLE catalog_attachments (\n attach_id BLOB PRIMARY KEY,\n catalog_name TEXT NOT NULL,\n options TEXT, -- JSON\n created_at REAL DEFAULT (julianday('now'))\n )\n CREATE TABLE catalog_transactions (\n transaction_id BLOB PRIMARY KEY,\n attach_id BLOB NOT NULL,\n state BLOB,\n created_at REAL DEFAULT (julianday('now'))\n )\n\nInclude cleanup strategies for stale attachments/transactions.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T19:18:04.531387-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.073983-05:00","closed_at":"2026-01-05T19:21:50.073983-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-f5z","depends_on_id":"vgi-python-tw7","type":"blocks","created_at":"2026-01-05T19:18:44.194468-05:00","created_by":"rusty"}]} diff --git a/vgi/client/__init__.py b/vgi/client/__init__.py index b3909f6..df987ce 100644 --- a/vgi/client/__init__.py +++ b/vgi/client/__init__.py @@ -3,6 +3,8 @@ This package provides: - Client: A class for programmatic interaction with VGI workers - ClientError: Exception raised by Client operations +- CatalogClient: A class for catalog operations on VGI workers +- CatalogClientError: Exception raised by CatalogClient operations - OutputWriter: Helper for writing output in various formats - main: CLI entry point @@ -18,16 +20,25 @@ ): process(batch) +Usage (Catalog API): + from vgi.client import CatalogClient + + client = CatalogClient("./my_worker") + result = client.catalog_attach(name="my_catalog", options={}) + Usage (CLI): vgi-client --input data.parquet --function echo vgi-client --input data.parquet --function sum_all_columns """ +from vgi.client.catalog_client import CatalogClient, CatalogClientError from vgi.client.cli import OutputWriter, main from vgi.client.client import Client, ClientError __all__ = [ + "CatalogClient", + "CatalogClientError", "Client", "ClientError", "OutputWriter", diff --git a/vgi/client/catalog_client.py b/vgi/client/catalog_client.py new file mode 100644 index 0000000..f5c394c --- /dev/null +++ b/vgi/client/catalog_client.py @@ -0,0 +1,601 @@ +"""VGI CatalogClient for catalog operations. + +This module provides the CatalogClient class for invoking CatalogInterface methods +on VGI workers. Each method call spawns a new worker process for simplicity. + +QUICK START +----------- +Use CatalogClient for catalog operations: + + from vgi.client import CatalogClient + + client = CatalogClient("vgi-my-worker") + + # List available catalogs + catalogs = client.catalogs() + + # Attach to a catalog + result = client.catalog_attach(name="my_catalog", options={}) + + # List schemas + for schema in client.schemas(attach_id=result.attach_id, transaction_id=None): + print(schema.name) + +See Also +-------- +vgi.catalog.CatalogInterface : The interface that workers implement +vgi.worker.Worker : Workers with catalog_interface set + +""" + +from __future__ import annotations + +import io +import subprocess +import sys +from collections.abc import Iterator +from typing import Any, cast + +import pyarrow as pa +import structlog +import structlog.stdlib + +from vgi.arguments import Arguments +from vgi.catalog import ( + AttachId, + CatalogAttachResult, + FunctionInfo, + OnConflict, + ScanFunctionResult, + SchemaInfo, + SerializedSchema, + TableInfo, + TransactionId, + ViewInfo, +) +from vgi.invocation import Invocation, InvocationType +from vgi.ipc_utils import read_ipc_batch + +# Configure structlog to write to stderr +structlog.configure( + processors=[ + structlog.processors.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.dev.ConsoleRenderer(), + ], + wrapper_class=structlog.make_filtering_bound_logger(0), + logger_factory=structlog.PrintLoggerFactory(file=sys.stderr), +) + +log: structlog.stdlib.BoundLogger = structlog.get_logger().bind( + component="catalog_client" +) + + +class CatalogClientError(Exception): + """Error raised by CatalogClient operations.""" + + +class CatalogClient: + """Client for invoking CatalogInterface methods on VGI workers. + + Each method call spawns a new worker process, matching VGI's short-lived + worker pattern. The catalog protocol is simplified compared to function + invocations: there's no bind/init phase, just invoke → stream. + + Example: + client = CatalogClient("./my_worker") + + # Attach to a catalog + result = client.catalog_attach(name="my_catalog", options={}) + + # List schemas + for schema in client.schemas(attach_id=result.attach_id, transaction_id=None): + print(schema.name) + + """ + + def __init__(self, worker_command: str | list[str]) -> None: + """Initialize the CatalogClient. + + Args: + worker_command: Command to spawn the worker. Can be a string + (shell command) or list of arguments. + + """ + if isinstance(worker_command, str): + self.server_path: list[str] = worker_command.split() + else: + self.server_path = worker_command + + def _invoke( + self, + method_name: str, + **kwargs: Any, + ) -> pa.RecordBatch | None: + """Invoke a catalog method and return the result batch. + + Spawns a worker, sends the invocation with method name and args, + reads the result, and returns the deserialized batch. + + Args: + method_name: CatalogInterface method name (e.g., 'catalog_attach'). + **kwargs: Method keyword arguments. + + Returns: + RecordBatch with the result, or None for methods that return None. + + """ + log.debug("catalog_invoke", method=method_name, kwargs=kwargs) + + # Start worker process + proc = subprocess.Popen( + self.server_path, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=False, + ) + + if proc.stdin is None or proc.stdout is None: + raise CatalogClientError("Failed to create pipes for worker process") + + stdout_buffered = io.BufferedReader(cast(io.RawIOBase, proc.stdout)) + + try: + # Create and send invocation + invocation = Invocation( + function_name=method_name, + input_schema=None, + function_type=InvocationType.CATALOG, + correlation_id="catalog", + invocation_id=None, + arguments=Arguments(), + ) + invocation_bytes = invocation.serialize() + proc.stdin.write(invocation_bytes) + + # Create and send arguments batch (1 row with kwargs as columns) + args_batch = self._create_args_batch(kwargs) + args_bytes = ( + args_batch.schema.serialize().to_pybytes() + + args_batch.serialize().to_pybytes() + ) + proc.stdin.write(args_bytes) + proc.stdin.flush() + proc.stdin.close() + + # Read result + try: + result_batch = read_ipc_batch(stdout_buffered, "catalog_result") + log.debug( + "catalog_result", + method=method_name, + num_rows=result_batch.num_rows, + num_columns=result_batch.num_columns, + ) + return result_batch + except Exception as e: + # Check if worker had an error + stderr_output = proc.stderr.read().decode() if proc.stderr else "" + if stderr_output: + log.error("worker_stderr", stderr=stderr_output) + raise CatalogClientError( + f"Failed to read catalog result: {e}\n{stderr_output}" + ) from e + + finally: + proc.wait() + + def _invoke_stream( + self, + method_name: str, + **kwargs: Any, + ) -> Iterator[pa.RecordBatch]: + """Invoke a catalog method and stream result batches. + + For methods that return iterables (schemas, schema_contents, etc.), + this yields each result batch. + + Args: + method_name: CatalogInterface method name. + **kwargs: Method keyword arguments. + + Yields: + RecordBatch for each result item. + + """ + log.debug("catalog_invoke_stream", method=method_name, kwargs=kwargs) + + # Start worker process + proc = subprocess.Popen( + self.server_path, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=False, + ) + + if proc.stdin is None or proc.stdout is None: + raise CatalogClientError("Failed to create pipes for worker process") + + stdout_buffered = io.BufferedReader(cast(io.RawIOBase, proc.stdout)) + + try: + # Create and send invocation + invocation = Invocation( + function_name=method_name, + input_schema=None, + function_type=InvocationType.CATALOG, + correlation_id="catalog", + invocation_id=None, + arguments=Arguments(), + ) + invocation_bytes = invocation.serialize() + proc.stdin.write(invocation_bytes) + + # Create and send arguments batch + args_batch = self._create_args_batch(kwargs) + args_bytes = ( + args_batch.schema.serialize().to_pybytes() + + args_batch.serialize().to_pybytes() + ) + proc.stdin.write(args_bytes) + proc.stdin.flush() + proc.stdin.close() + + # Stream results - read batches until EOF + while True: + try: + result_batch = read_ipc_batch(stdout_buffered, "catalog_result") + # Empty batch (0 rows, 0 columns) signals end + if result_batch.num_rows == 0 and result_batch.num_columns == 0: + break + yield result_batch + except Exception: + # EOF or error - stop iteration + break + + finally: + proc.wait() + + def _create_args_batch(self, kwargs: dict[str, Any]) -> pa.RecordBatch: + """Create a single-row batch from method keyword arguments.""" + if not kwargs: + return pa.RecordBatch.from_pydict({}) + + # Build column arrays from kwargs + data: dict[str, list[Any]] = {} + for name, value in kwargs.items(): + data[name] = [value] + + return pa.RecordBatch.from_pylist([kwargs]) + + # ========== Discovery Methods ========== + + def catalogs(self) -> list[str]: + """Get list of catalog names from the worker.""" + result = self._invoke("catalogs") + if result is None or result.num_rows == 0: + return [] + # Result should have a column with catalog names + return cast(list[str], result.column(0).to_pylist()) + + # ========== Catalog Lifecycle Methods ========== + + def catalog_attach( + self, *, name: str, options: dict[str, Any] | None = None + ) -> CatalogAttachResult: + """Attach to a catalog.""" + result = self._invoke("catalog_attach", name=name, options=options or {}) + if result is None: + raise CatalogClientError("catalog_attach returned no result") + return CatalogAttachResult.deserialize(result) + + def catalog_detach(self, *, attach_id: AttachId) -> None: + """Detach from a catalog.""" + self._invoke("catalog_detach", attach_id=attach_id) + + def catalog_create( + self, + *, + name: str, + on_conflict: OnConflict = OnConflict.ERROR, + options: dict[str, Any] | None = None, + ) -> None: + """Create a new catalog.""" + self._invoke( + "catalog_create", + name=name, + on_conflict=on_conflict.value, + options=options or {}, + ) + + def catalog_drop(self, *, name: str) -> None: + """Drop a catalog.""" + self._invoke("catalog_drop", name=name) + + def catalog_version( + self, *, attach_id: AttachId, transaction_id: TransactionId | None = None + ) -> int: + """Get the current catalog version.""" + result = self._invoke( + "catalog_version", attach_id=attach_id, transaction_id=transaction_id + ) + if result is None or result.num_rows == 0: + return 0 + return cast(int, result.column(0).to_pylist()[0]) + + # ========== Transaction Methods ========== + + def catalog_transaction_begin(self, *, attach_id: AttachId) -> TransactionId | None: + """Begin a new transaction.""" + result = self._invoke("catalog_transaction_begin", attach_id=attach_id) + if result is None or result.num_rows == 0: + return None + value = result.column(0).to_pylist()[0] + return TransactionId(value) if value else None + + def catalog_transaction_commit( + self, *, attach_id: AttachId, transaction_id: TransactionId + ) -> None: + """Commit a transaction.""" + self._invoke( + "catalog_transaction_commit", + attach_id=attach_id, + transaction_id=transaction_id, + ) + + def catalog_transaction_rollback( + self, *, attach_id: AttachId, transaction_id: TransactionId + ) -> None: + """Rollback a transaction.""" + self._invoke( + "catalog_transaction_rollback", + attach_id=attach_id, + transaction_id=transaction_id, + ) + + # ========== Schema Methods ========== + + def schemas( + self, *, attach_id: AttachId, transaction_id: TransactionId | None = None + ) -> Iterator[SchemaInfo]: + """List schemas in the catalog.""" + for batch in self._invoke_stream( + "schemas", attach_id=attach_id, transaction_id=transaction_id + ): + yield SchemaInfo.deserialize(batch) + + def schema_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + name: str, + ) -> SchemaInfo | None: + """Get information about a schema.""" + result = self._invoke( + "schema_get", + attach_id=attach_id, + transaction_id=transaction_id, + name=name, + ) + if result is None or result.num_rows == 0: + return None + return SchemaInfo.deserialize(result) + + def schema_create( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + name: str, + comment: str | None = None, + tags: dict[str, str] | None = None, + ) -> None: + """Create a new schema.""" + self._invoke( + "schema_create", + attach_id=attach_id, + transaction_id=transaction_id, + name=name, + comment=comment, + tags=tags or {}, + ) + + def schema_drop( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + name: str, + ignore_not_found: bool = False, + cascade: bool = False, + ) -> None: + """Drop a schema.""" + self._invoke( + "schema_drop", + attach_id=attach_id, + transaction_id=transaction_id, + name=name, + ignore_not_found=ignore_not_found, + cascade=cascade, + ) + + def schema_contents( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + name: str, + ) -> Iterator[TableInfo | ViewInfo | FunctionInfo]: + """List contents of a schema (tables, views, functions).""" + for batch in self._invoke_stream( + "schema_contents", + attach_id=attach_id, + transaction_id=transaction_id, + name=name, + ): + # Determine type from batch schema or content + # For now, assume schema column indicates type + if "columns" in batch.schema.names: + yield TableInfo.deserialize(batch) + elif "definition" in batch.schema.names: + yield ViewInfo.deserialize(batch) + else: + yield FunctionInfo.deserialize(batch) + + # ========== Table Methods ========== + + def table_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + schema_name: str, + name: str, + ) -> TableInfo | None: + """Get information about a table.""" + result = self._invoke( + "table_get", + attach_id=attach_id, + transaction_id=transaction_id, + schema_name=schema_name, + name=name, + ) + if result is None or result.num_rows == 0: + return None + return TableInfo.deserialize(result) + + def table_create( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + schema_name: str, + name: str, + columns: SerializedSchema, + on_conflict: OnConflict = OnConflict.ERROR, + not_null_constraints: list[int] | None = None, + unique_constraints: list[list[int]] | None = None, + check_constraints: list[str] | None = None, + ) -> None: + """Create a new table.""" + self._invoke( + "table_create", + attach_id=attach_id, + transaction_id=transaction_id, + schema_name=schema_name, + name=name, + columns=columns, + on_conflict=on_conflict.value, + not_null_constraints=not_null_constraints or [], + unique_constraints=unique_constraints or [], + check_constraints=check_constraints or [], + ) + + def table_drop( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + schema_name: str, + name: str, + ignore_not_found: bool = False, + ) -> None: + """Drop a table.""" + self._invoke( + "table_drop", + attach_id=attach_id, + transaction_id=transaction_id, + schema_name=schema_name, + name=name, + ignore_not_found=ignore_not_found, + ) + + def table_scan_function_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + schema_name: str, + name: str, + at_unit: str | None = None, + at_value: str | None = None, + ) -> ScanFunctionResult: + """Get the scan function for a table.""" + result = self._invoke( + "table_scan_function_get", + attach_id=attach_id, + transaction_id=transaction_id, + schema_name=schema_name, + name=name, + at_unit=at_unit, + at_value=at_value, + ) + if result is None: + raise CatalogClientError("table_scan_function_get returned no result") + return ScanFunctionResult.deserialize(result) + + # ========== View Methods ========== + + def view_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + schema_name: str, + name: str, + ) -> ViewInfo | None: + """Get information about a view.""" + result = self._invoke( + "view_get", + attach_id=attach_id, + transaction_id=transaction_id, + schema_name=schema_name, + name=name, + ) + if result is None or result.num_rows == 0: + return None + return ViewInfo.deserialize(result) + + def view_create( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + schema_name: str, + name: str, + definition: str, + on_conflict: OnConflict = OnConflict.ERROR, + ) -> None: + """Create a new view.""" + self._invoke( + "view_create", + attach_id=attach_id, + transaction_id=transaction_id, + schema_name=schema_name, + name=name, + definition=definition, + on_conflict=on_conflict.value, + ) + + def view_drop( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None = None, + schema_name: str, + name: str, + ignore_not_found: bool = False, + ) -> None: + """Drop a view.""" + self._invoke( + "view_drop", + attach_id=attach_id, + transaction_id=transaction_id, + schema_name=schema_name, + name=name, + ignore_not_found=ignore_not_found, + )