diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 09db3ad..1b02fc8 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -38,7 +38,7 @@ {"id":"vgi-python-cvj","title":"Add PYTHON_TO_ARROW type mapping to vgi/arguments.py","description":"Add the Python→Arrow type mapping dict after imports:\n```python\nPYTHON_TO_ARROW: dict[type, pa.DataType] = {\n int: pa.int64(),\n str: pa.utf8(),\n float: pa.float64(),\n bool: pa.bool_(),\n bytes: pa.binary(),\n}\n```\nExport in __all__.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T15:44:37.900421-05:00","created_by":"rusty","updated_at":"2026-01-05T15:48:42.422086-05:00","closed_at":"2026-01-05T15:48:42.422086-05:00","close_reason":"PR #18 created"} {"id":"vgi-python-d73","title":"Create docs/argument-serialization.md","description":"## Overview\n\nCreate LLM-friendly documentation explaining the argument specification serialization format. This document should enable future implementors (human or AI) to understand how function argument signatures are serialized to Arrow schemas.\n\n## File Location\n\n`docs/argument-serialization.md`\n\n## Document Structure\n\n### Title and Purpose\n\nExplain that this document describes how VGI function argument specifications are serialized to Apache Arrow schemas for IPC transmission and DuckDB function registration.\n\n### Quick Reference\n\nA concise summary table showing:\n- Metadata keys and their meanings\n- Special type representations\n\n### Schema Format\n\nExplain the single-schema design:\n1. All arguments are fields in one Arrow schema\n2. Positional arguments come first, in order (field index = position index)\n3. Named arguments follow, marked with metadata\n4. Field name = Python attribute name (or argument key for named)\n5. Field type = exact Arrow type\n\n### Metadata Keys Reference\n\nComplete table of all metadata keys:\n\n| Key | Value | Description |\n|-----|-------|-------------|\n| `vgi_arg` | `named` | Field is a named argument, not positional. The field name is the argument key. |\n| `vgi_type` | `table` | Argument receives streaming table input (Arg[TableInput]). Arrow type is pa.null(). |\n| `vgi_type` | `any` | Argument accepts any Arrow type (Arg[AnyArrow]). Arrow type is pa.null(). |\n| `vgi_varargs` | `true` | Argument collects all remaining positional args. Arrow type is the element type. |\n\n### Special Type Handling\n\nExplain how special argument types are represented:\n\n#### TableInput\n- Arrow type: `pa.null()`\n- Metadata: `{b\"vgi_type\": b\"table\"}`\n- Meaning: This position receives streaming RecordBatches, not a scalar value\n\n#### AnyArrow\n- Arrow type: `pa.null()`\n- Metadata: `{b\"vgi_type\": b\"any\"}`\n- Meaning: Accepts any valid Arrow scalar type at runtime\n\n#### Varargs\n- Arrow type: The element type (e.g., `pa.int64()` for `Arg[int](..., varargs=True)`)\n- Metadata: `{b\"vgi_varargs\": b\"true\"}`\n- Meaning: Collects all remaining positional arguments from this position onwards\n\n### Examples\n\n#### Example 1: Simple Function\n\n```python\nclass MyFunction(TableInOutFunction):\n count = Arg[int](0) # Positional 0\n name = Arg[str](1) # Positional 1\n verbose = Arg[bool](\"verbose\") # Named\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"count\", pa.int64()),\n pa.field(\"name\", pa.utf8()),\n pa.field(\"verbose\", pa.bool_(), metadata={b\"vgi_arg\": b\"named\"}),\n])\n```\n\n#### Example 2: Function with Table Input\n\n```python\nclass TransformFunction(TableInOutFunction):\n multiplier = Arg[float](0)\n data = Arg[TableInput](1)\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"multiplier\", pa.float64()),\n pa.field(\"data\", pa.null(), metadata={b\"vgi_type\": b\"table\"}),\n])\n```\n\n#### Example 3: Function with Varargs\n\n```python\nclass SumFunction(TableInOutFunction):\n columns = Arg[str](0, varargs=True)\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"columns\", pa.utf8(), metadata={b\"vgi_varargs\": b\"true\"}),\n])\n```\n\n#### Example 4: Complex Function\n\n```python\nclass ComplexFunction(TableInOutFunction):\n count = Arg[int](0)\n data = Arg[TableInput](1)\n extra = Arg[float](2, varargs=True)\n format = Arg[str](\"format\")\n threshold = Arg[AnyArrow](\"threshold\")\n\n# Serializes to:\nschema = pa.schema([\n pa.field(\"count\", pa.int64()),\n pa.field(\"data\", pa.null(), metadata={b\"vgi_type\": b\"table\"}),\n pa.field(\"extra\", pa.float64(), metadata={b\"vgi_varargs\": b\"true\"}),\n pa.field(\"format\", pa.utf8(), metadata={b\"vgi_arg\": b\"named\"}),\n pa.field(\"threshold\", pa.null(), metadata={b\"vgi_arg\": b\"named\", b\"vgi_type\": b\"any\"}),\n])\n```\n\n### Serialization Code\n\nShow how to serialize and deserialize:\n\n```python\n# Serialize to bytes\nschema_bytes = schema.serialize().to_pybytes()\n\n# Deserialize from bytes\nschema = pa.ipc.read_schema(pa.py_buffer(schema_bytes))\n```\n\n### Parsing Algorithm\n\nExplain how to parse a schema back to argument specs:\n\n1. Initialize position_index = 0\n2. For each field in schema:\n a. Check if field has `vgi_arg=named` metadata\n b. If named: position = field.name (string)\n c. If positional: position = position_index, then increment position_index\n d. Check for `vgi_type` metadata (table or any)\n e. Check for `vgi_varargs` metadata\n f. Create ArgumentSpec with extracted info\n\n### Not Included\n\nExplicitly state what is NOT serialized:\n- Default values\n- Validation constraints (ge, le, choices, pattern)\n- Documentation strings\n\nThese are Python-side concerns handled by the Arg descriptor at runtime.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T11:19:17.488877-05:00","created_by":"rusty","updated_at":"2026-01-05T11:33:29.168007-05:00","closed_at":"2026-01-05T11:33:29.168007-05:00","close_reason":"Created comprehensive LLM-friendly documentation","dependencies":[{"issue_id":"vgi-python-d73","depends_on_id":"vgi-python-8ra","type":"blocks","created_at":"2026-01-05T11:19:30.820384-05:00","created_by":"rusty"}]} {"id":"vgi-python-dv0","title":"Add arrow_type parameter to Arg class","description":"In vgi/arguments.py:\n1. Add 'arrow_type' to __slots__\n2. Add parameter: arrow_type: pa.DataType | None = None\n3. Store: self.arrow_type = arrow_type\n4. Update __repr__ to include arrow_type if set","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T15:44:38.020395-05:00","created_by":"rusty","updated_at":"2026-01-05T15:50:51.513273-05:00","closed_at":"2026-01-05T15:50:51.513273-05:00","close_reason":"PR #19 created","dependencies":[{"issue_id":"vgi-python-dv0","depends_on_id":"vgi-python-cvj","type":"blocks","created_at":"2026-01-05T15:45:13.696822-05:00","created_by":"rusty"}]} -{"id":"vgi-python-dvd","title":"Complete ReadOnlyCatalogInterface implementation","description":"Complete ReadOnlyCatalogInterface that prevents all DDL operations.\n\nFile: vgi/catalog/catalog_interface.py (already exists)\n\nCurrent state: Only has class attributes, no method overrides.\n\nChanges needed:\n1. Override all DDL methods to raise CatalogReadOnlyError:\n - catalog_create, catalog_drop\n - schema_create, schema_drop\n - table_create, table_drop, table_rename, table_comment_set\n - All table_column_* methods\n - table_not_null_set, table_not_null_drop\n - view_create, view_drop, view_rename, view_comment_set\n - Transaction methods (begin/commit/rollback)\n\n2. Create CatalogReadOnlyError exception in vgi/exceptions.py\n\n3. Ensure catalog_attach returns supports_transactions=False\n\nInclude tests verifying all DDL operations raise CatalogReadOnlyError.","status":"in_progress","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:15.199453-05:00","created_by":"rusty","updated_at":"2026-01-05T19:54:28.722579-05:00"} +{"id":"vgi-python-dvd","title":"Complete ReadOnlyCatalogInterface implementation","description":"Complete ReadOnlyCatalogInterface that prevents all DDL operations.\n\nFile: vgi/catalog/catalog_interface.py (already exists)\n\nCurrent state: Only has class attributes, no method overrides.\n\nChanges needed:\n1. Override all DDL methods to raise CatalogReadOnlyError:\n - catalog_create, catalog_drop\n - schema_create, schema_drop\n - table_create, table_drop, table_rename, table_comment_set\n - All table_column_* methods\n - table_not_null_set, table_not_null_drop\n - view_create, view_drop, view_rename, view_comment_set\n - Transaction methods (begin/commit/rollback)\n\n2. Create CatalogReadOnlyError exception in vgi/exceptions.py\n\n3. Ensure catalog_attach returns supports_transactions=False\n\nInclude tests verifying all DDL operations raise CatalogReadOnlyError.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T19:27:15.199453-05:00","created_by":"rusty","updated_at":"2026-01-05T19:57:59.2381-05:00","closed_at":"2026-01-05T19:57:59.2381-05:00","close_reason":"PR #28 created"} {"id":"vgi-python-dvo","title":"Export AnyValue in vgi/__init__.py","description":"Import and add AnyValue to __all__ exports","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T10:41:41.65732-05:00","created_by":"rusty","updated_at":"2026-01-05T11:07:09.187969-05:00","closed_at":"2026-01-05T11:07:09.187969-05:00","close_reason":"Exported AnyArrow in vgi/__init__.py","dependencies":[{"issue_id":"vgi-python-dvo","depends_on_id":"vgi-python-ckg","type":"blocks","created_at":"2026-01-05T10:41:48.715634-05:00","created_by":"rusty"}]} {"id":"vgi-python-e37","title":"move Invocation from function.py out to own file","description":"The Invocation clas is kind of seperate from functions, so it should be in its own file. Move it and all of its other associated classes like InvocationType to its own file","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T09:18:46.605941-05:00","created_by":"rusty","updated_at":"2026-01-04T09:24:37.922675-05:00","closed_at":"2026-01-04T09:24:37.922675-05:00","close_reason":"Closed"} {"id":"vgi-python-e46","title":"Create vgi/catalog/client.py - CatalogClient class","description":"Create CatalogClient for client-side catalog operations.\n\nFiles to create:\n- vgi/catalog/client.py\n\nCatalogClient class:\n- __init__(worker_command: str)\n- Context manager support (__enter__, __exit__)\n- start() / stop() methods\n\nCore methods (mirroring CatalogInterface):\n- catalogs() -\u003e list[str]\n- attach(name, options) -\u003e CatalogAttachResult\n- detach(attach_id) -\u003e None\n- schemas(attach_id, transaction_id) -\u003e Iterator[SchemaInfo]\n- schema_get(attach_id, transaction_id, name) -\u003e SchemaInfo | None\n- schema_contents(attach_id, transaction_id, name) -\u003e Iterator[TableInfo | ViewInfo | FunctionInfo]\n- table_get(...) -\u003e TableInfo | None\n- view_get(...) -\u003e ViewInfo | None\n- function_get(...) -\u003e FunctionInfo | None\n- table_scan_function_get(...) -\u003e ScanFunctionResult\n\nDDL methods (optional, may raise NotImplementedError from worker):\n- catalog_create, catalog_drop\n- schema_create, schema_drop\n- table_create, table_drop, table_rename, etc.\n- view_create, view_drop, view_rename, etc.\n\nTransaction methods:\n- transaction_begin(attach_id) -\u003e TransactionId | None\n- transaction_commit(attach_id, transaction_id)\n- transaction_rollback(attach_id, transaction_id)\n\nInternal methods:\n- _invoke(method_name, **kwargs) -\u003e pa.RecordBatch | Iterator[pa.RecordBatch]\n- _create_invocation(method_name, kwargs) -\u003e Invocation\n- _deserialize_result(batch, return_type) -\u003e Any\n\nHandle:\n- Streaming responses for Iterable returns\n- Exception propagation from worker\n- None returns (0-row/0-column batches)","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T19:18:04.65125-05:00","created_by":"rusty","updated_at":"2026-01-05T19:21:50.055794-05:00","closed_at":"2026-01-05T19:21:50.055794-05:00","close_reason":"User requested closure","dependencies":[{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-tw7","type":"blocks","created_at":"2026-01-05T19:18:44.316642-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-fd2","type":"blocks","created_at":"2026-01-05T19:18:44.440065-05:00","created_by":"rusty"},{"issue_id":"vgi-python-e46","depends_on_id":"vgi-python-4mg","type":"blocks","created_at":"2026-01-05T19:18:44.559963-05:00","created_by":"rusty"}]} @@ -56,7 +56,7 @@ {"id":"vgi-python-j9k","title":"Add protocol types for IPC stream writers in cli.py","notes":"Line 53: self._writer: Any = None\n\nCould define a Protocol type for the IPC stream writer interface:\n```python\nclass IPCWriter(Protocol):\n def write_batch(self, batch: pa.RecordBatch) -\u003e None: ...\n def close(self) -\u003e None: ...\n```\n\nPart of 14.17% imprecision in cli.py (34 Anys total).","status":"closed","priority":4,"issue_type":"task","created_at":"2026-01-04T22:19:50.31711-05:00","created_by":"rusty","updated_at":"2026-01-04T22:37:01.488788-05:00","closed_at":"2026-01-04T22:37:01.488788-05:00","close_reason":"Replaced _writer: Any with _writer: pq.ParquetWriter | None. Removes 1 Any and provides proper type information."} {"id":"vgi-python-jrf","title":"Add varargs parameter to Arg descriptor","description":"In vgi/arguments.py:\n- Add varargs: bool = False to Arg.__init__ and __slots__\n- Update _resolve() to collect positional[position:] when varargs=True\n- Validate at least 1 value provided\n- Update _validate() to validate each element in tuple\n- Add Arguments.get_varargs(start, type=None) method\n- Update __repr__ to show varargs flag","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T10:49:20.012964-05:00","created_by":"rusty","updated_at":"2026-01-05T10:55:22.479344-05:00","closed_at":"2026-01-05T10:55:22.479344-05:00","close_reason":"Implemented varargs parameter in Arg descriptor with get_varargs() method and _validate_single()"} {"id":"vgi-python-k7x","title":"Use Mapping instead of dict in extract_argument_specs signature","description":"The arg_types parameter in extract_argument_specs() is typed as dict[str, pa.DataType]. Using Mapping[str, pa.DataType] from collections.abc would be more flexible, accepting any mapping type.","status":"closed","priority":4,"issue_type":"task","created_at":"2026-01-05T11:51:21.021496-05:00","created_by":"rusty","updated_at":"2026-01-05T12:03:51.771301-05:00","closed_at":"2026-01-05T12:03:51.771301-05:00","close_reason":"Closed"} -{"id":"vgi-python-kgm","title":"Create catalog interface tests","description":"Create comprehensive test suite for catalog interface.\n\nFiles to create:\n- tests/catalog/__init__.py\n- tests/catalog/test_serialization.py\n- tests/catalog/test_catalog_interface.py\n- tests/catalog/test_catalog_client.py\n- tests/catalog/test_integration.py\n\ntest_serialization.py:\n- Round-trip tests for all dataclass types\n- Edge cases: empty strings, empty lists, None values, empty tags\n- Verify Arrow schema correctness\n\ntest_catalog_interface.py:\n- Test abstract method enforcement\n- Test default implementations (schemas(), catalog_version())\n- Test NotImplementedError for optional methods\n- Test ReadOnlyCatalogInterface\n\ntest_catalog_client.py:\n- Test CatalogClient with mock worker\n- Test each client method\n- Test error handling\n- Test streaming responses\n\ntest_integration.py:\n- End-to-end client ↔ worker tests using InMemoryCatalog\n- Catalog lifecycle: attach, query schemas, query tables, detach\n- DDL operations: create/drop schema, create/drop table\n- Error propagation\n\nProtocol conformance tests:\n- Invalid input schemas\n- Missing required columns\n- Wrong column types\n- Multi-row input batches (should fail)","status":"open","priority":1,"issue_type":"task","created_at":"2026-01-05T19:27:36.864383-05:00","created_by":"rusty","updated_at":"2026-01-05T19:27:36.864383-05:00","dependencies":[{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-e6o","type":"blocks","created_at":"2026-01-05T19:27:50.987057-05:00","created_by":"rusty"},{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-9j7","type":"blocks","created_at":"2026-01-05T19:27:51.017259-05:00","created_by":"rusty"},{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-eg7","type":"blocks","created_at":"2026-01-05T19:27:51.046187-05:00","created_by":"rusty"}]} +{"id":"vgi-python-kgm","title":"Create catalog interface tests","description":"Create comprehensive test suite for catalog interface.\n\nFiles to create:\n- tests/catalog/__init__.py\n- tests/catalog/test_serialization.py\n- tests/catalog/test_catalog_interface.py\n- tests/catalog/test_catalog_client.py\n- tests/catalog/test_integration.py\n\ntest_serialization.py:\n- Round-trip tests for all dataclass types\n- Edge cases: empty strings, empty lists, None values, empty tags\n- Verify Arrow schema correctness\n\ntest_catalog_interface.py:\n- Test abstract method enforcement\n- Test default implementations (schemas(), catalog_version())\n- Test NotImplementedError for optional methods\n- Test ReadOnlyCatalogInterface\n\ntest_catalog_client.py:\n- Test CatalogClient with mock worker\n- Test each client method\n- Test error handling\n- Test streaming responses\n\ntest_integration.py:\n- End-to-end client ↔ worker tests using InMemoryCatalog\n- Catalog lifecycle: attach, query schemas, query tables, detach\n- DDL operations: create/drop schema, create/drop table\n- Error propagation\n\nProtocol conformance tests:\n- Invalid input schemas\n- Missing required columns\n- Wrong column types\n- Multi-row input batches (should fail)","status":"in_progress","priority":1,"issue_type":"task","created_at":"2026-01-05T19:27:36.864383-05:00","created_by":"rusty","updated_at":"2026-01-05T20:02:24.403675-05:00","dependencies":[{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-e6o","type":"blocks","created_at":"2026-01-05T19:27:50.987057-05:00","created_by":"rusty"},{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-9j7","type":"blocks","created_at":"2026-01-05T19:27:51.017259-05:00","created_by":"rusty"},{"issue_id":"vgi-python-kgm","depends_on_id":"vgi-python-eg7","type":"blocks","created_at":"2026-01-05T19:27:51.046187-05:00","created_by":"rusty"}]} {"id":"vgi-python-kz4","title":"Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency","description":"Naming inconsistency: TableFunctionGenerator uses *Generator suffix, but TableInOutGeneratorFunction uses *GeneratorFunction suffix. Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency. Also consider renaming ScalarFunctionGenerator if needed.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.581028-05:00","created_by":"rusty","updated_at":"2026-01-04T21:43:58.141038-05:00","closed_at":"2026-01-04T21:43:58.141038-05:00","close_reason":"PR #7 created: https://github.com/Query-farm/vgi-python/pull/7"} {"id":"vgi-python-l1u","title":"Consider custom __repr__ for ArgumentSpec","description":"The default dataclass __repr__ includes the full Arrow type repr which can be verbose. Consider a custom __repr__ that's more concise for debugging, e.g., 'ArgumentSpec(name=\"count\", pos=0, type=int64)' instead of showing the full pa.DataType object.","status":"closed","priority":4,"issue_type":"task","created_at":"2026-01-05T11:51:21.415976-05:00","created_by":"rusty","updated_at":"2026-01-05T12:15:02.029743-05:00","closed_at":"2026-01-05T12:15:02.029743-05:00","close_reason":"Closed"} {"id":"vgi-python-l5z","title":"Update existing tests that use arg_types parameter","description":"In tests/test_argument_spec.py:\n- Update all calls to extract_argument_specs() that pass arg_types\n- Remove the arg_types parameter from test function calls\n- Ensure tests still pass with auto-inference","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T15:44:56.81929-05:00","created_by":"rusty","updated_at":"2026-01-05T15:56:39.371768-05:00","closed_at":"2026-01-05T15:56:39.371768-05:00","close_reason":"Completed as part of PR #20","dependencies":[{"issue_id":"vgi-python-l5z","depends_on_id":"vgi-python-coi","type":"blocks","created_at":"2026-01-05T15:45:13.980985-05:00","created_by":"rusty"}]} diff --git a/tests/catalog/__init__.py b/tests/catalog/__init__.py new file mode 100644 index 0000000..fe66b17 --- /dev/null +++ b/tests/catalog/__init__.py @@ -0,0 +1 @@ +"""Tests for the VGI catalog interface.""" diff --git a/tests/catalog/test_catalog_interface.py b/tests/catalog/test_catalog_interface.py new file mode 100644 index 0000000..76fb900 --- /dev/null +++ b/tests/catalog/test_catalog_interface.py @@ -0,0 +1,425 @@ +"""Tests for CatalogInterface ABC and default implementations.""" + +from collections.abc import Iterable +from typing import Any + +import pytest + +from vgi.catalog import ( + AttachId, + CatalogAttachResult, + CatalogInterface, + OnConflict, + SchemaInfo, + SerializedSchema, + TableInfo, + TransactionId, + ViewInfo, +) +from vgi.catalog.catalog_interface import ReadOnlyCatalogInterface +from vgi.exceptions import CatalogReadOnlyError + + +class MinimalCatalog(CatalogInterface): + """Minimal implementation for testing abstract method requirements.""" + + def catalogs(self) -> Iterable[str]: + """Return list of catalogs.""" + return ["test"] + + def catalog_attach( + self, *, name: str, options: dict[str, Any] + ) -> CatalogAttachResult: + """Attach to catalog.""" + return CatalogAttachResult( + attach_id=AttachId(b"test"), + supports_transactions=False, + supports_time_travel=False, + catalog_version_frozen=False, + catalog_version=1, + ) + + def schema_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + name: str, + ) -> SchemaInfo | None: + """Get schema info.""" + if name == "main": + return SchemaInfo( + attach_id=attach_id, + name="main", + is_default=True, + comment=None, + tags={}, + ) + return None + + def table_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ) -> TableInfo | None: + """Get table info.""" + return None + + def view_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ) -> ViewInfo | None: + """Get view info.""" + return None + + +class TestCatalogInterfaceAbstract: + """Test abstract method enforcement.""" + + def test_cannot_instantiate_abstract_class(self) -> None: + """CatalogInterface cannot be instantiated directly.""" + with pytest.raises(TypeError): + CatalogInterface() # type: ignore[abstract] + + def test_minimal_implementation_works(self) -> None: + """A minimal implementation can be instantiated.""" + catalog = MinimalCatalog() + assert list(catalog.catalogs()) == ["test"] + + +class TestCatalogInterfaceDefaults: + """Test default method implementations.""" + + def test_schemas_returns_main(self) -> None: + """Default schemas() returns single 'main' schema.""" + catalog = MinimalCatalog() + attach_id = AttachId(b"test") + schemas = list(catalog.schemas(attach_id=attach_id, transaction_id=None)) + + assert len(schemas) == 1 + assert schemas[0].name == "main" + assert schemas[0].is_default is True + assert schemas[0].comment is None + assert schemas[0].tags == {} + + def test_catalog_version_returns_zero(self) -> None: + """Default catalog_version() returns 0.""" + catalog = MinimalCatalog() + version = catalog.catalog_version( + attach_id=AttachId(b"test"), transaction_id=None + ) + assert version == 0 + + def test_catalog_detach_does_nothing(self) -> None: + """Default catalog_detach() does nothing (no exception).""" + catalog = MinimalCatalog() + # Should not raise + catalog.catalog_detach(attach_id=AttachId(b"test")) + + def test_interface_feature_flags_empty(self) -> None: + """Default interface_feature_flags returns empty set.""" + catalog = MinimalCatalog() + assert catalog.interface_feature_flags == set() + + +class TestCatalogInterfaceNotImplemented: + """Test that optional methods raise NotImplementedError by default.""" + + def test_catalog_create_not_implemented(self) -> None: + """catalog_create raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="Catalog create not implemented"): + catalog.catalog_create( + name="test", on_conflict=OnConflict.ERROR, options={} + ) + + def test_catalog_drop_not_implemented(self) -> None: + """catalog_drop raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="Catalog drop not implemented"): + catalog.catalog_drop(name="test") + + def test_transaction_begin_not_implemented(self) -> None: + """catalog_transaction_begin raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises( + NotImplementedError, match="Catalog transactions not implemented" + ): + catalog.catalog_transaction_begin(attach_id=AttachId(b"test")) + + def test_transaction_commit_not_implemented(self) -> None: + """catalog_transaction_commit raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises( + NotImplementedError, match="Catalog transactions not implemented" + ): + catalog.catalog_transaction_commit( + attach_id=AttachId(b"test"), transaction_id=TransactionId(b"tx") + ) + + def test_transaction_rollback_not_implemented(self) -> None: + """catalog_transaction_rollback raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises( + NotImplementedError, match="Catalog transactions not implemented" + ): + catalog.catalog_transaction_rollback( + attach_id=AttachId(b"test"), transaction_id=TransactionId(b"tx") + ) + + def test_schema_create_not_implemented(self) -> None: + """schema_create raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="Schema create not implemented"): + catalog.schema_create( + attach_id=AttachId(b"test"), + transaction_id=None, + name="new_schema", + comment=None, + tags={}, + ) + + def test_schema_drop_not_implemented(self) -> None: + """schema_drop raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="Schema drop not implemented"): + catalog.schema_drop( + attach_id=AttachId(b"test"), + transaction_id=None, + name="schema", + ignore_not_found=False, + cascade=False, + ) + + def test_schema_contents_not_implemented(self) -> None: + """schema_contents raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises( + NotImplementedError, match="Schema contents not implemented" + ): + catalog.schema_contents( + attach_id=AttachId(b"test"), transaction_id=None, name="main" + ) + + def test_table_create_not_implemented(self) -> None: + """table_create raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="Table create not implemented"): + catalog.table_create( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="table", + columns=SerializedSchema(b""), + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + def test_view_create_not_implemented(self) -> None: + """view_create raises NotImplementedError.""" + catalog = MinimalCatalog() + with pytest.raises(NotImplementedError, match="View create not implemented"): + catalog.view_create( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="view", + definition="SELECT 1", + on_conflict=OnConflict.ERROR, + ) + + +class MinimalReadOnlyCatalog(ReadOnlyCatalogInterface): + """Minimal read-only implementation for testing.""" + + def catalogs(self) -> Iterable[str]: + """Return list of catalogs.""" + return ["readonly"] + + def catalog_attach( + self, *, name: str, options: dict[str, Any] + ) -> CatalogAttachResult: + """Attach to catalog.""" + return CatalogAttachResult( + attach_id=AttachId(b"readonly"), + supports_transactions=False, + supports_time_travel=False, + catalog_version_frozen=True, + catalog_version=1, + ) + + def schema_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + name: str, + ) -> SchemaInfo | None: + """Get schema info.""" + return None + + def table_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ) -> TableInfo | None: + """Get table info.""" + return None + + def view_get( + self, + *, + attach_id: AttachId, + transaction_id: TransactionId | None, + schema_name: str, + name: str, + ) -> ViewInfo | None: + """Get view info.""" + return None + + +class TestReadOnlyCatalogInterface: + """Test ReadOnlyCatalogInterface DDL rejection.""" + + def test_catalog_create_raises_readonly_error(self) -> None: + """catalog_create raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.catalog_create( + name="test", on_conflict=OnConflict.ERROR, options={} + ) + + def test_catalog_drop_raises_readonly_error(self) -> None: + """catalog_drop raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.catalog_drop(name="test") + + def test_transaction_begin_raises_readonly_error(self) -> None: + """catalog_transaction_begin raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.catalog_transaction_begin(attach_id=AttachId(b"test")) + + def test_transaction_commit_raises_readonly_error(self) -> None: + """catalog_transaction_commit raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.catalog_transaction_commit( + attach_id=AttachId(b"test"), transaction_id=TransactionId(b"tx") + ) + + def test_transaction_rollback_raises_readonly_error(self) -> None: + """catalog_transaction_rollback raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.catalog_transaction_rollback( + attach_id=AttachId(b"test"), transaction_id=TransactionId(b"tx") + ) + + def test_schema_create_raises_readonly_error(self) -> None: + """schema_create raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.schema_create( + attach_id=AttachId(b"test"), + transaction_id=None, + name="new", + comment=None, + tags={}, + ) + + def test_schema_drop_raises_readonly_error(self) -> None: + """schema_drop raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.schema_drop( + attach_id=AttachId(b"test"), + transaction_id=None, + name="main", + ignore_not_found=False, + cascade=False, + ) + + def test_table_create_raises_readonly_error(self) -> None: + """table_create raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.table_create( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="table", + columns=SerializedSchema(b""), + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + def test_table_drop_raises_readonly_error(self) -> None: + """table_drop raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.table_drop( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="table", + ignore_not_found=False, + ) + + def test_table_rename_raises_readonly_error(self) -> None: + """table_rename raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.table_rename( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="old", + new_name="new", + ignore_not_found=False, + ) + + def test_view_create_raises_readonly_error(self) -> None: + """view_create raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.view_create( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="view", + definition="SELECT 1", + on_conflict=OnConflict.ERROR, + ) + + def test_view_drop_raises_readonly_error(self) -> None: + """view_drop raises CatalogReadOnlyError.""" + catalog = MinimalReadOnlyCatalog() + with pytest.raises(CatalogReadOnlyError, match="read-only"): + catalog.view_drop( + attach_id=AttachId(b"test"), + transaction_id=None, + schema_name="main", + name="view", + ignore_not_found=False, + ) + + def test_class_attributes(self) -> None: + """ReadOnlyCatalogInterface has correct class attributes.""" + assert ReadOnlyCatalogInterface.supports_transactions is False + assert ReadOnlyCatalogInterface.catalog_version_frozen is True diff --git a/tests/catalog/test_integration.py b/tests/catalog/test_integration.py new file mode 100644 index 0000000..fb30d95 --- /dev/null +++ b/tests/catalog/test_integration.py @@ -0,0 +1,621 @@ +"""Integration tests for catalog interface using InMemoryCatalog.""" + +import pyarrow as pa +import pytest + +from vgi import schema +from vgi.catalog import OnConflict, SerializedSchema +from vgi.examples.catalog import InMemoryCatalog + + +class TestInMemoryCatalogBasic: + """Test basic InMemoryCatalog functionality.""" + + def test_default_catalog_exists(self) -> None: + """Default 'memory' catalog exists on creation.""" + catalog = InMemoryCatalog() + catalogs = list(catalog.catalogs()) + assert "memory" in catalogs + + def test_attach_to_default_catalog(self) -> None: + """Can attach to the default memory catalog.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + assert result.attach_id is not None + assert len(result.attach_id) == 16 # UUID bytes + assert result.supports_transactions is False + assert result.supports_time_travel is False + assert result.catalog_version_frozen is False + + def test_attach_to_nonexistent_catalog_raises(self) -> None: + """Attaching to nonexistent catalog raises error.""" + catalog = InMemoryCatalog() + with pytest.raises(ValueError, match="not found"): + catalog.catalog_attach(name="nonexistent", options={}) + + def test_detach(self) -> None: + """Can detach from an attached catalog.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + # Should not raise + catalog.catalog_detach(attach_id=result.attach_id) + + +class TestInMemoryCatalogSchemas: + """Test schema operations.""" + + def test_default_main_schema_exists(self) -> None: + """Default 'main' schema exists after attach.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + schemas = list(catalog.schemas(attach_id=result.attach_id, transaction_id=None)) + + assert len(schemas) == 1 + assert schemas[0].name == "main" + assert schemas[0].is_default is True + + def test_schema_get_main(self) -> None: + """Can get the main schema.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + schema = catalog.schema_get( + attach_id=result.attach_id, transaction_id=None, name="main" + ) + + assert schema is not None + assert schema.name == "main" + + def test_schema_get_nonexistent(self) -> None: + """Getting nonexistent schema returns None.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + schema = catalog.schema_get( + attach_id=result.attach_id, transaction_id=None, name="nonexistent" + ) + + assert schema is None + + def test_schema_create(self) -> None: + """Can create a new schema.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + catalog.schema_create( + attach_id=result.attach_id, + transaction_id=None, + name="analytics", + comment="Analytics schema", + tags={"team": "data"}, + ) + + schemas = list(catalog.schemas(attach_id=result.attach_id, transaction_id=None)) + schema_names = [s.name for s in schemas] + assert "analytics" in schema_names + + schema = catalog.schema_get( + attach_id=result.attach_id, transaction_id=None, name="analytics" + ) + assert schema is not None + assert schema.comment == "Analytics schema" + assert schema.tags == {"team": "data"} + + def test_schema_create_duplicate_raises(self) -> None: + """Creating duplicate schema raises error.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + with pytest.raises(ValueError, match="already exists"): + catalog.schema_create( + attach_id=result.attach_id, + transaction_id=None, + name="main", # Already exists + comment=None, + tags={}, + ) + + def test_schema_drop(self) -> None: + """Can drop a schema.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + catalog.schema_create( + attach_id=result.attach_id, + transaction_id=None, + name="to_drop", + comment=None, + tags={}, + ) + + catalog.schema_drop( + attach_id=result.attach_id, + transaction_id=None, + name="to_drop", + ignore_not_found=False, + cascade=False, + ) + + schema = catalog.schema_get( + attach_id=result.attach_id, transaction_id=None, name="to_drop" + ) + assert schema is None + + def test_schema_drop_ignore_not_found(self) -> None: + """Dropping nonexistent schema with ignore_not_found=True succeeds.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + # Should not raise + catalog.schema_drop( + attach_id=result.attach_id, + transaction_id=None, + name="nonexistent", + ignore_not_found=True, + cascade=False, + ) + + +class TestInMemoryCatalogTables: + """Test table operations.""" + + def test_table_create_and_get(self) -> None: + """Can create and retrieve a table.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns_schema = schema(id=pa.int64(), name=pa.string()) + columns = SerializedSchema(columns_schema.serialize().to_pybytes()) + + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="users", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[0], + unique_constraints=[[0]], + check_constraints=[], + ) + + table = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="users", + ) + + assert table is not None + assert table.name == "users" + assert table.schema_name == "main" + assert table.columns == columns + assert table.not_null_constraints == [0] + assert table.unique_constraints == [[0]] + + def test_table_get_nonexistent(self) -> None: + """Getting nonexistent table returns None.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + table = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="nonexistent", + ) + assert table is None + + def test_table_drop(self) -> None: + """Can drop a table.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns = SerializedSchema(schema().serialize().to_pybytes()) + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + catalog.table_drop( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + ignore_not_found=False, + ) + + table = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + ) + assert table is None + + def test_table_rename(self) -> None: + """Can rename a table.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns = SerializedSchema(schema().serialize().to_pybytes()) + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_name", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + catalog.table_rename( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_name", + new_name="new_name", + ignore_not_found=False, + ) + + old = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_name", + ) + new = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="new_name", + ) + + assert old is None + assert new is not None + assert new.name == "new_name" + + def test_table_comment_set(self) -> None: + """Can set table comment.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns = SerializedSchema(schema().serialize().to_pybytes()) + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="commented", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + catalog.table_comment_set( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="commented", + comment="This is a comment", + ignore_not_found=False, + ) + + table = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="commented", + ) + assert table is not None + assert table.comment == "This is a comment" + + +class TestInMemoryCatalogViews: + """Test view operations.""" + + def test_view_create_and_get(self) -> None: + """Can create and retrieve a view.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + catalog.view_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="user_view", + definition="SELECT * FROM users", + on_conflict=OnConflict.ERROR, + ) + + view = catalog.view_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="user_view", + ) + + assert view is not None + assert view.name == "user_view" + assert view.definition == "SELECT * FROM users" + + def test_view_drop(self) -> None: + """Can drop a view.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + catalog.view_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + definition="SELECT 1", + on_conflict=OnConflict.ERROR, + ) + + catalog.view_drop( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + ignore_not_found=False, + ) + + view = catalog.view_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="to_drop", + ) + assert view is None + + def test_view_rename(self) -> None: + """Can rename a view.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + catalog.view_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_view", + definition="SELECT 1", + on_conflict=OnConflict.ERROR, + ) + + catalog.view_rename( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_view", + new_name="new_view", + ignore_not_found=False, + ) + + old = catalog.view_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="old_view", + ) + new = catalog.view_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="new_view", + ) + + assert old is None + assert new is not None + + +class TestInMemoryCatalogVersioning: + """Test catalog versioning.""" + + def test_version_increments_on_schema_create(self) -> None: + """Version increments when schema is created.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + version1 = catalog.catalog_version( + attach_id=result.attach_id, transaction_id=None + ) + + catalog.schema_create( + attach_id=result.attach_id, + transaction_id=None, + name="new_schema", + comment=None, + tags={}, + ) + + version2 = catalog.catalog_version( + attach_id=result.attach_id, transaction_id=None + ) + + assert version2 > version1 + + def test_version_increments_on_table_create(self) -> None: + """Version increments when table is created.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + version1 = catalog.catalog_version( + attach_id=result.attach_id, transaction_id=None + ) + + columns = SerializedSchema(schema().serialize().to_pybytes()) + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="table", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + version2 = catalog.catalog_version( + attach_id=result.attach_id, transaction_id=None + ) + + assert version2 > version1 + + +class TestInMemoryCatalogSchemaContents: + """Test schema_contents operation.""" + + def test_schema_contents_lists_tables_and_views(self) -> None: + """schema_contents returns tables and views.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns = SerializedSchema(schema().serialize().to_pybytes()) + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="users", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + catalog.view_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="user_view", + definition="SELECT * FROM users", + on_conflict=OnConflict.ERROR, + ) + + contents = list( + catalog.schema_contents( + attach_id=result.attach_id, transaction_id=None, name="main" + ) + ) + + names = [c.name for c in contents] + assert "users" in names + assert "user_view" in names + + +class TestInMemoryCatalogOnConflict: + """Test OnConflict behavior.""" + + def test_table_create_error_on_duplicate(self) -> None: + """OnConflict.ERROR raises on duplicate table.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns = SerializedSchema(schema().serialize().to_pybytes()) + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="table", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + with pytest.raises(ValueError, match="already exists"): + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="table", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + def test_table_create_ignore_on_duplicate(self) -> None: + """OnConflict.IGNORE does nothing on duplicate table.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns = SerializedSchema(schema().serialize().to_pybytes()) + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="table", + columns=columns, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + # Should not raise + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="table", + columns=columns, + on_conflict=OnConflict.IGNORE, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + def test_table_create_replace_on_duplicate(self) -> None: + """OnConflict.REPLACE replaces duplicate table.""" + catalog = InMemoryCatalog() + result = catalog.catalog_attach(name="memory", options={}) + + columns1 = SerializedSchema(schema(a=pa.int32()).serialize().to_pybytes()) + columns2 = SerializedSchema(schema(b=pa.string()).serialize().to_pybytes()) + + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="table", + columns=columns1, + on_conflict=OnConflict.ERROR, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + catalog.table_create( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="table", + columns=columns2, + on_conflict=OnConflict.REPLACE, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + ) + + table = catalog.table_get( + attach_id=result.attach_id, + transaction_id=None, + schema_name="main", + name="table", + ) + assert table is not None + assert table.columns == columns2 diff --git a/tests/catalog/test_serialization.py b/tests/catalog/test_serialization.py new file mode 100644 index 0000000..3431fe3 --- /dev/null +++ b/tests/catalog/test_serialization.py @@ -0,0 +1,398 @@ +"""Tests for catalog dataclass serialization/deserialization.""" + +import pyarrow as pa + +from vgi import schema +from vgi.catalog import ( + AttachId, + CatalogAttachResult, + FunctionInfo, + FunctionType, + ScanFunctionResult, + SchemaInfo, + SerializedSchema, + TableInfo, + ViewInfo, +) +from vgi.ipc_utils import deserialize_record_batch + + +class TestCatalogAttachResultSerialization: + """Test CatalogAttachResult serialization round-trip.""" + + def test_basic_round_trip(self) -> None: + """Test basic serialization and deserialization.""" + original = CatalogAttachResult( + attach_id=AttachId(b"\x01\x02\x03\x04"), + supports_transactions=True, + supports_time_travel=False, + catalog_version_frozen=False, + catalog_version=42, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = CatalogAttachResult.deserialize(batch) + + assert restored.attach_id == original.attach_id + assert restored.supports_transactions == original.supports_transactions + assert restored.supports_time_travel == original.supports_time_travel + assert restored.catalog_version_frozen == original.catalog_version_frozen + assert restored.catalog_version == original.catalog_version + + def test_empty_attach_id(self) -> None: + """Test with empty attach_id bytes.""" + original = CatalogAttachResult( + attach_id=AttachId(b""), + supports_transactions=False, + supports_time_travel=False, + catalog_version_frozen=True, + catalog_version=0, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = CatalogAttachResult.deserialize(batch) + + assert restored.attach_id == b"" + + def test_all_flags_true(self) -> None: + """Test with all boolean flags set to true.""" + original = CatalogAttachResult( + attach_id=AttachId(b"test"), + supports_transactions=True, + supports_time_travel=True, + catalog_version_frozen=True, + catalog_version=999, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = CatalogAttachResult.deserialize(batch) + + assert restored.supports_transactions is True + assert restored.supports_time_travel is True + assert restored.catalog_version_frozen is True + + +class TestSchemaInfoSerialization: + """Test SchemaInfo serialization round-trip.""" + + def test_basic_round_trip(self) -> None: + """Test basic serialization and deserialization.""" + original = SchemaInfo( + attach_id=AttachId(b"\x01\x02\x03\x04"), + name="main", + is_default=True, + comment="Test schema", + tags={"env": "test", "owner": "alice"}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = SchemaInfo.deserialize(batch) + + assert restored.attach_id == original.attach_id + assert restored.name == original.name + assert restored.is_default == original.is_default + assert restored.comment == original.comment + assert restored.tags == original.tags + + def test_none_comment(self) -> None: + """Test with None comment.""" + original = SchemaInfo( + attach_id=AttachId(b"test"), + name="schema1", + is_default=False, + comment=None, + tags={}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = SchemaInfo.deserialize(batch) + + assert restored.comment is None + + def test_empty_tags(self) -> None: + """Test with empty tags dictionary.""" + original = SchemaInfo( + attach_id=AttachId(b"test"), + name="schema1", + is_default=False, + comment="Comment", + tags={}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = SchemaInfo.deserialize(batch) + + assert restored.tags == {} + + def test_empty_string_name(self) -> None: + """Test with empty string name.""" + original = SchemaInfo( + attach_id=AttachId(b"test"), + name="", + is_default=False, + comment=None, + tags={}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = SchemaInfo.deserialize(batch) + + assert restored.name == "" + + +class TestTableInfoSerialization: + """Test TableInfo serialization round-trip.""" + + def test_basic_round_trip(self) -> None: + """Test basic serialization and deserialization.""" + columns_schema = schema(id=pa.int64(), name=pa.string()) + columns_bytes = SerializedSchema(columns_schema.serialize().to_pybytes()) + + original = TableInfo( + name="users", + schema_name="main", + columns=columns_bytes, + not_null_constraints=[0], + unique_constraints=[[0]], + check_constraints=["id > 0"], + comment="Users table", + tags={"category": "core"}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = TableInfo.deserialize(batch) + + assert restored.name == original.name + assert restored.schema_name == original.schema_name + assert restored.columns == original.columns + assert restored.not_null_constraints == original.not_null_constraints + assert restored.unique_constraints == original.unique_constraints + assert restored.check_constraints == original.check_constraints + assert restored.comment == original.comment + assert restored.tags == original.tags + + def test_empty_constraints(self) -> None: + """Test with empty constraint lists.""" + columns_schema = schema(x=pa.int32()) + columns_bytes = SerializedSchema(columns_schema.serialize().to_pybytes()) + + original = TableInfo( + name="simple", + schema_name="main", + columns=columns_bytes, + not_null_constraints=[], + unique_constraints=[], + check_constraints=[], + comment=None, + tags={}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = TableInfo.deserialize(batch) + + assert restored.not_null_constraints == [] + assert restored.unique_constraints == [] + assert restored.check_constraints == [] + + def test_multiple_unique_constraints(self) -> None: + """Test with multiple unique constraints on multiple columns.""" + columns_schema = schema(a=pa.int32(), b=pa.int32(), c=pa.int32()) + columns_bytes = SerializedSchema(columns_schema.serialize().to_pybytes()) + + original = TableInfo( + name="multi", + schema_name="main", + columns=columns_bytes, + not_null_constraints=[0, 1], + unique_constraints=[[0], [1, 2]], + check_constraints=["a > 0", "b < 100"], + comment=None, + tags={}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = TableInfo.deserialize(batch) + + assert restored.unique_constraints == [[0], [1, 2]] + + +class TestViewInfoSerialization: + """Test ViewInfo serialization round-trip.""" + + def test_basic_round_trip(self) -> None: + """Test basic serialization and deserialization.""" + original = ViewInfo( + name="user_summary", + schema_name="main", + definition="SELECT id, name FROM users", + comment="Summary view", + tags={"type": "summary"}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = ViewInfo.deserialize(batch) + + assert restored.name == original.name + assert restored.schema_name == original.schema_name + assert restored.definition == original.definition + assert restored.comment == original.comment + assert restored.tags == original.tags + + def test_complex_definition(self) -> None: + """Test with complex SQL definition.""" + original = ViewInfo( + name="complex", + schema_name="analytics", + definition=""" + SELECT u.id, u.name, COUNT(o.id) as order_count + FROM users u + LEFT JOIN orders o ON u.id = o.user_id + WHERE u.active = true + GROUP BY u.id, u.name + HAVING COUNT(o.id) > 0 + """, + comment=None, + tags={}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = ViewInfo.deserialize(batch) + + assert restored.definition == original.definition + + +class TestFunctionInfoSerialization: + """Test FunctionInfo serialization round-trip.""" + + def test_scalar_function(self) -> None: + """Test with scalar function type.""" + args_schema = schema(value=pa.int64()) + output_schema = schema(result=pa.int64()) + + original = FunctionInfo( + name="double", + schema_name="main", + function_type=FunctionType.SCALAR, + arguments=SerializedSchema(args_schema.serialize().to_pybytes()), + output_schema=SerializedSchema(output_schema.serialize().to_pybytes()), + comment="Double the input", + tags={"category": "math"}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = FunctionInfo.deserialize(batch) + + assert restored.name == original.name + assert restored.function_type == FunctionType.SCALAR + assert restored.arguments == original.arguments + assert restored.output_schema == original.output_schema + + def test_table_function(self) -> None: + """Test with table function type.""" + args_schema = schema(count=pa.int32()) + output_schema = schema(n=pa.int64()) + + original = FunctionInfo( + name="sequence", + schema_name="main", + function_type=FunctionType.TABLE, + arguments=SerializedSchema(args_schema.serialize().to_pybytes()), + output_schema=SerializedSchema(output_schema.serialize().to_pybytes()), + comment=None, + tags={}, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = FunctionInfo.deserialize(batch) + + assert restored.function_type == FunctionType.TABLE + + +class TestScanFunctionResultSerialization: + """Test ScanFunctionResult serialization round-trip.""" + + def test_basic_round_trip(self) -> None: + """Test basic serialization and deserialization.""" + original = ScanFunctionResult( + function_name="scan_table", + max_processes=4, + invocation_id=b"\x01\x02\x03\x04", + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = ScanFunctionResult.deserialize(batch) + + assert restored.function_name == original.function_name + assert restored.max_processes == original.max_processes + assert restored.invocation_id == original.invocation_id + + def test_none_invocation_id(self) -> None: + """Test with None invocation_id.""" + original = ScanFunctionResult( + function_name="scan", + max_processes=1, + invocation_id=None, + ) + serialized = original.serialize() + batch = deserialize_record_batch(serialized) + restored = ScanFunctionResult.deserialize(batch) + + assert restored.invocation_id is None + + +class TestArrowSchemaCorrectness: + """Test that Arrow schemas are correct for each type.""" + + def test_catalog_attach_result_schema(self) -> None: + """Verify CatalogAttachResult Arrow schema.""" + schema = CatalogAttachResult.ARROW_SCHEMA + assert len(schema) == 5 + assert schema.field("attach_id").type == pa.binary() + assert schema.field("supports_transactions").type == pa.bool_() + assert schema.field("supports_time_travel").type == pa.bool_() + assert schema.field("catalog_version_frozen").type == pa.bool_() + assert schema.field("catalog_version").type == pa.int64() + + def test_schema_info_schema(self) -> None: + """Verify SchemaInfo Arrow schema.""" + schema = SchemaInfo.ARROW_SCHEMA + assert len(schema) == 5 + assert schema.field("attach_id").type == pa.binary() + assert schema.field("name").type == pa.string() + assert schema.field("is_default").type == pa.bool_() + assert schema.field("comment").type == pa.string() + assert schema.field("comment").nullable is True + assert schema.field("tags").type == pa.map_(pa.string(), pa.string()) + + def test_table_info_schema(self) -> None: + """Verify TableInfo Arrow schema.""" + schema = TableInfo.ARROW_SCHEMA + assert schema.field("name").type == pa.string() + assert schema.field("schema_name").type == pa.string() + assert schema.field("columns").type == pa.binary() + assert schema.field("not_null_constraints").type == pa.list_(pa.int32()) + assert schema.field("unique_constraints").type == pa.list_(pa.list_(pa.int32())) + assert schema.field("check_constraints").type == pa.list_(pa.string()) + + def test_view_info_schema(self) -> None: + """Verify ViewInfo Arrow schema.""" + schema = ViewInfo.ARROW_SCHEMA + assert schema.field("name").type == pa.string() + assert schema.field("schema_name").type == pa.string() + assert schema.field("definition").type == pa.string() + + def test_function_info_schema(self) -> None: + """Verify FunctionInfo Arrow schema.""" + schema = FunctionInfo.ARROW_SCHEMA + assert schema.field("name").type == pa.string() + assert schema.field("function_type").type == pa.string() + assert schema.field("arguments").type == pa.binary() + assert schema.field("output_schema").type == pa.binary() + + def test_scan_function_result_schema(self) -> None: + """Verify ScanFunctionResult Arrow schema.""" + schema = ScanFunctionResult.ARROW_SCHEMA + assert schema.field("function_name").type == pa.string() + assert schema.field("max_processes").type == pa.int32() + assert schema.field("invocation_id").type == pa.binary() + assert schema.field("invocation_id").nullable is True