Skip to content

Commit fa2f61b

Browse files
rustyconover and claude committed
Improve catalog streaming and refactor CatalogClientMixin
Catalog streaming improvements:
- Add to_row_dict() methods to SchemaInfo, TableInfo, ViewInfo, FunctionInfo
- Use RecordBatchStreamWriter in worker for list-returning methods
- Produces 1 schema + 1 batch (N rows) + 1 EOS instead of N complete streams

Type fixes:
- Change overload return types from list to Sequence (covariant)
- Fix tags parameter type: set[str] -> dict[str, str]
- Fix list_schemas() generator to return actual list

CatalogClientMixin refactoring:
- Extract shared subprocess logic into _send_catalog_invocation helper
- Remove duplicate transaction_begin/commit/rollback methods
- Remove unused TransactionBeginResult class
- Improve docstrings with complete examples and error handling docs
- Update cli_transaction.py to use catalog_transaction_* methods

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent d980ab9 commit fa2f61b

12 files changed

Lines changed: 864 additions & 554 deletions

tests/catalog/test_catalog_interface.py

Lines changed: 29 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
"""Tests for CatalogInterface ABC and default implementations."""
22

3-
from collections.abc import Callable, Iterable
3+
from collections.abc import Callable
44
from typing import Any
55

66
import pyarrow as pa
@@ -61,7 +61,7 @@ def readonly_catalog() -> "MinimalReadOnlyCatalog":
6161
class MinimalCatalog(CatalogInterface):
6262
"""Minimal implementation for testing abstract method requirements."""
6363

64-
def catalogs(self) -> Iterable[str]:
64+
def catalogs(self) -> list[str]:
6565
"""Return list of catalogs."""
6666
return ["test"]
6767

@@ -227,7 +227,10 @@ def _not_implemented_test_cases() -> list[
227227
"schema_contents",
228228
"Schema contents not implemented",
229229
lambda c: c.schema_contents(
230-
attach_id=TEST_ATTACH_ID, transaction_id=None, name="main"
230+
attach_id=TEST_ATTACH_ID,
231+
transaction_id=None,
232+
name="main",
233+
type=SchemaObjectType.TABLE,
231234
),
232235
),
233236
(
@@ -283,7 +286,7 @@ def test_not_implemented(
283286
class MinimalReadOnlyCatalog(ReadOnlyCatalogInterface):
284287
"""Minimal read-only implementation for testing."""
285288

286-
def catalogs(self) -> Iterable[str]:
289+
def catalogs(self) -> list[str]:
287290
"""Return list of catalogs."""
288291
return ["readonly"]
289292

@@ -774,20 +777,36 @@ class TestCatalog(ReadOnlyCatalogInterface):
774777

775778
return TestCatalog()
776779

777-
def test_no_filter_returns_all(
780+
def test_fetch_all_function_types(
778781
self, catalog_with_functions: ReadOnlyCatalogInterface
779782
) -> None:
780-
"""schema_contents with no type filter returns all functions."""
783+
"""Can fetch both scalar and table functions with separate calls."""
781784
attach_result = catalog_with_functions.catalog_attach(name="test", options={})
782-
contents = list(
785+
786+
# Get scalar functions
787+
scalar_contents = list(
788+
catalog_with_functions.schema_contents(
789+
attach_id=attach_result.attach_id,
790+
transaction_id=None,
791+
name="main",
792+
type=SchemaObjectType.SCALAR_FUNCTION,
793+
)
794+
)
795+
796+
# Get table functions
797+
table_contents = list(
783798
catalog_with_functions.schema_contents(
784799
attach_id=attach_result.attach_id,
785800
transaction_id=None,
786801
name="main",
802+
type=SchemaObjectType.TABLE_FUNCTION,
787803
)
788804
)
789-
assert len(contents) == 2
790-
names = {obj.name for obj in contents}
805+
806+
# Combined should have both functions
807+
all_contents = scalar_contents + table_contents
808+
assert len(all_contents) == 2
809+
names = {obj.name for obj in all_contents}
791810
# Names are derived from class names: MyScalarFunction -> my_scalar
792811
assert "my_scalar" in names
793812
assert "my_table" in names
@@ -870,21 +889,7 @@ def test_wrong_schema_returns_empty(
870889
attach_id=attach_result.attach_id,
871890
transaction_id=None,
872891
name="nonexistent",
892+
type=SchemaObjectType.TABLE_FUNCTION,
873893
)
874894
)
875895
assert len(contents) == 0
876-
877-
def test_type_filter_with_none_explicitly(
878-
self, catalog_with_functions: ReadOnlyCatalogInterface
879-
) -> None:
880-
"""schema_contents with type=None explicitly returns all functions."""
881-
attach_result = catalog_with_functions.catalog_attach(name="test", options={})
882-
contents = list(
883-
catalog_with_functions.schema_contents(
884-
attach_id=attach_result.attach_id,
885-
transaction_id=None,
886-
name="main",
887-
type=None,
888-
)
889-
)
890-
assert len(contents) == 2

tests/catalog/test_example_worker_catalog.py

Lines changed: 84 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,14 @@
66

77
import pyarrow as pa
88

9-
from vgi.catalog import FunctionInfo, FunctionType, TableInfo, ViewInfo
9+
from vgi.catalog import (
10+
AttachId,
11+
FunctionInfo,
12+
FunctionType,
13+
SchemaObjectType,
14+
TableInfo,
15+
ViewInfo,
16+
)
1017
from vgi.client import Client
1118
from vgi.examples.worker import ExampleWorker
1219

@@ -30,6 +37,26 @@ def _get_functions(
3037
return [item for item in contents if isinstance(item, FunctionInfo)]
3138

3239

40+
def _get_all_functions(client: Client, attach_id: AttachId) -> list[FunctionInfo]:
41+
"""Get both table and scalar functions from the catalog."""
42+
table_funcs = list(
43+
client.schema_contents(
44+
attach_id=attach_id,
45+
name="main",
46+
type=SchemaObjectType.TABLE_FUNCTION,
47+
)
48+
)
49+
scalar_funcs = list(
50+
client.schema_contents(
51+
attach_id=attach_id,
52+
name="main",
53+
type=SchemaObjectType.SCALAR_FUNCTION,
54+
)
55+
)
56+
# Combine to list - the overloads guarantee FunctionInfo for function types
57+
return list(table_funcs) + list(scalar_funcs)
58+
59+
3360
class TestExampleWorkerCatalog:
3461
"""Test ExampleWorker's catalog interface."""
3562

@@ -49,15 +76,19 @@ def test_catalog_attach_works(self) -> None:
4976
assert result.catalog_version_frozen is True
5077

5178
def test_schema_contents_returns_functions(self) -> None:
52-
"""schema_contents() returns FunctionInfo for all example functions."""
79+
"""schema_contents() returns FunctionInfo for table functions."""
5380
client = Client(EXAMPLE_WORKER)
5481

5582
# Attach to catalog
5683
attach_result = client.catalog_attach(name="example", options={})
5784
attach_id = attach_result.attach_id
5885

59-
# Get schema contents
60-
contents = list(client.schema_contents(attach_id=attach_id, name="main"))
86+
# Get table functions
87+
contents = list(
88+
client.schema_contents(
89+
attach_id=attach_id, name="main", type=SchemaObjectType.TABLE_FUNCTION
90+
)
91+
)
6192

6293
# Should have functions
6394
assert len(contents) > 0
@@ -70,12 +101,30 @@ def test_all_example_functions_listed(self) -> None:
70101
"""All example functions are listed in the catalog."""
71102
client = Client(EXAMPLE_WORKER)
72103

73-
# Attach and get contents
104+
# Attach and get both table and scalar functions
74105
attach_result = client.catalog_attach(name="example", options={})
75-
contents = list(
76-
client.schema_contents(attach_id=attach_result.attach_id, name="main")
106+
107+
# Get table functions
108+
table_funcs = list(
109+
client.schema_contents(
110+
attach_id=attach_result.attach_id,
111+
name="main",
112+
type=SchemaObjectType.TABLE_FUNCTION,
113+
)
77114
)
78115

116+
# Get scalar functions
117+
scalar_funcs = list(
118+
client.schema_contents(
119+
attach_id=attach_result.attach_id,
120+
name="main",
121+
type=SchemaObjectType.SCALAR_FUNCTION,
122+
)
123+
)
124+
125+
# Combine all functions
126+
contents = table_funcs + scalar_funcs
127+
79128
# Get function names
80129
function_names = {item.name for item in contents}
81130

@@ -95,10 +144,7 @@ def test_function_info_has_correct_types(self) -> None:
95144
client = Client(EXAMPLE_WORKER)
96145

97146
attach_result = client.catalog_attach(name="example", options={})
98-
contents = list(
99-
client.schema_contents(attach_id=attach_result.attach_id, name="main")
100-
)
101-
functions = _get_functions(contents)
147+
functions = _get_all_functions(client, attach_result.attach_id)
102148

103149
# Create lookup by name
104150
by_name = {fn.name: fn for fn in functions}
@@ -117,10 +163,7 @@ def test_function_info_has_arguments(self) -> None:
117163
client = Client(EXAMPLE_WORKER)
118164

119165
attach_result = client.catalog_attach(name="example", options={})
120-
contents = list(
121-
client.schema_contents(attach_id=attach_result.attach_id, name="main")
122-
)
123-
functions = _get_functions(contents)
166+
functions = _get_all_functions(client, attach_result.attach_id)
124167

125168
# Create lookup by name
126169
by_name = {fn.name: fn for fn in functions}
@@ -142,10 +185,13 @@ def test_function_info_has_description(self) -> None:
142185
client = Client(EXAMPLE_WORKER)
143186

144187
attach_result = client.catalog_attach(name="example", options={})
145-
contents = list(
146-
client.schema_contents(attach_id=attach_result.attach_id, name="main")
188+
functions = list(
189+
client.schema_contents(
190+
attach_id=attach_result.attach_id,
191+
name="main",
192+
type=SchemaObjectType.TABLE_FUNCTION,
193+
)
147194
)
148-
functions = _get_functions(contents)
149195

150196
# Create lookup by name
151197
by_name = {fn.name: fn for fn in functions}
@@ -160,23 +206,24 @@ def test_function_info_schema_name(self) -> None:
160206
client = Client(EXAMPLE_WORKER)
161207

162208
attach_result = client.catalog_attach(name="example", options={})
163-
contents = list(
164-
client.schema_contents(attach_id=attach_result.attach_id, name="main")
165-
)
209+
functions = _get_all_functions(client, attach_result.attach_id)
166210

167211
# All functions should be in 'main' schema
168-
for item in contents:
212+
for item in functions:
169213
assert item.schema_name == "main"
170214

171215
def test_scalar_function_has_output_schema(self) -> None:
172216
"""Scalar functions with static output types have output_schema populated."""
173217
client = Client(EXAMPLE_WORKER)
174218

175219
attach_result = client.catalog_attach(name="example", options={})
176-
contents = list(
177-
client.schema_contents(attach_id=attach_result.attach_id, name="main")
220+
functions = list(
221+
client.schema_contents(
222+
attach_id=attach_result.attach_id,
223+
name="main",
224+
type=SchemaObjectType.SCALAR_FUNCTION,
225+
)
178226
)
179-
functions = _get_functions(contents)
180227

181228
# Create lookup by name
182229
by_name = {fn.name: fn for fn in functions}
@@ -195,10 +242,13 @@ def test_scalar_function_with_dynamic_output_has_any_type(self) -> None:
195242
client = Client(EXAMPLE_WORKER)
196243

197244
attach_result = client.catalog_attach(name="example", options={})
198-
contents = list(
199-
client.schema_contents(attach_id=attach_result.attach_id, name="main")
245+
functions = list(
246+
client.schema_contents(
247+
attach_id=attach_result.attach_id,
248+
name="main",
249+
type=SchemaObjectType.SCALAR_FUNCTION,
250+
)
200251
)
201-
functions = _get_functions(contents)
202252

203253
# Create lookup by name
204254
by_name = {fn.name: fn for fn in functions}
@@ -218,10 +268,13 @@ def test_table_function_has_empty_output_schema(self) -> None:
218268
client = Client(EXAMPLE_WORKER)
219269

220270
attach_result = client.catalog_attach(name="example", options={})
221-
contents = list(
222-
client.schema_contents(attach_id=attach_result.attach_id, name="main")
271+
functions = list(
272+
client.schema_contents(
273+
attach_id=attach_result.attach_id,
274+
name="main",
275+
type=SchemaObjectType.TABLE_FUNCTION,
276+
)
223277
)
224-
functions = _get_functions(contents)
225278

226279
# Create lookup by name
227280
by_name = {fn.name: fn for fn in functions}

tests/catalog/test_integration.py

Lines changed: 23 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -476,7 +476,9 @@ class TestInMemoryCatalogSchemaContents:
476476
"""Test schema_contents operation."""
477477

478478
def test_schema_contents_lists_tables_and_views(self) -> None:
479-
"""schema_contents returns tables and views."""
479+
"""schema_contents returns tables and views when queried by type."""
480+
from vgi.catalog import SchemaObjectType
481+
480482
catalog = InMemoryCatalog()
481483
result = catalog.catalog_attach(name="memory", options={})
482484

@@ -502,15 +504,30 @@ def test_schema_contents_lists_tables_and_views(self) -> None:
502504
on_conflict=OnConflict.ERROR,
503505
)
504506

505-
contents = list(
507+
# Get tables
508+
tables = list(
506509
catalog.schema_contents(
507-
attach_id=result.attach_id, transaction_id=None, name="main"
510+
attach_id=result.attach_id,
511+
transaction_id=None,
512+
name="main",
513+
type=SchemaObjectType.TABLE,
514+
)
515+
)
516+
517+
# Get views
518+
views = list(
519+
catalog.schema_contents(
520+
attach_id=result.attach_id,
521+
transaction_id=None,
522+
name="main",
523+
type=SchemaObjectType.VIEW,
508524
)
509525
)
510526

511-
names = [c.name for c in contents]
512-
assert "users" in names
513-
assert "user_view" in names
527+
table_names = [t.name for t in tables]
528+
view_names = [v.name for v in views]
529+
assert "users" in table_names
530+
assert "user_view" in view_names
514531

515532

516533
class TestInMemoryCatalogOnConflict:

tests/ci/test_catalog_ddl.py

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44

55
import pytest
66

7-
from vgi.catalog import AttachId, OnConflict, SerializedSchema
7+
from vgi.catalog import AttachId, OnConflict, SchemaObjectType, SerializedSchema
88
from vgi.ci.catalog import CICatalog
99

1010

@@ -188,7 +188,10 @@ def test_contents_empty(self, attached_catalog: tuple[CICatalog, AttachId]) -> N
188188

189189
contents = list(
190190
catalog.schema_contents(
191-
attach_id=attach_id, transaction_id=None, name="main"
191+
attach_id=attach_id,
192+
transaction_id=None,
193+
name="main",
194+
type=SchemaObjectType.TABLE,
192195
)
193196
)
194197
assert contents == []
@@ -214,7 +217,10 @@ def test_contents_with_table(
214217

215218
contents = list(
216219
catalog.schema_contents(
217-
attach_id=attach_id, transaction_id=None, name="main"
220+
attach_id=attach_id,
221+
transaction_id=None,
222+
name="main",
223+
type=SchemaObjectType.TABLE,
218224
)
219225
)
220226
assert len(contents) == 1

0 commit comments

Comments
 (0)