diff --git a/.gitignore b/.gitignore index 505a3b1..04c48d2 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,7 @@ wheels/ # Virtual environments .venv + +# Mypy reports +mypy-reports/ +mypy-html-report/ diff --git a/CLAUDE.md b/CLAUDE.md index 16bb5cb..c91287b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -12,6 +12,11 @@ uv run ruff format . # Format uv run mypy vgi/ # Type check ``` +**Before committing**, always run the lint, format, and type checks: +```bash +uv run ruff check --fix . && uv run ruff format . && uv run mypy vgi/ +``` + ## Project Overview VGI (Vector Gateway Interface) provides an Apache Arrow-based protocol for connecting DuckDB to external programs. It enables user-defined functions to run in separate processes, communicating via stdin/stdout using Arrow IPC streaming. diff --git a/pyproject.toml b/pyproject.toml index 8fd3d2f..dc7756f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ requires-python = ">=3.12.4" dependencies = ["click", "pyarrow", "structlog", "platformdirs"] [project.optional-dependencies] -dev = ["mypy", "pytest", "pytest-mypy", "pytest-ruff", "pytest-xdist", "ruff"] +dev = ["mypy", "pyarrow-stubs", "pytest", "pytest-mypy", "pytest-ruff", "pytest-xdist", "ruff"] [project.scripts] vgi-client = "vgi.client.cli:main" @@ -38,10 +38,6 @@ strict = true warn_return_any = true warn_unused_ignores = true -[[tool.mypy.overrides]] -module = "pyarrow.*" -ignore_missing_imports = true - [[tool.mypy.overrides]] module = "structlog.*" ignore_missing_imports = true @@ -49,3 +45,9 @@ ignore_missing_imports = true [tool.pytest.ini_options] addopts = "--mypy --ruff" testpaths = ["tests"] + +[dependency-groups] +dev = [ + "lxml>=6.0.2", + "pytest-timeout>=2.4.0", +] diff --git a/tests/conftest.py b/tests/conftest.py index 0852ffe..df68e89 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,7 @@ """Shared fixtures for VGI tests.""" +from typing import Any + import pyarrow as pa import pytest @@ -13,13 +15,12 @@ def 
example_worker() -> str: @pytest.fixture def simple_batches() -> list[pa.RecordBatch]: """Create simple test batches with integer and string columns.""" - schema = pa.schema( - [ - pa.field("id", pa.int64()), - pa.field("value", pa.int64()), - pa.field("name", pa.string()), - ] - ) + fields: list[pa.Field[Any]] = [ + pa.field("id", pa.int64()), + pa.field("value", pa.int64()), + pa.field("name", pa.string()), + ] + schema = pa.schema(fields) batch1 = pa.RecordBatch.from_pydict( {"id": [1, 2], "value": [10, 20], "name": ["a", "b"]}, schema=schema, @@ -34,12 +35,11 @@ def simple_batches() -> list[pa.RecordBatch]: @pytest.fixture def numeric_batches() -> list[pa.RecordBatch]: """Create test batches with only numeric columns for sum tests.""" - schema = pa.schema( - [ - pa.field("a", pa.int32()), - pa.field("b", pa.float64()), - ] - ) + fields: list[pa.Field[Any]] = [ + pa.field("a", pa.int32()), + pa.field("b", pa.float64()), + ] + schema = pa.schema(fields) batch1 = pa.RecordBatch.from_pydict( {"a": [1, 2, 3], "b": [1.5, 2.5, 3.0]}, schema=schema, diff --git a/tests/table/generator/test_partitioned_function.py b/tests/table/generator/test_partitioned_function.py index 6f9f00b..3e066cd 100644 --- a/tests/table/generator/test_partitioned_function.py +++ b/tests/table/generator/test_partitioned_function.py @@ -1,5 +1,9 @@ """Tests for the PartitionedRangeFunction with multi-worker support.""" +from __future__ import annotations + +from typing import Any + import pyarrow as pa from vgi.client import Client @@ -10,6 +14,11 @@ from .conftest import RunnerWithMode +def _sorted_non_null(values: list[Any | None]) -> list[Any]: + """Return sorted list, filtering out None values for type safety.""" + return sorted(v for v in values if v is not None) + + class TestPartitionedRangeFunctionInProcess: """In-process tests for the partitioned_range function.""" @@ -23,7 +32,7 @@ def test_generates_full_range_single_worker(self) -> None: table = pa.Table.from_batches(outputs) assert 
table.num_rows == 10 - values = sorted(table.column("value").to_pylist()) + values = _sorted_non_null(table.column("value").to_pylist()) assert values == list(range(10)) def test_metadata(self) -> None: @@ -64,7 +73,7 @@ def test_values_are_sequential( outputs, logs = runner(PartitionedRangeFunction, (50,)) table = pa.Table.from_batches(outputs) - values = sorted(table.column("value").to_pylist()) + values = _sorted_non_null(table.column("value").to_pylist()) assert values == list(range(50)) @@ -84,7 +93,7 @@ def test_two_workers_produce_complete_range(self) -> None: table = pa.Table.from_batches(outputs) assert table.num_rows == 20 - values = sorted(table.column("value").to_pylist()) + values = _sorted_non_null(table.column("value").to_pylist()) assert values == list(range(20)) def test_three_workers_produce_complete_range(self) -> None: @@ -100,7 +109,7 @@ def test_three_workers_produce_complete_range(self) -> None: table = pa.Table.from_batches(outputs) assert table.num_rows == 30 - values = sorted(table.column("value").to_pylist()) + values = _sorted_non_null(table.column("value").to_pylist()) assert values == list(range(30)) def test_workers_produce_large_range(self) -> None: @@ -116,7 +125,7 @@ def test_workers_produce_large_range(self) -> None: table = pa.Table.from_batches(outputs) assert table.num_rows == 10000 - values = sorted(table.column("value").to_pylist()) + values = _sorted_non_null(table.column("value").to_pylist()) assert values == list(range(10000)) def test_uneven_distribution(self) -> None: @@ -134,7 +143,7 @@ def test_uneven_distribution(self) -> None: table = pa.Table.from_batches(outputs) assert table.num_rows == 7 - values = sorted(table.column("value").to_pylist()) + values = _sorted_non_null(table.column("value").to_pylist()) assert values == list(range(7)) def test_single_worker_fallback(self) -> None: @@ -150,5 +159,5 @@ def test_single_worker_fallback(self) -> None: table = pa.Table.from_batches(outputs) assert table.num_rows == 15 - 
values = sorted(table.column("value").to_pylist()) + values = _sorted_non_null(table.column("value").to_pylist()) assert values == list(range(15)) diff --git a/tests/table/generator/test_random_sample_function.py b/tests/table/generator/test_random_sample_function.py index eef94c4..a97612d 100644 --- a/tests/table/generator/test_random_sample_function.py +++ b/tests/table/generator/test_random_sample_function.py @@ -95,4 +95,4 @@ def test_values_in_range(self, run_table_function_mode: RunnerWithMode) -> None: table = pa.Table.from_batches(outputs) values = table.column("value").to_pylist() - assert all(0 <= v < 1 for v in values) + assert all(v is not None and 0 <= v < 1 for v in values) diff --git a/tests/table/test_function.py b/tests/table/test_function.py index c4c3ed0..566c561 100644 --- a/tests/table/test_function.py +++ b/tests/table/test_function.py @@ -1,9 +1,12 @@ """Generic tests for TableFunctionGenerator behavior.""" +from __future__ import annotations + import pyarrow as pa import pytest import structlog +from tests.utils import make_schema from vgi.function import Arguments, Invocation from vgi.table_function import ( CardinalityInfo, @@ -23,7 +26,7 @@ def test_empty_process_generator(self) -> None: class EmptyFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema([pa.field("x", pa.int64())]) + return make_schema([pa.field("x", pa.int64())]) with TableFunctionTestClient(EmptyFunction) as client: outputs = list(client.table_function()) @@ -37,7 +40,7 @@ def test_single_batch_output(self) -> None: class SingleBatchFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema([pa.field("x", pa.int64())]) + return make_schema([pa.field("x", pa.int64())]) def process(self) -> OutputGenerator: yield Output( @@ -58,7 +61,7 @@ def test_multiple_batch_output(self) -> None: class MultiBatchFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - 
return pa.schema([pa.field("n", pa.int64())]) + return make_schema([pa.field("n", pa.int64())]) def process(self) -> OutputGenerator: for i in range(3): @@ -86,7 +89,7 @@ def test_setup_called_before_process(self) -> None: class LifecycleFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema([pa.field("x", pa.int64())]) + return make_schema([pa.field("x", pa.int64())]) def setup(self) -> None: call_order.append("setup") @@ -112,7 +115,7 @@ def test_teardown_called_on_exception(self) -> None: class ExceptionFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema([pa.field("x", pa.int64())]) + return make_schema([pa.field("x", pa.int64())]) def process(self) -> OutputGenerator: raise ValueError("test error") @@ -140,7 +143,7 @@ def test_valid_schema_passes(self) -> None: class ValidSchemaFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema([pa.field("x", pa.int64())]) + return make_schema([pa.field("x", pa.int64())]) def process(self) -> OutputGenerator: yield Output( @@ -158,11 +161,11 @@ def test_invalid_schema_raises(self) -> None: class InvalidSchemaFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema([pa.field("x", pa.int64())]) + return make_schema([pa.field("x", pa.int64())]) def process(self) -> OutputGenerator: # Return batch with wrong column name - wrong_schema = pa.schema([pa.field("y", pa.int64())]) + wrong_schema = make_schema([pa.field("y", pa.int64())]) wrong_batch = pa.RecordBatch.from_pydict( {"y": [1]}, schema=wrong_schema ) @@ -187,7 +190,7 @@ def test_default_cardinality_is_none(self) -> None: class NoCardinalityFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema([pa.field("x", pa.int64())]) + return make_schema([pa.field("x", pa.int64())]) invocation = Invocation( function_name="test", @@ -209,7 +212,7 @@ def 
test_custom_cardinality(self) -> None: class CardinalityFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema([pa.field("x", pa.int64())]) + return make_schema([pa.field("x", pa.int64())]) def cardinality(self) -> CardinalityInfo: return CardinalityInfo(estimate=100, max=1000) @@ -244,7 +247,7 @@ class ArgFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema([pa.field("n", pa.int64())]) + return make_schema([pa.field("n", pa.int64())]) def process(self) -> OutputGenerator: yield Output( @@ -270,7 +273,7 @@ class NamedArgFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema([pa.field("result", pa.int64())]) + return make_schema([pa.field("result", pa.int64())]) def process(self) -> OutputGenerator: yield Output( @@ -299,7 +302,7 @@ def test_empty_output_batch_property(self) -> None: class TestFunction(TableFunctionGenerator): @property def output_schema(self) -> pa.Schema: - return pa.schema( + return make_schema( [ pa.field("a", pa.int64()), pa.field("b", pa.string()), diff --git a/tests/table_in_out/generator/test_repeat_inputs_function.py b/tests/table_in_out/generator/test_repeat_inputs_function.py index d4d1719..148c51d 100644 --- a/tests/table_in_out/generator/test_repeat_inputs_function.py +++ b/tests/table_in_out/generator/test_repeat_inputs_function.py @@ -1,7 +1,10 @@ """Tests for the RepeatInputsFunction (explosion).""" +from __future__ import annotations + import pyarrow as pa +from tests.utils import make_schema from vgi.client import Client from vgi.function import Arguments @@ -18,7 +21,7 @@ def test_repeat_custom_count( output_batches = list( client.table_in_out_function( function_name="repeat_inputs", - arguments=Arguments(positional=tuple([repeat_count]), named={}), + arguments=Arguments(positional=(pa.scalar(repeat_count),)), input=iter(simple_batches), ) ) @@ -35,7 +38,7 @@ def test_repeat_single_time( 
output_batches = list( client.table_in_out_function( function_name="repeat_inputs", - arguments=Arguments(positional=tuple([1]), named={}), + arguments=Arguments(positional=(pa.scalar(1),), named={}), input=iter(simple_batches), ) ) @@ -46,7 +49,7 @@ def test_repeat_single_time( def test_repeat_distributed_many_batches(self, example_worker: str) -> None: """Should correctly repeat across many batches with multiple workers.""" - schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) + schema = make_schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) # Create 100 batches, each with 50 rows num_batches = 100 @@ -69,7 +72,7 @@ def test_repeat_distributed_many_batches(self, example_worker: str) -> None: output_batches = list( client.table_in_out_function( function_name="repeat_inputs", - arguments=Arguments(positional=tuple([repeat_count]), named={}), + arguments=Arguments(positional=(pa.scalar(repeat_count),)), input=iter(batches), ) ) @@ -80,7 +83,9 @@ def test_repeat_distributed_many_batches(self, example_worker: str) -> None: def test_repeat_distributed_preserves_data(self, example_worker: str) -> None: """Should preserve data correctly when repeated across workers.""" - schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.string())]) + schema = make_schema( + [pa.field("id", pa.int64()), pa.field("value", pa.string())] + ) # Create batches with distinct values to verify data integrity batches = [ @@ -101,7 +106,7 @@ def test_repeat_distributed_preserves_data(self, example_worker: str) -> None: output_batches = list( client.table_in_out_function( function_name="repeat_inputs", - arguments=Arguments(positional=tuple([repeat_count]), named={}), + arguments=Arguments(positional=(pa.scalar(repeat_count),)), input=iter(batches), ) ) diff --git a/tests/table_in_out/generator/test_sum_all_columns_function.py b/tests/table_in_out/generator/test_sum_all_columns_function.py index c955ed3..5c7f2ac 100644 --- 
a/tests/table_in_out/generator/test_sum_all_columns_function.py +++ b/tests/table_in_out/generator/test_sum_all_columns_function.py @@ -1,8 +1,11 @@ """Tests for the SumAllColumnsFunction (aggregation).""" +from __future__ import annotations + import pyarrow as pa import pytest +from tests.utils import make_schema from vgi.client import Client @@ -146,7 +149,7 @@ class TestSumAllColumnsFunctionDistributed: def test_sum_distributed_many_batches(self, example_worker: str) -> None: """Should correctly sum across multiple batches.""" - schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) + schema = make_schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) # Create 20 batches, each with 100 rows (enough to exercise distributed path) num_batches = 20 @@ -209,7 +212,7 @@ def test_sum_distributed_excludes_non_numeric( def test_sum_distributed_empty_batch(self, example_worker: str) -> None: """Should handle empty batch correctly.""" - schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) + schema = make_schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) empty_batch = pa.RecordBatch.from_pydict({"a": [], "b": []}, schema=schema) with Client(example_worker) as client: @@ -255,7 +258,7 @@ def test_sum_simple_distributed_basic( def test_sum_simple_distributed_many_batches(self, example_worker: str) -> None: """Should correctly sum across multiple batches.""" - schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) + schema = make_schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) # Create 20 batches, each with 100 rows (enough to exercise distributed path) num_batches = 20 @@ -317,7 +320,7 @@ def test_sum_simple_distributed_excludes_non_numeric( def test_sum_simple_distributed_empty_batch(self, example_worker: str) -> None: """Should handle empty batch correctly.""" - schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) + schema = 
make_schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) empty_batch = pa.RecordBatch.from_pydict({"a": [], "b": []}, schema=schema) with Client(example_worker) as client: diff --git a/tests/table_in_out/test_client.py b/tests/table_in_out/test_client.py index 9f82799..39896ae 100644 --- a/tests/table_in_out/test_client.py +++ b/tests/table_in_out/test_client.py @@ -1,9 +1,12 @@ """Tests for Client lifecycle, edge cases, and stderr capture.""" +from __future__ import annotations + import time import pyarrow as pa +from tests.utils import make_schema from vgi.client import Client @@ -23,7 +26,9 @@ class TestEdgeCases: def test_empty_batch(self, example_worker: str) -> None: """Empty batch (zero rows) should process correctly.""" - schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + schema = make_schema( + [pa.field("id", pa.int64()), pa.field("value", pa.int64())] + ) empty_batch = pa.RecordBatch.from_pydict({"id": [], "value": []}, schema=schema) with Client(example_worker) as client: @@ -42,7 +47,7 @@ def test_empty_batch(self, example_worker: str) -> None: def test_empty_batch_with_aggregation(self, example_worker: str) -> None: """Aggregation with empty batch should handle zero rows.""" - schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) + schema = make_schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())]) empty_batch = pa.RecordBatch.from_pydict({"a": [], "b": []}, schema=schema) with Client(example_worker) as client: @@ -65,7 +70,9 @@ def test_empty_batch_with_aggregation(self, example_worker: str) -> None: def test_single_row_batch(self, example_worker: str) -> None: """Single row batch should process correctly.""" - schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + schema = make_schema( + [pa.field("id", pa.int64()), pa.field("value", pa.int64())] + ) single_row_batch = pa.RecordBatch.from_pydict( {"id": [1], "value": [100]}, schema=schema ) @@ 
-84,7 +91,7 @@ def test_single_row_batch(self, example_worker: str) -> None: def test_large_batch_count(self, example_worker: str) -> None: """Many small batches should process correctly.""" - schema = pa.schema([pa.field("id", pa.int64())]) + schema = make_schema([pa.field("id", pa.int64())]) batches = [ pa.RecordBatch.from_pydict({"id": [i]}, schema=schema) for i in range(50) ] diff --git a/tests/table_in_out/test_function.py b/tests/table_in_out/test_function.py index 55c6859..d624da8 100644 --- a/tests/table_in_out/test_function.py +++ b/tests/table_in_out/test_function.py @@ -275,7 +275,8 @@ def output_schema(self) -> pa.Schema: return pa.schema([("length", pa.int64())]) def transform(self, batch: pa.RecordBatch) -> pa.RecordBatch: - lengths = pc.utf8_length(batch.column("name")) + # Type ignore: stubs don't properly type cast result as StringArray + lengths = pc.utf8_length(batch.column("name")) # type: ignore[call-overload] return pa.RecordBatch.from_arrays([lengths], schema=self.output_schema) schema = pa.schema([("name", pa.string())]) @@ -302,8 +303,12 @@ def test_transform_returns_empty_batch(self) -> None: class FilterOddFunction(TableInOutFunction): def transform(self, batch: pa.RecordBatch) -> pa.RecordBatch: # Filter to only even values - mask = pc.equal(pc.bit_wise_and(batch.column("x"), 1), 0) - filtered = pc.filter(batch.column("x"), mask) + x_col = batch.column("x") + mask = pc.equal( + pc.bit_wise_and(x_col, pa.scalar(1, type=pa.int64())), + pa.scalar(0, type=pa.int64()), + ) + filtered = pc.filter(x_col, mask) if len(filtered) == 0: return self.empty_output_batch return pa.RecordBatch.from_arrays([filtered], schema=batch.schema) diff --git a/tests/table_in_out/test_streaming_decorator.py b/tests/table_in_out/test_streaming_decorator.py index b8bdbc8..0d15a04 100644 --- a/tests/table_in_out/test_streaming_decorator.py +++ b/tests/table_in_out/test_streaming_decorator.py @@ -21,8 +21,9 @@ class EchoStreamingFunction(TableInOutGeneratorFunction): 
@streaming def process(self, b: pa.RecordBatch) -> StreamingGenerator: """Process batches without priming yield.""" - while b is not None: - b = yield Output(b) + current: pa.RecordBatch | None = b + while current is not None: + current = yield Output(current) class CountingStreamingFunction(TableInOutGeneratorFunction): @@ -38,9 +39,10 @@ def __init__( @streaming def process(self, b: pa.RecordBatch) -> StreamingGenerator: """Process batches and count them.""" - while b is not None: + current: pa.RecordBatch | None = b + while current is not None: self.batch_count += 1 - b = yield Output(b) + current = yield Output(current) class AccumulatingStreamingFunction(TableInOutGeneratorFunction): @@ -61,12 +63,14 @@ def output_schema(self) -> pa.Schema: @streaming def process(self, b: pa.RecordBatch) -> StreamingGenerator: """Accumulate values from batches.""" - while b is not None: + current: pa.RecordBatch | None = b + while current is not None: # Accumulate - col = b.column(0) + col = current.column(0) for val in col.to_pylist(): - self.total += val - b = yield Output(self.empty_output_batch) + if val is not None: + self.total += val + current = yield Output(self.empty_output_batch) def finalize(self) -> OutputGenerator: """Emit final aggregation result.""" @@ -82,9 +86,10 @@ class LoggingStreamingFunction(TableInOutGeneratorFunction): @streaming def process(self, b: pa.RecordBatch) -> StreamingGenerator: """Process batches with logging.""" - while b is not None: - yield Message(Level.INFO, f"Processing {b.num_rows} rows") - b = yield Output(b) + current: pa.RecordBatch | None = b + while current is not None: + yield Message(Level.INFO, f"Processing {current.num_rows} rows") + current = yield Output(current) class TestStreamingDecorator: @@ -170,10 +175,11 @@ class ManualEcho(TableInOutGeneratorFunction): def process(self, b: pa.RecordBatch) -> OutputGenerator: """Manual process without decorator.""" _ = yield None + current: pa.RecordBatch | None = b while True: # 
Combined yield-and-receive (correct pattern) - b = yield Output(b) - if b is None: + current = yield Output(current) + if current is None: break # Need to create separate lists for each client since iter() is consumed diff --git a/tests/test_patterns.py b/tests/test_patterns.py index af1abd1..4857f41 100644 --- a/tests/test_patterns.py +++ b/tests/test_patterns.py @@ -1,5 +1,9 @@ """Tests for vgi.table_in_out_function_patterns base classes.""" +from __future__ import annotations + +from typing import Any + import pyarrow as pa import pyarrow.compute as pc import structlog @@ -38,12 +42,12 @@ def __init__( ) -> None: """Initialize with empty sums dict.""" super().__init__(invocation, logger) - self._sums: dict[str, pa.Scalar] = {} + self._sums: dict[str, pa.Scalar[Any]] = {} @property def output_schema(self) -> pa.Schema: """Build schema with numeric columns promoted to int64/float64.""" - fields = [] + fields: list[pa.Field[Any]] = [] for field in self.input_schema: if pa.types.is_integer(field.type): fields.append(pa.field(field.name, pa.int64())) @@ -177,9 +181,9 @@ def test_logs_accumulation(self) -> None: class PositiveFilter(FilterFunction): """Test filter: keep rows where 'value' column is positive.""" - def predicate(self, batch: pa.RecordBatch) -> pa.Array: + def predicate(self, batch: pa.RecordBatch) -> pa.Array[Any]: """Return True for positive values.""" - return pc.greater(batch.column("value"), 0) + return pc.greater(batch.column("value"), pa.scalar(0)) class RangeFilter(FilterFunction): @@ -188,11 +192,11 @@ class RangeFilter(FilterFunction): min_val = Arg[int](0) max_val = Arg[int](1) - def predicate(self, batch: pa.RecordBatch) -> pa.Array: + def predicate(self, batch: pa.RecordBatch) -> pa.Array[Any]: """Return True for values in the specified range.""" col = batch.column("value") - above_min = pc.greater_equal(col, self.min_val) - below_max = pc.less_equal(col, self.max_val) + above_min = pc.greater_equal(col, pa.scalar(self.min_val)) + below_max 
= pc.less_equal(col, pa.scalar(self.max_val)) return pc.and_(above_min, below_max) @@ -304,7 +308,7 @@ def test_filter_logs_stats(self) -> None: class DoubleValues(MapFunction): """Test map: double the 'value' column.""" - def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array]: + def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array[Any]]: """Double the value column.""" return {"value": pc.multiply(batch.column("value"), 2)} @@ -312,7 +316,7 @@ def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array]: class MultiColumnMap(MapFunction): """Test map: transform multiple columns.""" - def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array]: + def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array[Any]]: """Multiply a by 10, add 100 to b.""" return { "a": pc.multiply(batch.column("a"), 10), @@ -323,7 +327,7 @@ def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array]: class UpperCaseMap(MapFunction): """Test map: convert string column to uppercase.""" - def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array]: + def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array[Any]]: """Convert name to uppercase.""" return {"name": pc.utf8_upper(batch.column("name"))} @@ -334,14 +338,13 @@ class CastToFloat(MapFunction): @property def output_schema(self) -> pa.Schema: """Output schema with value as float64.""" - return pa.schema( - [ - pa.field("id", pa.int64()), - pa.field("value", pa.float64()), - ] - ) + fields: list[pa.Field[Any]] = [ + pa.field("id", pa.int64()), + pa.field("value", pa.float64()), + ] + return pa.schema(fields) - def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array]: + def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array[Any]]: """Cast value to float64.""" return {"value": batch.column("value").cast(pa.float64())} diff --git a/tests/test_protocol_classes.py b/tests/test_protocol_classes.py index 17b1c62..eba677f 100644 --- 
a/tests/test_protocol_classes.py +++ b/tests/test_protocol_classes.py @@ -3,9 +3,12 @@ Tests cover Invocation, Arguments, GlobalInitResult, and table_function classes. """ +from __future__ import annotations + import pyarrow as pa import pytest +from tests.utils import make_schema from vgi.function import ( Arg, Arguments, @@ -31,7 +34,7 @@ def test_empty_arguments(self) -> None: assert encoded == {} # Decode from empty struct - schema = pa.schema([pa.field("args", pa.struct([]))]) + schema = make_schema([pa.field("args", pa.struct([]))]) batch = pa.RecordBatch.from_pylist([{"args": {}}], schema=schema) decoded = Arguments.decode(batch.column("args")[0]) assert decoded.positional == () @@ -153,7 +156,7 @@ def test_basic_round_trip(self) -> None: original = Invocation( function_name="test_function", arguments=Arguments(positional=(pa.scalar(42),), named={}), - in_out_function_input_schema=pa.schema([pa.field("col1", pa.int64())]), + in_out_function_input_schema=make_schema([pa.field("col1", pa.int64())]), correlation_id="test-123", invocation_id=b"bind-id-bytes", ) @@ -180,7 +183,7 @@ def test_basic_round_trip(self) -> None: assert deserialized.arguments.positional[0] is not None assert deserialized.arguments.positional[0].as_py() == 42 - def test_null_schema(self) -> None: + def test_null_schema(self) -> None: """Invocation with null input schema should round-trip correctly.""" original = Invocation( function_name="scalar_function", @@ -200,9 +203,9 @@ def test_null_schema(self) -> None: assert deserialized.in_out_function_input_schema is None assert deserialized.invocation_id is None - def test_complex_schema(self) -> None: + def test_complex_schema(self) -> None: """Invocation with complex schema should round-trip correctly.""" - complex_schema = pa.schema( + complex_schema = make_schema( [ pa.field("int_col", pa.int32()), pa.field("float_col", pa.float64()), @@ -333,7 +336,7 @@ def test_has_identifier_false(self) -> None: """has_identifier should return 
False when field doesn't exist.""" batch = pa.RecordBatch.from_pylist( [{"other_field": "value"}], - schema=pa.schema([pa.field("other_field", pa.string())]), + schema=make_schema([pa.field("other_field", pa.string())]), ) assert GlobalInitResult.has_identifier(batch) is False @@ -540,7 +543,7 @@ class TestTableOutputSpec: def test_serialization_with_cardinality(self) -> None: """OutputSpec with cardinality should serialize correctly.""" spec = OutputSpec( - output_schema=pa.schema([pa.field("col1", pa.int64())]), + output_schema=make_schema([pa.field("col1", pa.int64())]), max_processes=4, invocation_id=b"test-id", cardinality=CardinalityInfo(estimate=100, max=1000), @@ -553,7 +556,7 @@ def test_serialization_with_cardinality(self) -> None: def test_serialization_without_cardinality(self) -> None: """OutputSpec without cardinality should serialize correctly.""" spec = OutputSpec( - output_schema=pa.schema([pa.field("col1", pa.int64())]), + output_schema=make_schema([pa.field("col1", pa.int64())]), max_processes=1, invocation_id=b"test-id", cardinality=None, @@ -565,7 +568,7 @@ def test_serialization_without_cardinality(self) -> None: def test_serialize_schema_includes_cardinality_fields(self) -> None: """Serialize schema should include cardinality fields.""" spec = OutputSpec( - output_schema=pa.schema([pa.field("col1", pa.int64())]), + output_schema=make_schema([pa.field("col1", pa.int64())]), max_processes=1, invocation_id=b"test-id", cardinality=CardinalityInfo(estimate=50, max=100), @@ -578,7 +581,7 @@ def test_serialize_schema_includes_cardinality_fields(self) -> None: def test_serialize_dict_includes_cardinality_values(self) -> None: """Serialize dict should include cardinality values.""" spec = OutputSpec( - output_schema=pa.schema([pa.field("col1", pa.int64())]), + output_schema=make_schema([pa.field("col1", pa.int64())]), max_processes=1, invocation_id=b"test-id", cardinality=CardinalityInfo(estimate=50, max=100), @@ -591,7 +594,7 @@ def 
test_serialize_dict_includes_cardinality_values(self) -> None: def test_serialize_dict_null_cardinality(self) -> None: """Serialize dict should handle null cardinality.""" spec = OutputSpec( - output_schema=pa.schema([pa.field("col1", pa.int64())]), + output_schema=make_schema([pa.field("col1", pa.int64())]), max_processes=1, invocation_id=b"test-id", cardinality=None, diff --git a/tests/test_schema_utils.py b/tests/test_schema_utils.py index 36a03fa..568f5a6 100644 --- a/tests/test_schema_utils.py +++ b/tests/test_schema_utils.py @@ -1,8 +1,11 @@ """Tests for vgi.schema_utils module.""" +from __future__ import annotations + import pyarrow as pa import pytest +from tests.utils import make_schema from vgi import schema, schema_like @@ -17,12 +20,12 @@ def test_empty_schema(self) -> None: def test_single_field(self) -> None: """schema() with one field.""" s = schema(x=pa.int64()) - assert s == pa.schema([pa.field("x", pa.int64())]) + assert s == make_schema([pa.field("x", pa.int64())]) def test_multiple_fields(self) -> None: """schema() with multiple fields preserves order.""" s = schema(a=pa.int64(), b=pa.string(), c=pa.float64()) - expected = pa.schema( + expected = make_schema( [ pa.field("a", pa.int64()), pa.field("b", pa.string()), @@ -33,9 +36,9 @@ def test_multiple_fields(self) -> None: def test_from_dict(self) -> None: """schema() accepts a dict as first positional argument.""" - fields = {"x": pa.int64(), "y": pa.string()} + fields: dict[str, pa.DataType] = {"x": pa.int64(), "y": pa.string()} s = schema(fields) - expected = pa.schema( + expected = make_schema( [ pa.field("x", pa.int64()), pa.field("y", pa.string()), @@ -46,7 +49,7 @@ def test_from_dict(self) -> None: def test_dict_plus_kwargs(self) -> None: """schema() combines dict and kwargs.""" s = schema({"a": pa.int64()}, b=pa.string()) - expected = pa.schema( + expected = make_schema( [ pa.field("a", pa.int64()), pa.field("b", pa.string()), @@ -57,7 +60,7 @@ def test_dict_plus_kwargs(self) -> None: def 
test_kwargs_override_dict(self) -> None: """Keyword args override dict values for same key.""" s = schema({"x": pa.int64()}, x=pa.string()) - assert s == pa.schema([pa.field("x", pa.string())]) + assert s == make_schema([pa.field("x", pa.string())]) def test_common_types(self) -> None: """schema() works with common Arrow types.""" @@ -82,7 +85,7 @@ def test_common_types(self) -> None: def test_invalid_type_raises(self) -> None: """schema() raises TypeError for non-DataType values.""" with pytest.raises(TypeError) as exc_info: - schema(x="int64") # Invalid type + schema(x="int64") # type: ignore[arg-type] # Intentionally invalid assert "Field 'x'" in str(exc_info.value) assert "expected pa.DataType" in str(exc_info.value) assert "got str" in str(exc_info.value) @@ -90,7 +93,7 @@ def test_invalid_type_raises(self) -> None: def test_invalid_type_in_dict_raises(self) -> None: """schema() raises TypeError for invalid types in dict.""" with pytest.raises(TypeError) as exc_info: - schema({"x": 123}) # Invalid type + schema({"x": 123}) # type: ignore[dict-item] # Intentionally invalid assert "Field 'x'" in str(exc_info.value) @@ -100,7 +103,7 @@ class TestSchemaLike: @pytest.fixture def base_schema(self) -> pa.Schema: """Fixture providing a base schema for tests.""" - return pa.schema( + return make_schema( [ pa.field("a", pa.int64()), pa.field("b", pa.string()), @@ -116,7 +119,7 @@ def test_passthrough(self, base_schema: pa.Schema) -> None: def test_add_single_field(self, base_schema: pa.Schema) -> None: """schema_like() adds field at the end.""" result = schema_like(base_schema, add={"d": pa.bool_()}) - expected = pa.schema( + expected = make_schema( [ pa.field("a", pa.int64()), pa.field("b", pa.string()), @@ -136,7 +139,7 @@ def test_add_multiple_fields(self, base_schema: pa.Schema) -> None: def test_remove_single_field(self, base_schema: pa.Schema) -> None: """schema_like() removes a field.""" result = schema_like(base_schema, remove=["b"]) - expected = pa.schema( + 
expected = make_schema( [ pa.field("a", pa.int64()), pa.field("c", pa.float64()), @@ -147,13 +150,13 @@ def test_remove_single_field(self, base_schema: pa.Schema) -> None: def test_remove_multiple_fields(self, base_schema: pa.Schema) -> None: """schema_like() removes multiple fields.""" result = schema_like(base_schema, remove=["a", "c"]) - expected = pa.schema([pa.field("b", pa.string())]) + expected = make_schema([pa.field("b", pa.string())]) assert result == expected def test_rename_single_field(self, base_schema: pa.Schema) -> None: """schema_like() renames a field.""" result = schema_like(base_schema, rename={"a": "alpha"}) - expected = pa.schema( + expected = make_schema( [ pa.field("alpha", pa.int64()), pa.field("b", pa.string()), @@ -170,7 +173,7 @@ def test_rename_multiple_fields(self, base_schema: pa.Schema) -> None: def test_replace_type(self, base_schema: pa.Schema) -> None: """schema_like() replaces a field's type.""" result = schema_like(base_schema, replace={"a": pa.int32()}) - expected = pa.schema( + expected = make_schema( [ pa.field("a", pa.int32()), pa.field("b", pa.string()), @@ -194,7 +197,7 @@ def test_combined_operations(self, base_schema: pa.Schema) -> None: replace={"b": pa.large_string()}, add={"new_col": pa.bool_()}, ) - expected = pa.schema( + expected = make_schema( [ pa.field("id", pa.int64()), pa.field("b", pa.large_string()), @@ -212,7 +215,7 @@ def test_operation_order(self, base_schema: pa.Schema) -> None: rename={"b": "a"}, add={"b": pa.bool_()}, ) - expected = pa.schema( + expected = make_schema( [ pa.field("a", pa.string()), # was 'b', renamed to 'a' pa.field("c", pa.float64()), @@ -250,7 +253,7 @@ def test_add_existing_raises(self, base_schema: pa.Schema) -> None: def test_add_invalid_type_raises(self, base_schema: pa.Schema) -> None: """schema_like() raises TypeError for invalid add type.""" with pytest.raises(TypeError) as exc_info: - schema_like(base_schema, add={"d": "int64"}) # Invalid type + schema_like(base_schema, 
add={"d": "int64"}) # type: ignore[dict-item] # Intentionally invalid assert "Field 'd'" in str(exc_info.value) assert "expected pa.DataType" in str(exc_info.value) @@ -306,7 +309,7 @@ def output_schema(self) -> pa.Schema: add={"total": pa.int64()}, ) - input_schema = pa.schema( + input_schema = make_schema( [ pa.field("a", pa.int64()), pa.field("b", pa.string()), @@ -323,7 +326,7 @@ def output_schema(self) -> pa.Schema: logger = structlog.get_logger() func = TestFunction(invocation=invocation, logger=logger) - expected = pa.schema( + expected = make_schema( [ pa.field("a", pa.int64()), pa.field("b", pa.string()), diff --git a/tests/test_testing.py b/tests/test_testing.py index 2907f5b..98644fb 100644 --- a/tests/test_testing.py +++ b/tests/test_testing.py @@ -1,8 +1,11 @@ """Tests for vgi.testing.FunctionTestClient using example functions.""" +from __future__ import annotations + import pyarrow as pa import pytest +from tests.utils import make_schema from vgi.examples.table_in_out import ( BufferInputFunction, EchoFunction, @@ -381,7 +384,7 @@ def test_batch_creates_record_batch(self) -> None: def test_batch_with_explicit_schema(self) -> None: """batch() should respect explicit schema.""" - schema = pa.schema([("x", pa.int64()), ("y", pa.string())]) + schema = make_schema([("x", pa.int64()), ("y", pa.string())]) b = batch(schema, x=[1, 2, 3], y=["a", "b", "c"]) assert b.schema == schema @@ -389,7 +392,7 @@ def test_batch_with_explicit_schema(self) -> None: def test_batch_empty(self) -> None: """batch() should handle empty columns.""" - schema = pa.schema([("x", pa.int64())]) + schema = make_schema([("x", pa.int64())]) b = batch(schema, x=[]) assert b.num_rows == 0 diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..c31f844 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,14 @@ +"""Shared test utilities for VGI tests.""" + +from typing import Any + +import pyarrow as pa + + +def make_schema(fields: list[Any]) -> pa.Schema: + """Create 
schema with proper typing for field list. + + This is a helper to avoid mypy errors when creating schemas from + field tuples like [("name", pa.string())]. + """ + return pa.schema(fields) diff --git a/uv.lock b/uv.lock index 38549f8..673f465 100644 --- a/uv.lock +++ b/uv.lock @@ -102,6 +102,86 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/49/cb/940431d9410fda74f941f5cd7f0e5a22c63be7b0c10fa98b2b7022b48cb1/librt-0.7.5-cp314-cp314t-win_arm64.whl", hash = "sha256:08153ea537609d11f774d2bfe84af39d50d5c9ca3a4d061d946e0c9d8bce04a1", size = 39728, upload-time = "2025-12-25T03:53:03.306Z" }, ] +[[package]] +name = "lxml" +version = "6.0.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/aa/88/262177de60548e5a2bfc46ad28232c9e9cbde697bd94132aeb80364675cb/lxml-6.0.2.tar.gz", hash = "sha256:cd79f3367bd74b317dda655dc8fcfa304d9eb6e4fb06b7168c5cf27f96e0cd62", size = 4073426, upload-time = "2025-09-22T04:04:59.287Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f3/c8/8ff2bc6b920c84355146cd1ab7d181bc543b89241cfb1ebee824a7c81457/lxml-6.0.2-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:a59f5448ba2ceccd06995c95ea59a7674a10de0810f2ce90c9006f3cbc044456", size = 8661887, upload-time = "2025-09-22T04:01:17.265Z" }, + { url = "https://files.pythonhosted.org/packages/37/6f/9aae1008083bb501ef63284220ce81638332f9ccbfa53765b2b7502203cf/lxml-6.0.2-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:e8113639f3296706fbac34a30813929e29247718e88173ad849f57ca59754924", size = 4667818, upload-time = "2025-09-22T04:01:19.688Z" }, + { url = "https://files.pythonhosted.org/packages/f1/ca/31fb37f99f37f1536c133476674c10b577e409c0a624384147653e38baf2/lxml-6.0.2-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:a8bef9b9825fa8bc816a6e641bb67219489229ebc648be422af695f6e7a4fa7f", size = 4950807, upload-time = "2025-09-22T04:01:21.487Z" }, + { url = 
"https://files.pythonhosted.org/packages/da/87/f6cb9442e4bada8aab5ae7e1046264f62fdbeaa6e3f6211b93f4c0dd97f1/lxml-6.0.2-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:65ea18d710fd14e0186c2f973dc60bb52039a275f82d3c44a0e42b43440ea534", size = 5109179, upload-time = "2025-09-22T04:01:23.32Z" }, + { url = "https://files.pythonhosted.org/packages/c8/20/a7760713e65888db79bbae4f6146a6ae5c04e4a204a3c48896c408cd6ed2/lxml-6.0.2-cp312-cp312-manylinux_2_26_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c371aa98126a0d4c739ca93ceffa0fd7a5d732e3ac66a46e74339acd4d334564", size = 5023044, upload-time = "2025-09-22T04:01:25.118Z" }, + { url = "https://files.pythonhosted.org/packages/a2/b0/7e64e0460fcb36471899f75831509098f3fd7cd02a3833ac517433cb4f8f/lxml-6.0.2-cp312-cp312-manylinux_2_26_i686.manylinux_2_28_i686.whl", hash = "sha256:700efd30c0fa1a3581d80a748157397559396090a51d306ea59a70020223d16f", size = 5359685, upload-time = "2025-09-22T04:01:27.398Z" }, + { url = "https://files.pythonhosted.org/packages/b9/e1/e5df362e9ca4e2f48ed6411bd4b3a0ae737cc842e96877f5bf9428055ab4/lxml-6.0.2-cp312-cp312-manylinux_2_26_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:c33e66d44fe60e72397b487ee92e01da0d09ba2d66df8eae42d77b6d06e5eba0", size = 5654127, upload-time = "2025-09-22T04:01:29.629Z" }, + { url = "https://files.pythonhosted.org/packages/c6/d1/232b3309a02d60f11e71857778bfcd4acbdb86c07db8260caf7d008b08f8/lxml-6.0.2-cp312-cp312-manylinux_2_26_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:90a345bbeaf9d0587a3aaffb7006aa39ccb6ff0e96a57286c0cb2fd1520ea192", size = 5253958, upload-time = "2025-09-22T04:01:31.535Z" }, + { url = "https://files.pythonhosted.org/packages/35/35/d955a070994725c4f7d80583a96cab9c107c57a125b20bb5f708fe941011/lxml-6.0.2-cp312-cp312-manylinux_2_31_armv7l.whl", hash = "sha256:064fdadaf7a21af3ed1dcaa106b854077fbeada827c18f72aec9346847cd65d0", size = 4711541, upload-time = "2025-09-22T04:01:33.801Z" }, + { url = 
"https://files.pythonhosted.org/packages/1e/be/667d17363b38a78c4bd63cfd4b4632029fd68d2c2dc81f25ce9eb5224dd5/lxml-6.0.2-cp312-cp312-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:fbc74f42c3525ac4ffa4b89cbdd00057b6196bcefe8bce794abd42d33a018092", size = 5267426, upload-time = "2025-09-22T04:01:35.639Z" }, + { url = "https://files.pythonhosted.org/packages/ea/47/62c70aa4a1c26569bc958c9ca86af2bb4e1f614e8c04fb2989833874f7ae/lxml-6.0.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:6ddff43f702905a4e32bc24f3f2e2edfe0f8fde3277d481bffb709a4cced7a1f", size = 5064917, upload-time = "2025-09-22T04:01:37.448Z" }, + { url = "https://files.pythonhosted.org/packages/bd/55/6ceddaca353ebd0f1908ef712c597f8570cc9c58130dbb89903198e441fd/lxml-6.0.2-cp312-cp312-musllinux_1_2_armv7l.whl", hash = "sha256:6da5185951d72e6f5352166e3da7b0dc27aa70bd1090b0eb3f7f7212b53f1bb8", size = 4788795, upload-time = "2025-09-22T04:01:39.165Z" }, + { url = "https://files.pythonhosted.org/packages/cf/e8/fd63e15da5e3fd4c2146f8bbb3c14e94ab850589beab88e547b2dbce22e1/lxml-6.0.2-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:57a86e1ebb4020a38d295c04fc79603c7899e0df71588043eb218722dabc087f", size = 5676759, upload-time = "2025-09-22T04:01:41.506Z" }, + { url = "https://files.pythonhosted.org/packages/76/47/b3ec58dc5c374697f5ba37412cd2728f427d056315d124dd4b61da381877/lxml-6.0.2-cp312-cp312-musllinux_1_2_riscv64.whl", hash = "sha256:2047d8234fe735ab77802ce5f2297e410ff40f5238aec569ad7c8e163d7b19a6", size = 5255666, upload-time = "2025-09-22T04:01:43.363Z" }, + { url = "https://files.pythonhosted.org/packages/19/93/03ba725df4c3d72afd9596eef4a37a837ce8e4806010569bedfcd2cb68fd/lxml-6.0.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:6f91fd2b2ea15a6800c8e24418c0775a1694eefc011392da73bc6cef2623b322", size = 5277989, upload-time = "2025-09-22T04:01:45.215Z" }, + { url = 
"https://files.pythonhosted.org/packages/c6/80/c06de80bfce881d0ad738576f243911fccf992687ae09fd80b734712b39c/lxml-6.0.2-cp312-cp312-win32.whl", hash = "sha256:3ae2ce7d6fedfb3414a2b6c5e20b249c4c607f72cb8d2bb7cc9c6ec7c6f4e849", size = 3611456, upload-time = "2025-09-22T04:01:48.243Z" }, + { url = "https://files.pythonhosted.org/packages/f7/d7/0cdfb6c3e30893463fb3d1e52bc5f5f99684a03c29a0b6b605cfae879cd5/lxml-6.0.2-cp312-cp312-win_amd64.whl", hash = "sha256:72c87e5ee4e58a8354fb9c7c84cbf95a1c8236c127a5d1b7683f04bed8361e1f", size = 4011793, upload-time = "2025-09-22T04:01:50.042Z" }, + { url = "https://files.pythonhosted.org/packages/ea/7b/93c73c67db235931527301ed3785f849c78991e2e34f3fd9a6663ffda4c5/lxml-6.0.2-cp312-cp312-win_arm64.whl", hash = "sha256:61cb10eeb95570153e0c0e554f58df92ecf5109f75eacad4a95baa709e26c3d6", size = 3672836, upload-time = "2025-09-22T04:01:52.145Z" }, + { url = "https://files.pythonhosted.org/packages/53/fd/4e8f0540608977aea078bf6d79f128e0e2c2bba8af1acf775c30baa70460/lxml-6.0.2-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:9b33d21594afab46f37ae58dfadd06636f154923c4e8a4d754b0127554eb2e77", size = 8648494, upload-time = "2025-09-22T04:01:54.242Z" }, + { url = "https://files.pythonhosted.org/packages/5d/f4/2a94a3d3dfd6c6b433501b8d470a1960a20ecce93245cf2db1706adf6c19/lxml-6.0.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:6c8963287d7a4c5c9a432ff487c52e9c5618667179c18a204bdedb27310f022f", size = 4661146, upload-time = "2025-09-22T04:01:56.282Z" }, + { url = "https://files.pythonhosted.org/packages/25/2e/4efa677fa6b322013035d38016f6ae859d06cac67437ca7dc708a6af7028/lxml-6.0.2-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:1941354d92699fb5ffe6ed7b32f9649e43c2feb4b97205f75866f7d21aa91452", size = 4946932, upload-time = "2025-09-22T04:01:58.989Z" }, + { url = 
"https://files.pythonhosted.org/packages/ce/0f/526e78a6d38d109fdbaa5049c62e1d32fdd70c75fb61c4eadf3045d3d124/lxml-6.0.2-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:bb2f6ca0ae2d983ded09357b84af659c954722bbf04dea98030064996d156048", size = 5100060, upload-time = "2025-09-22T04:02:00.812Z" }, + { url = "https://files.pythonhosted.org/packages/81/76/99de58d81fa702cc0ea7edae4f4640416c2062813a00ff24bd70ac1d9c9b/lxml-6.0.2-cp313-cp313-manylinux_2_26_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:eb2a12d704f180a902d7fa778c6d71f36ceb7b0d317f34cdc76a5d05aa1dd1df", size = 5019000, upload-time = "2025-09-22T04:02:02.671Z" }, + { url = "https://files.pythonhosted.org/packages/b5/35/9e57d25482bc9a9882cb0037fdb9cc18f4b79d85df94fa9d2a89562f1d25/lxml-6.0.2-cp313-cp313-manylinux_2_26_i686.manylinux_2_28_i686.whl", hash = "sha256:6ec0e3f745021bfed19c456647f0298d60a24c9ff86d9d051f52b509663feeb1", size = 5348496, upload-time = "2025-09-22T04:02:04.904Z" }, + { url = "https://files.pythonhosted.org/packages/a6/8e/cb99bd0b83ccc3e8f0f528e9aa1f7a9965dfec08c617070c5db8d63a87ce/lxml-6.0.2-cp313-cp313-manylinux_2_26_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:846ae9a12d54e368933b9759052d6206a9e8b250291109c48e350c1f1f49d916", size = 5643779, upload-time = "2025-09-22T04:02:06.689Z" }, + { url = "https://files.pythonhosted.org/packages/d0/34/9e591954939276bb679b73773836c6684c22e56d05980e31d52a9a8deb18/lxml-6.0.2-cp313-cp313-manylinux_2_26_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ef9266d2aa545d7374938fb5c484531ef5a2ec7f2d573e62f8ce722c735685fd", size = 5244072, upload-time = "2025-09-22T04:02:08.587Z" }, + { url = "https://files.pythonhosted.org/packages/8d/27/b29ff065f9aaca443ee377aff699714fcbffb371b4fce5ac4ca759e436d5/lxml-6.0.2-cp313-cp313-manylinux_2_31_armv7l.whl", hash = "sha256:4077b7c79f31755df33b795dc12119cb557a0106bfdab0d2c2d97bd3cf3dffa6", size = 4718675, upload-time = "2025-09-22T04:02:10.783Z" }, + { url = 
"https://files.pythonhosted.org/packages/2b/9f/f756f9c2cd27caa1a6ef8c32ae47aadea697f5c2c6d07b0dae133c244fbe/lxml-6.0.2-cp313-cp313-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:a7c5d5e5f1081955358533be077166ee97ed2571d6a66bdba6ec2f609a715d1a", size = 5255171, upload-time = "2025-09-22T04:02:12.631Z" }, + { url = "https://files.pythonhosted.org/packages/61/46/bb85ea42d2cb1bd8395484fd72f38e3389611aa496ac7772da9205bbda0e/lxml-6.0.2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:8f8d0cbd0674ee89863a523e6994ac25fd5be9c8486acfc3e5ccea679bad2679", size = 5057175, upload-time = "2025-09-22T04:02:14.718Z" }, + { url = "https://files.pythonhosted.org/packages/95/0c/443fc476dcc8e41577f0af70458c50fe299a97bb6b7505bb1ae09aa7f9ac/lxml-6.0.2-cp313-cp313-musllinux_1_2_armv7l.whl", hash = "sha256:2cbcbf6d6e924c28f04a43f3b6f6e272312a090f269eff68a2982e13e5d57659", size = 4785688, upload-time = "2025-09-22T04:02:16.957Z" }, + { url = "https://files.pythonhosted.org/packages/48/78/6ef0b359d45bb9697bc5a626e1992fa5d27aa3f8004b137b2314793b50a0/lxml-6.0.2-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:dfb874cfa53340009af6bdd7e54ebc0d21012a60a4e65d927c2e477112e63484", size = 5660655, upload-time = "2025-09-22T04:02:18.815Z" }, + { url = "https://files.pythonhosted.org/packages/ff/ea/e1d33808f386bc1339d08c0dcada6e4712d4ed8e93fcad5f057070b7988a/lxml-6.0.2-cp313-cp313-musllinux_1_2_riscv64.whl", hash = "sha256:fb8dae0b6b8b7f9e96c26fdd8121522ce5de9bb5538010870bd538683d30e9a2", size = 5247695, upload-time = "2025-09-22T04:02:20.593Z" }, + { url = "https://files.pythonhosted.org/packages/4f/47/eba75dfd8183673725255247a603b4ad606f4ae657b60c6c145b381697da/lxml-6.0.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:358d9adae670b63e95bc59747c72f4dc97c9ec58881d4627fe0120da0f90d314", size = 5269841, upload-time = "2025-09-22T04:02:22.489Z" }, + { url = 
"https://files.pythonhosted.org/packages/76/04/5c5e2b8577bc936e219becb2e98cdb1aca14a4921a12995b9d0c523502ae/lxml-6.0.2-cp313-cp313-win32.whl", hash = "sha256:e8cd2415f372e7e5a789d743d133ae474290a90b9023197fd78f32e2dc6873e2", size = 3610700, upload-time = "2025-09-22T04:02:24.465Z" }, + { url = "https://files.pythonhosted.org/packages/fe/0a/4643ccc6bb8b143e9f9640aa54e38255f9d3b45feb2cbe7ae2ca47e8782e/lxml-6.0.2-cp313-cp313-win_amd64.whl", hash = "sha256:b30d46379644fbfc3ab81f8f82ae4de55179414651f110a1514f0b1f8f6cb2d7", size = 4010347, upload-time = "2025-09-22T04:02:26.286Z" }, + { url = "https://files.pythonhosted.org/packages/31/ef/dcf1d29c3f530577f61e5fe2f1bd72929acf779953668a8a47a479ae6f26/lxml-6.0.2-cp313-cp313-win_arm64.whl", hash = "sha256:13dcecc9946dca97b11b7c40d29fba63b55ab4170d3c0cf8c0c164343b9bfdcf", size = 3671248, upload-time = "2025-09-22T04:02:27.918Z" }, + { url = "https://files.pythonhosted.org/packages/03/15/d4a377b385ab693ce97b472fe0c77c2b16ec79590e688b3ccc71fba19884/lxml-6.0.2-cp314-cp314-macosx_10_13_universal2.whl", hash = "sha256:b0c732aa23de8f8aec23f4b580d1e52905ef468afb4abeafd3fec77042abb6fe", size = 8659801, upload-time = "2025-09-22T04:02:30.113Z" }, + { url = "https://files.pythonhosted.org/packages/c8/e8/c128e37589463668794d503afaeb003987373c5f94d667124ffd8078bbd9/lxml-6.0.2-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:4468e3b83e10e0317a89a33d28f7aeba1caa4d1a6fd457d115dd4ffe90c5931d", size = 4659403, upload-time = "2025-09-22T04:02:32.119Z" }, + { url = "https://files.pythonhosted.org/packages/00/ce/74903904339decdf7da7847bb5741fc98a5451b42fc419a86c0c13d26fe2/lxml-6.0.2-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:abd44571493973bad4598a3be7e1d807ed45aa2adaf7ab92ab7c62609569b17d", size = 4966974, upload-time = "2025-09-22T04:02:34.155Z" }, + { url = 
"https://files.pythonhosted.org/packages/1f/d3/131dec79ce61c5567fecf82515bd9bc36395df42501b50f7f7f3bd065df0/lxml-6.0.2-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:370cd78d5855cfbffd57c422851f7d3864e6ae72d0da615fca4dad8c45d375a5", size = 5102953, upload-time = "2025-09-22T04:02:36.054Z" }, + { url = "https://files.pythonhosted.org/packages/3a/ea/a43ba9bb750d4ffdd885f2cd333572f5bb900cd2408b67fdda07e85978a0/lxml-6.0.2-cp314-cp314-manylinux_2_26_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:901e3b4219fa04ef766885fb40fa516a71662a4c61b80c94d25336b4934b71c0", size = 5055054, upload-time = "2025-09-22T04:02:38.154Z" }, + { url = "https://files.pythonhosted.org/packages/60/23/6885b451636ae286c34628f70a7ed1fcc759f8d9ad382d132e1c8d3d9bfd/lxml-6.0.2-cp314-cp314-manylinux_2_26_i686.manylinux_2_28_i686.whl", hash = "sha256:a4bf42d2e4cf52c28cc1812d62426b9503cdb0c87a6de81442626aa7d69707ba", size = 5352421, upload-time = "2025-09-22T04:02:40.413Z" }, + { url = "https://files.pythonhosted.org/packages/48/5b/fc2ddfc94ddbe3eebb8e9af6e3fd65e2feba4967f6a4e9683875c394c2d8/lxml-6.0.2-cp314-cp314-manylinux_2_26_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:b2c7fdaa4d7c3d886a42534adec7cfac73860b89b4e5298752f60aa5984641a0", size = 5673684, upload-time = "2025-09-22T04:02:42.288Z" }, + { url = "https://files.pythonhosted.org/packages/29/9c/47293c58cc91769130fbf85531280e8cc7868f7fbb6d92f4670071b9cb3e/lxml-6.0.2-cp314-cp314-manylinux_2_26_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:98a5e1660dc7de2200b00d53fa00bcd3c35a3608c305d45a7bbcaf29fa16e83d", size = 5252463, upload-time = "2025-09-22T04:02:44.165Z" }, + { url = "https://files.pythonhosted.org/packages/9b/da/ba6eceb830c762b48e711ded880d7e3e89fc6c7323e587c36540b6b23c6b/lxml-6.0.2-cp314-cp314-manylinux_2_31_armv7l.whl", hash = "sha256:dc051506c30b609238d79eda75ee9cab3e520570ec8219844a72a46020901e37", size = 4698437, upload-time = "2025-09-22T04:02:46.524Z" }, + { url = 
"https://files.pythonhosted.org/packages/a5/24/7be3f82cb7990b89118d944b619e53c656c97dc89c28cfb143fdb7cd6f4d/lxml-6.0.2-cp314-cp314-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:8799481bbdd212470d17513a54d568f44416db01250f49449647b5ab5b5dccb9", size = 5269890, upload-time = "2025-09-22T04:02:48.812Z" }, + { url = "https://files.pythonhosted.org/packages/1b/bd/dcfb9ea1e16c665efd7538fc5d5c34071276ce9220e234217682e7d2c4a5/lxml-6.0.2-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:9261bb77c2dab42f3ecd9103951aeca2c40277701eb7e912c545c1b16e0e4917", size = 5097185, upload-time = "2025-09-22T04:02:50.746Z" }, + { url = "https://files.pythonhosted.org/packages/21/04/a60b0ff9314736316f28316b694bccbbabe100f8483ad83852d77fc7468e/lxml-6.0.2-cp314-cp314-musllinux_1_2_armv7l.whl", hash = "sha256:65ac4a01aba353cfa6d5725b95d7aed6356ddc0a3cd734de00124d285b04b64f", size = 4745895, upload-time = "2025-09-22T04:02:52.968Z" }, + { url = "https://files.pythonhosted.org/packages/d6/bd/7d54bd1846e5a310d9c715921c5faa71cf5c0853372adf78aee70c8d7aa2/lxml-6.0.2-cp314-cp314-musllinux_1_2_ppc64le.whl", hash = "sha256:b22a07cbb82fea98f8a2fd814f3d1811ff9ed76d0fc6abc84eb21527596e7cc8", size = 5695246, upload-time = "2025-09-22T04:02:54.798Z" }, + { url = "https://files.pythonhosted.org/packages/fd/32/5643d6ab947bc371da21323acb2a6e603cedbe71cb4c99c8254289ab6f4e/lxml-6.0.2-cp314-cp314-musllinux_1_2_riscv64.whl", hash = "sha256:d759cdd7f3e055d6bc8d9bec3ad905227b2e4c785dc16c372eb5b5e83123f48a", size = 5260797, upload-time = "2025-09-22T04:02:57.058Z" }, + { url = "https://files.pythonhosted.org/packages/33/da/34c1ec4cff1eea7d0b4cd44af8411806ed943141804ac9c5d565302afb78/lxml-6.0.2-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:945da35a48d193d27c188037a05fec5492937f66fb1958c24fc761fb9d40d43c", size = 5277404, upload-time = "2025-09-22T04:02:58.966Z" }, + { url = 
"https://files.pythonhosted.org/packages/82/57/4eca3e31e54dc89e2c3507e1cd411074a17565fa5ffc437c4ae0a00d439e/lxml-6.0.2-cp314-cp314-win32.whl", hash = "sha256:be3aaa60da67e6153eb15715cc2e19091af5dc75faef8b8a585aea372507384b", size = 3670072, upload-time = "2025-09-22T04:03:38.05Z" }, + { url = "https://files.pythonhosted.org/packages/e3/e0/c96cf13eccd20c9421ba910304dae0f619724dcf1702864fd59dd386404d/lxml-6.0.2-cp314-cp314-win_amd64.whl", hash = "sha256:fa25afbadead523f7001caf0c2382afd272c315a033a7b06336da2637d92d6ed", size = 4080617, upload-time = "2025-09-22T04:03:39.835Z" }, + { url = "https://files.pythonhosted.org/packages/d5/5d/b3f03e22b3d38d6f188ef044900a9b29b2fe0aebb94625ce9fe244011d34/lxml-6.0.2-cp314-cp314-win_arm64.whl", hash = "sha256:063eccf89df5b24e361b123e257e437f9e9878f425ee9aae3144c77faf6da6d8", size = 3754930, upload-time = "2025-09-22T04:03:41.565Z" }, + { url = "https://files.pythonhosted.org/packages/5e/5c/42c2c4c03554580708fc738d13414801f340c04c3eff90d8d2d227145275/lxml-6.0.2-cp314-cp314t-macosx_10_13_universal2.whl", hash = "sha256:6162a86d86893d63084faaf4ff937b3daea233e3682fb4474db07395794fa80d", size = 8910380, upload-time = "2025-09-22T04:03:01.645Z" }, + { url = "https://files.pythonhosted.org/packages/bf/4f/12df843e3e10d18d468a7557058f8d3733e8b6e12401f30b1ef29360740f/lxml-6.0.2-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:414aaa94e974e23a3e92e7ca5b97d10c0cf37b6481f50911032c69eeb3991bba", size = 4775632, upload-time = "2025-09-22T04:03:03.814Z" }, + { url = "https://files.pythonhosted.org/packages/e4/0c/9dc31e6c2d0d418483cbcb469d1f5a582a1cd00a1f4081953d44051f3c50/lxml-6.0.2-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:48461bd21625458dd01e14e2c38dd0aea69addc3c4f960c30d9f59d7f93be601", size = 4975171, upload-time = "2025-09-22T04:03:05.651Z" }, + { url = 
"https://files.pythonhosted.org/packages/e7/2b/9b870c6ca24c841bdd887504808f0417aa9d8d564114689266f19ddf29c8/lxml-6.0.2-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:25fcc59afc57d527cfc78a58f40ab4c9b8fd096a9a3f964d2781ffb6eb33f4ed", size = 5110109, upload-time = "2025-09-22T04:03:07.452Z" }, + { url = "https://files.pythonhosted.org/packages/bf/0c/4f5f2a4dd319a178912751564471355d9019e220c20d7db3fb8307ed8582/lxml-6.0.2-cp314-cp314t-manylinux_2_26_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:5179c60288204e6ddde3f774a93350177e08876eaf3ab78aa3a3649d43eb7d37", size = 5041061, upload-time = "2025-09-22T04:03:09.297Z" }, + { url = "https://files.pythonhosted.org/packages/12/64/554eed290365267671fe001a20d72d14f468ae4e6acef1e179b039436967/lxml-6.0.2-cp314-cp314t-manylinux_2_26_i686.manylinux_2_28_i686.whl", hash = "sha256:967aab75434de148ec80597b75062d8123cadf2943fb4281f385141e18b21338", size = 5306233, upload-time = "2025-09-22T04:03:11.651Z" }, + { url = "https://files.pythonhosted.org/packages/7a/31/1d748aa275e71802ad9722df32a7a35034246b42c0ecdd8235412c3396ef/lxml-6.0.2-cp314-cp314t-manylinux_2_26_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:d100fcc8930d697c6561156c6810ab4a508fb264c8b6779e6e61e2ed5e7558f9", size = 5604739, upload-time = "2025-09-22T04:03:13.592Z" }, + { url = "https://files.pythonhosted.org/packages/8f/41/2c11916bcac09ed561adccacceaedd2bf0e0b25b297ea92aab99fd03d0fa/lxml-6.0.2-cp314-cp314t-manylinux_2_26_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2ca59e7e13e5981175b8b3e4ab84d7da57993eeff53c07764dcebda0d0e64ecd", size = 5225119, upload-time = "2025-09-22T04:03:15.408Z" }, + { url = "https://files.pythonhosted.org/packages/99/05/4e5c2873d8f17aa018e6afde417c80cc5d0c33be4854cce3ef5670c49367/lxml-6.0.2-cp314-cp314t-manylinux_2_31_armv7l.whl", hash = "sha256:957448ac63a42e2e49531b9d6c0fa449a1970dbc32467aaad46f11545be9af1d", size = 4633665, upload-time = "2025-09-22T04:03:17.262Z" }, + { url = 
"https://files.pythonhosted.org/packages/0f/c9/dcc2da1bebd6275cdc723b515f93edf548b82f36a5458cca3578bc899332/lxml-6.0.2-cp314-cp314t-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:b7fc49c37f1786284b12af63152fe1d0990722497e2d5817acfe7a877522f9a9", size = 5234997, upload-time = "2025-09-22T04:03:19.14Z" }, + { url = "https://files.pythonhosted.org/packages/9c/e2/5172e4e7468afca64a37b81dba152fc5d90e30f9c83c7c3213d6a02a5ce4/lxml-6.0.2-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:e19e0643cc936a22e837f79d01a550678da8377d7d801a14487c10c34ee49c7e", size = 5090957, upload-time = "2025-09-22T04:03:21.436Z" }, + { url = "https://files.pythonhosted.org/packages/a5/b3/15461fd3e5cd4ddcb7938b87fc20b14ab113b92312fc97afe65cd7c85de1/lxml-6.0.2-cp314-cp314t-musllinux_1_2_armv7l.whl", hash = "sha256:1db01e5cf14345628e0cbe71067204db658e2fb8e51e7f33631f5f4735fefd8d", size = 4764372, upload-time = "2025-09-22T04:03:23.27Z" }, + { url = "https://files.pythonhosted.org/packages/05/33/f310b987c8bf9e61c4dd8e8035c416bd3230098f5e3cfa69fc4232de7059/lxml-6.0.2-cp314-cp314t-musllinux_1_2_ppc64le.whl", hash = "sha256:875c6b5ab39ad5291588aed6925fac99d0097af0dd62f33c7b43736043d4a2ec", size = 5634653, upload-time = "2025-09-22T04:03:25.767Z" }, + { url = "https://files.pythonhosted.org/packages/70/ff/51c80e75e0bc9382158133bdcf4e339b5886c6ee2418b5199b3f1a61ed6d/lxml-6.0.2-cp314-cp314t-musllinux_1_2_riscv64.whl", hash = "sha256:cdcbed9ad19da81c480dfd6dd161886db6096083c9938ead313d94b30aadf272", size = 5233795, upload-time = "2025-09-22T04:03:27.62Z" }, + { url = "https://files.pythonhosted.org/packages/56/4d/4856e897df0d588789dd844dbed9d91782c4ef0b327f96ce53c807e13128/lxml-6.0.2-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:80dadc234ebc532e09be1975ff538d154a7fa61ea5031c03d25178855544728f", size = 5257023, upload-time = "2025-09-22T04:03:30.056Z" }, + { url = 
"https://files.pythonhosted.org/packages/0f/85/86766dfebfa87bea0ab78e9ff7a4b4b45225df4b4d3b8cc3c03c5cd68464/lxml-6.0.2-cp314-cp314t-win32.whl", hash = "sha256:da08e7bb297b04e893d91087df19638dc7a6bb858a954b0cc2b9f5053c922312", size = 3911420, upload-time = "2025-09-22T04:03:32.198Z" }, + { url = "https://files.pythonhosted.org/packages/fe/1a/b248b355834c8e32614650b8008c69ffeb0ceb149c793961dd8c0b991bb3/lxml-6.0.2-cp314-cp314t-win_amd64.whl", hash = "sha256:252a22982dca42f6155125ac76d3432e548a7625d56f5a273ee78a5057216eca", size = 4406837, upload-time = "2025-09-22T04:03:34.027Z" }, + { url = "https://files.pythonhosted.org/packages/92/aa/df863bcc39c5e0946263454aba394de8a9084dbaff8ad143846b0d844739/lxml-6.0.2-cp314-cp314t-win_arm64.whl", hash = "sha256:bb4c1847b303835d89d785a18801a883436cdfd5dc3d62947f9c49e24f0f5a2c", size = 3822205, upload-time = "2025-09-22T04:03:36.249Z" }, +] + [[package]] name = "mypy" version = "1.19.1" @@ -223,6 +303,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7b/03/f335d6c52b4a4761bcc83499789a1e2e16d9d201a58c327a9b5cc9a41bd9/pyarrow-22.0.0-cp314-cp314t-win_amd64.whl", hash = "sha256:0c34fe18094686194f204a3b1787a27456897d8a2d62caf84b61e8dfbc0252ae", size = 29185594, upload-time = "2025-10-24T10:09:53.111Z" }, ] +[[package]] +name = "pyarrow-stubs" +version = "20.0.0.20251215" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pyarrow" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/30/16/ca991ada0416dc02c246e4b3c853f35053676eb704c63720ef24547d2d68/pyarrow_stubs-20.0.0.20251215.tar.gz", hash = "sha256:92c1fda4998f0c13e608d8abc7e4b8537e3ef108f6bf42c58e5af97e7d143e75", size = 236578, upload-time = "2025-12-15T06:47:46.187Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b3/7c/a467f91c6c3aa4c34d5a1dafca9ae4db01b757877577d6b0ae04908a0180/pyarrow_stubs-20.0.0.20251215-py3-none-any.whl", hash = "sha256:0634e70388cd23e7c78e2abbb1989822edc34df2d2ff4fd50a2316dd0cdafd9f", 
size = 235697, upload-time = "2025-12-15T06:47:44.643Z" }, +] + [[package]] name = "pygments" version = "2.19.2" @@ -275,6 +367,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1d/ca/abfce6de0bb0f017ed05ef9f2330596235cc5a3341b4d8d40895682b3814/pytest_ruff-0.5-py3-none-any.whl", hash = "sha256:d9db170d86fb167008e6702b4d79e2cccd8287f069c3a57f9261831cebdc4a31", size = 4680, upload-time = "2025-06-19T07:26:23.897Z" }, ] +[[package]] +name = "pytest-timeout" +version = "2.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ac/82/4c9ecabab13363e72d880f2fb504c5f750433b2b6f16e99f4ec21ada284c/pytest_timeout-2.4.0.tar.gz", hash = "sha256:7e68e90b01f9eff71332b25001f85c75495fc4e3a836701876183c4bcfd0540a", size = 17973, upload-time = "2025-05-05T19:44:34.99Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fa/b6/3127540ecdf1464a00e5a01ee60a1b09175f6913f0644ac748494d9c4b21/pytest_timeout-2.4.0-py3-none-any.whl", hash = "sha256:c42667e5cdadb151aeb5b26d114aff6bdf5a907f176a007a30b940d3d865b5c2", size = 14382, upload-time = "2025-05-05T19:44:33.502Z" }, +] + [[package]] name = "pytest-xdist" version = "3.8.0" @@ -346,6 +450,7 @@ dependencies = [ [package.optional-dependencies] dev = [ { name = "mypy" }, + { name = "pyarrow-stubs" }, { name = "pytest" }, { name = "pytest-mypy" }, { name = "pytest-ruff" }, @@ -353,12 +458,19 @@ dev = [ { name = "ruff" }, ] +[package.dev-dependencies] +dev = [ + { name = "lxml" }, + { name = "pytest-timeout" }, +] + [package.metadata] requires-dist = [ { name = "click" }, { name = "mypy", marker = "extra == 'dev'" }, { name = "platformdirs" }, { name = "pyarrow" }, + { name = "pyarrow-stubs", marker = "extra == 'dev'" }, { name = "pytest", marker = "extra == 'dev'" }, { name = "pytest-mypy", marker = "extra == 'dev'" }, { name = "pytest-ruff", marker = "extra == 'dev'" }, @@ -367,3 +479,9 @@ requires-dist = [ { 
name = "structlog" }, ] provides-extras = ["dev"] + +[package.metadata.requires-dev] +dev = [ + { name = "lxml", specifier = ">=6.0.2" }, + { name = "pytest-timeout", specifier = ">=2.4.0" }, +] diff --git a/vgi/arguments.py b/vgi/arguments.py index 2b37970..d6a77aa 100644 --- a/vgi/arguments.py +++ b/vgi/arguments.py @@ -22,10 +22,13 @@ class MyFunction(TableInOutFunction): import re from collections.abc import Sequence from dataclasses import dataclass -from typing import Any, TypeVar, overload +from typing import TYPE_CHECKING, Any, TypeVar, overload import pyarrow as pa +if TYPE_CHECKING: + from pyarrow import Scalar + # Sentinel for missing default value _MISSING: Any = object() @@ -90,8 +93,8 @@ class Arguments: """ - positional: tuple[pa.Scalar | None, ...] = () - named: dict[str, pa.Scalar] | None = None + positional: tuple["Scalar[Any] | None", ...] = () + named: dict[str, "Scalar[Any]"] | None = None def get( self, @@ -169,7 +172,7 @@ def get( return scalar.as_py() - def encoded_dict(self) -> dict[str, pa.Scalar | None]: + def encoded_dict(self) -> dict[str, "Scalar[Any] | None"]: """Convert arguments to a dictionary suitable for serialization. Positional arguments are stored with keys "positional_0", "positional_1", etc. @@ -215,8 +218,8 @@ def decode(data: pa.StructScalar) -> "Arguments": Deserialized Arguments instance. 
""" - positional: list[pa.Scalar | None] = [] - named: dict[str, pa.Scalar] = {} + positional: list[Scalar[Any] | None] = [] + named: dict[str, Scalar[Any]] = {} for key, value in data.items(): if key.startswith("positional_"): index = int(key[len("positional_") :]) diff --git a/vgi/client/cli.py b/vgi/client/cli.py index 17fc38e..6125334 100644 --- a/vgi/client/cli.py +++ b/vgi/client/cli.py @@ -18,9 +18,10 @@ """ +import io import json import sys -from typing import Any +from typing import Any, cast import pyarrow as pa @@ -67,7 +68,8 @@ def write_batch(self, batch: pa.RecordBatch) -> None: if self._writer is None: if self._is_stdout: self._writer = pq.ParquetWriter( - pa.PythonFile(sys.stdout.buffer, mode="w"), batch.schema + pa.PythonFile(cast(io.IOBase, sys.stdout.buffer), mode="w"), + batch.schema, ) else: self._writer = pq.ParquetWriter(self.output_file, batch.schema) diff --git a/vgi/client/client.py b/vgi/client/client.py index 2e8e21e..26ffb4e 100644 --- a/vgi/client/client.py +++ b/vgi/client/client.py @@ -50,6 +50,8 @@ """ +from __future__ import annotations + import io import json import os @@ -59,10 +61,11 @@ from collections.abc import Callable, Generator, Iterator from dataclasses import dataclass from queue import Queue -from typing import IO, Any +from typing import IO, Any, cast import pyarrow as pa import structlog +import structlog.stdlib from pyarrow import ipc from vgi.function import ( @@ -84,9 +87,11 @@ logger_factory=structlog.PrintLoggerFactory(file=sys.stderr), ) -log = structlog.get_logger().bind(component="client") +log: structlog.stdlib.BoundLogger = structlog.get_logger().bind(component="client") -worker_log = structlog.get_logger().bind(component="worker") +worker_log: structlog.stdlib.BoundLogger = structlog.get_logger().bind( + component="worker" +) class ClientError(Exception): @@ -98,7 +103,7 @@ class WorkerConnection: """Holds state for a single worker subprocess connection.""" proc: subprocess.Popen[bytes] - stdout_buffered: 
Any # io.BufferedReader - typed as Any due to subprocess IO quirks + stdout_buffered: io.BufferedReader[Any] stdin_sink: pa.PythonFile worker_index: int data_writer: ipc.RecordBatchStreamWriter | None = None @@ -166,7 +171,9 @@ def _combine_batches(batches: list[pa.RecordBatch]) -> pa.RecordBatch | None: return combined[0] def _handle_log_message( - self, output_batch: pa.RecordBatch, output_metadata: dict[bytes, bytes] | None + self, + output_batch: pa.RecordBatch, + output_metadata: pa.KeyValueMetadata | None, ) -> bool: """Handle a log message from the worker if present. @@ -254,7 +261,11 @@ def _parse_bind_result(self, batch: pa.RecordBatch) -> _BindResult: max_processes_array = batch.column( batch.schema.get_field_index("max_processes") ) - max_processes = max_processes_array.cast(pa.int32()).to_pylist()[0] + max_processes_value = max_processes_array.cast(pa.int32()).to_pylist()[0] + # max_processes should always be set in the bind result + max_processes: int = ( + max_processes_value if max_processes_value is not None else 1 + ) invocation_id_array = batch.column( batch.schema.get_field_index("invocation_id") @@ -274,7 +285,7 @@ def _parse_bind_result(self, batch: pa.RecordBatch) -> _BindResult: output_schema_bytes = batch.column( batch.schema.get_field_index("output_schema") ).to_pylist()[0] - output_schema = pa.ipc.read_schema(pa.BufferReader(output_schema_bytes)) + output_schema = pa.ipc.read_schema(pa.BufferReader(output_schema_bytes)) # type: ignore[arg-type] return _BindResult( max_processes=max_processes, @@ -306,9 +317,7 @@ def _validate_features( """ if not active <= requested: unexpected = active - requested - raise ClientError( - f"Worker activated unsupported features: {unexpected}" - ) + raise ClientError(f"Worker activated unsupported features: {unexpected}") log.debug( "features_validated", @@ -632,6 +641,7 @@ def init_worker(worker: WorkerConnection, request: Invocation) -> None: # Create data writer for table-in-out functions data_writer: 
ipc.RecordBatchStreamWriter | None = None if input_schema is not None: + assert self._stdin_sink is not None data_writer = ipc.new_stream(self._stdin_sink, input_schema) log.debug("starting_data_batches") @@ -640,6 +650,7 @@ def init_worker(worker: WorkerConnection, request: Invocation) -> None: # because the worker waits for input before writing output. output_reader: ipc.RecordBatchStreamReader | None = None if input_schema is None: + assert self._stdout_buffered is not None output_reader = ipc.open_stream(self._stdout_buffered) log.debug("output_stream_opened") @@ -691,7 +702,7 @@ def __init__( self._max_workers = max_workers self._attach_id = attach_id self._proc: subprocess.Popen[bytes] | None = None - self._stdout_buffered: io.BufferedReader | None = None + self._stdout_buffered: io.BufferedReader[Any] | None = None self._stdin_sink: pa.PythonFile | None = None self._stderr_buffer: list[bytes] = [] self._stderr_lock = threading.Lock() @@ -779,7 +790,8 @@ def _spawn_worker(self, worker_index: int) -> WorkerConnection: self._stderr_threads.append(stderr_thread) stdout_buffered = io.BufferedReader(proc.stdout) # type: ignore[type-var] - stdin_sink = pa.PythonFile(proc.stdin) + assert proc.stdin is not None, "stdin pipe not created for worker" + stdin_sink = pa.PythonFile(cast(io.IOBase, proc.stdin)) return WorkerConnection( proc=proc, @@ -953,8 +965,8 @@ def _close_secondary_workers( def _create_primary_worker( self, *, - data_writer: "ipc.RecordBatchStreamWriter | None" = None, - output_reader: "ipc.RecordBatchStreamReader | None" = None, + data_writer: ipc.RecordBatchStreamWriter | None = None, + output_reader: ipc.RecordBatchStreamReader | None = None, ) -> WorkerConnection: """Create a WorkerConnection wrapper for the primary worker subprocess. 
@@ -1046,7 +1058,7 @@ def _process_batch_on_worker( output_batch, output_metadata = ( worker.output_reader.read_next_batch_with_custom_metadata() ) - status = output_metadata.get("status") if output_metadata else None + status = output_metadata.get(b"status") if output_metadata else None log.debug( "received_output_from_worker", @@ -1066,7 +1078,7 @@ def _process_batch_on_worker( break else: raise ClientError( - f"Unexpected status from worker {worker.worker_index}: {status}" + f"Unexpected status from worker {worker.worker_index}: {status!r}" ) return output_batches @@ -1110,13 +1122,13 @@ def _finalize_worker( while True: log.debug("sending_finalize_to_worker", worker_index=worker.worker_index) worker.data_writer.write_batch( - empty_batch, custom_metadata={"type": "FINALIZE"} + empty_batch, custom_metadata={b"type": b"FINALIZE"} ) output_batch, output_metadata = ( worker.output_reader.read_next_batch_with_custom_metadata() ) - status = output_metadata.get("status") if output_metadata else None + status = output_metadata.get(b"status") if output_metadata else None log.debug( "received_finalize_from_worker", worker_index=worker.worker_index, @@ -1136,7 +1148,7 @@ def _finalize_worker( else: raise ClientError( f"Unexpected finalize status from worker " - f"{worker.worker_index}: {status}" + f"{worker.worker_index}: {status!r}" ) worker.data_writer.close() @@ -1145,8 +1157,8 @@ def _finalize_worker( def _worker_thread_loop( self, worker: WorkerConnection, - input_queue: "Queue[tuple[int, pa.RecordBatch] | None]", - output_queue: "Queue[tuple[int, list[pa.RecordBatch]] | BaseException]", + input_queue: Queue[tuple[int, pa.RecordBatch] | None], + output_queue: Queue[tuple[int, list[pa.RecordBatch]] | BaseException], ) -> None: """Thread function that processes batches for a single worker. 
@@ -1243,8 +1255,9 @@ def start(self) -> None: ) self._stderr_thread.start() - self._stdout_buffered = io.BufferedReader(self._proc.stdout) # type: ignore[arg-type] - self._stdin_sink = pa.PythonFile(self._proc.stdin) + self._stdout_buffered = io.BufferedReader(self._proc.stdout) + assert self._proc.stdin is not None, "stdin pipe not created for worker" + self._stdin_sink = pa.PythonFile(cast(io.IOBase, self._proc.stdin)) def stop(self) -> int: """Stop all worker subprocesses and clean up resources. @@ -1307,7 +1320,7 @@ def stop(self) -> int: self._stderr_thread = None return returncode - def __enter__(self) -> "Client": + def __enter__(self) -> Client: """Enter the context manager by starting the worker subprocess. Calls start() to spawn the primary worker process and prepare for @@ -1435,6 +1448,7 @@ def table_in_out_function( # Use parallel processing for all cases (handles both single and # multi-worker) + assert data_writer is not None # set when input_schema is not None yield from self._table_in_out_function_parallel( input_batch=input_batch, input_iterator=input, diff --git a/vgi/examples/table.py b/vgi/examples/table.py index 8675ea5..9bfbd52 100644 --- a/vgi/examples/table.py +++ b/vgi/examples/table.py @@ -14,7 +14,7 @@ import random import struct -from typing import ClassVar +from typing import ClassVar, cast import pyarrow as pa @@ -292,12 +292,11 @@ class Meta: @property def output_schema(self) -> pa.Schema: """Return output schema with id and value columns.""" - return pa.schema( - [ - pa.field("id", pa.int64()), - pa.field("value", pa.float64()), - ] - ) + fields: list[tuple[str, pa.DataType]] = [ + ("id", pa.int64()), + ("value", pa.float64()), + ] + return pa.schema(fields) def cardinality(self) -> CardinalityInfo: """Return cardinality estimate.""" @@ -565,12 +564,15 @@ class Meta: # Full schema with all 4 columns FULL_SCHEMA: pa.Schema = pa.schema( - [ - pa.field("id", pa.int64()), - pa.field("name", pa.string()), - pa.field("value", 
pa.float64()), - pa.field("extra", pa.int64()), - ] + cast( + list[tuple[str, pa.DataType]], + [ + ("id", pa.int64()), + ("name", pa.string()), + ("value", pa.float64()), + ("extra", pa.int64()), + ], + ) ) BATCH_SIZE: int = 1000 diff --git a/vgi/examples/table_in_out.py b/vgi/examples/table_in_out.py index 43f90ce..b6c3580 100644 --- a/vgi/examples/table_in_out.py +++ b/vgi/examples/table_in_out.py @@ -23,6 +23,8 @@ SumAllColumnsFunction - Aggregates numeric columns into sums """ +from typing import Any + import pyarrow as pa import pyarrow.compute as pc import structlog @@ -149,10 +151,10 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: _ = yield None while True: - batch = yield None - if batch is None: + received = yield None + if received is None: break - self.buffered_batches.append(batch) + self.buffered_batches.append(received) def finalize(self) -> OutputGenerator: """Emit all buffered batches sequentially.""" @@ -254,9 +256,10 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: for _ in range(self.repeat_count): yield Output(batch, has_more=True) - batch = yield None - if batch is None: + received = yield None + if received is None: break + batch = received class SumAllColumnsFunction(TableInOutGeneratorFunction): @@ -363,7 +366,7 @@ def __init__( ) -> None: """Initialize the sum accumulator.""" super().__init__(invocation=invocation, logger=logger) - self.sums: dict[str, pa.Scalar] = {} + self.sums: dict[str, pa.Scalar[Any]] = {} @property def output_schema(self) -> pa.Schema: @@ -372,6 +375,7 @@ def output_schema(self) -> pa.Schema: raise ValueError("input_schema is required but was None") output_fields = [] for field in self.input_schema: + out_type: pa.DataType if pa.types.is_integer(field.type): out_type = pa.int64() elif pa.types.is_floating(field.type): @@ -398,9 +402,10 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: if col_sum.is_valid: self.sums[name] = pc.add(self.sums[name], col_sum) - batch = yield 
None - if batch is None: + received = yield None + if received is None: break + batch = received def finalize(self) -> OutputGenerator: """Emit single row containing the column sums.""" @@ -447,7 +452,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: """Accumulate column sums across all batches.""" _ = yield None - sums: dict[str, pa.Scalar] = {} + sums: dict[str, pa.Scalar[Any]] = {} # Initialize sums to zero for each numeric column for field in self.output_schema: sums[field.name] = pa.scalar(0, type=field.type) @@ -460,9 +465,10 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: if col_sum.is_valid: sums[name] = pc.add(sums[name], col_sum) - batch = yield None - if batch is None: + received = yield None + if received is None: break + batch = received except GeneratorExit: # Generator is being closed - save state with explicit schema state_batch = pa.RecordBatch.from_pydict( @@ -544,9 +550,10 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: if col_sum.is_valid: self.sums[name] = pc.add(self.sums[name], col_sum) - batch = yield None - if batch is None: + received = yield None + if received is None: break + batch = received def finalize(self) -> OutputGenerator: """Emit single row containing the column sums with logging.""" @@ -578,13 +585,14 @@ class Meta: def process(self, batch: pa.RecordBatch) -> OutputGenerator: """Raise an exception on the second batch.""" _ = yield None # priming + _ = batch # unused, first batch is passed as parameter batch_index = 1 # First batch is from parameter while True: if batch_index % 2 == 0: raise ValueError(f"Intentional exception on batch {batch_index}") - batch = yield None - if batch is None: + received = yield None + if received is None: break batch_index += 1 @@ -666,7 +674,7 @@ def __init__( ) -> None: """Initialize with empty sums dict.""" super().__init__(invocation=invocation, logger=logger) - self.sums: dict[str, pa.Scalar] = {} + self.sums: dict[str, pa.Scalar[Any]] = {} def 
cardinality(self) -> CardinalityInfo | None: """Return cardinality estimate of exactly 1 row.""" @@ -679,6 +687,7 @@ def output_schema(self) -> pa.Schema: raise ValueError("input_schema is required but was None") output_fields = [] for field in self.input_schema: + out_type: pa.DataType if pa.types.is_integer(field.type): out_type = pa.int64() elif pa.types.is_floating(field.type): diff --git a/vgi/function.py b/vgi/function.py index 3c7df00..7916e82 100644 --- a/vgi/function.py +++ b/vgi/function.py @@ -392,14 +392,13 @@ def serialize_schema(self) -> pa.Schema: Arrow schema with fields for each serialized attribute. """ - return pa.schema( - [ - pa.field("output_schema", pa.binary(), nullable=False), - pa.field("max_processes", pa.int64(), nullable=True), - pa.field("invocation_id", pa.binary(), nullable=True), - pa.field("active_features", pa.list_(pa.utf8()), nullable=False), - ] - ) + fields: list[pa.Field[Any]] = [ + pa.field("output_schema", pa.binary(), nullable=False), + pa.field("max_processes", pa.int64(), nullable=True), + pa.field("invocation_id", pa.binary(), nullable=True), + pa.field("active_features", pa.list_(pa.utf8()), nullable=False), + ] + return pa.schema(fields) def serialize_dict(self) -> dict[str, Any]: """Convert this bind result to a dictionary for Arrow serialization. 
diff --git a/vgi/schema_utils.py b/vgi/schema_utils.py index 3cc6a21..7e95b31 100644 --- a/vgi/schema_utils.py +++ b/vgi/schema_utils.py @@ -36,6 +36,7 @@ def output_schema(self) -> pa.Schema: """ from collections.abc import Mapping +from typing import Any import pyarrow as pa @@ -95,7 +96,7 @@ def schema( all_fields.update(kwargs) # Validate and build schema - pa_fields: list[pa.Field] = [] + pa_fields: list[pa.Field[Any]] = [] for name, dtype in all_fields.items(): if not isinstance(dtype, pa.DataType): raise TypeError( @@ -199,7 +200,7 @@ def output_schema(self) -> pa.Schema: rename_map = rename or {} replace_map = replace or {} - new_fields: list[pa.Field] = [] + new_fields: list[pa.Field[Any]] = [] final_names: set[str] = set() for field in source: diff --git a/vgi/table_function.py b/vgi/table_function.py index d399da7..848404a 100644 --- a/vgi/table_function.py +++ b/vgi/table_function.py @@ -254,7 +254,9 @@ def metadata( if self.log_message is not None: metadata_dict = self.log_message.add_to_metadata(invocation, metadata_dict) - return pa.KeyValueMetadata(metadata_dict) + return pa.KeyValueMetadata( + {k.encode(): v.encode() for k, v in metadata_dict.items()} + ) @classmethod def from_process_result(cls, process_result: "_OutputComplete") -> "ProtocolOutput": diff --git a/vgi/table_in_out_function.py b/vgi/table_in_out_function.py index f8d5225..1c45158 100644 --- a/vgi/table_in_out_function.py +++ b/vgi/table_in_out_function.py @@ -127,7 +127,7 @@ def is_finalize(self) -> bool: """Check if this input signals the FINALIZE phase.""" return ( self.metadata is not None - and self.metadata.get("type") == self._FINALIZE_SIGNAL + and self.metadata.get(b"type") == self._FINALIZE_SIGNAL ) @classmethod @@ -137,7 +137,7 @@ def create_finalize(cls, batch: pa.RecordBatch) -> "ProtocolInput": This is only sent once so there is no benefit to caching it. 
""" return cls( - batch=batch, metadata=pa.KeyValueMetadata({"type": cls._FINALIZE_SIGNAL}) + batch=batch, metadata=pa.KeyValueMetadata({b"type": cls._FINALIZE_SIGNAL}) ) @@ -170,12 +170,14 @@ def metadata( KeyValueMetadata containing status and optional log message fields. """ - metadata_dict = {"status": self.status.value} + metadata_dict: dict[str, str] = {"status": self.status.value} if self.log_message is not None: metadata_dict = self.log_message.add_to_metadata(invocation, metadata_dict) - return pa.KeyValueMetadata(metadata_dict) + return pa.KeyValueMetadata( + {k.encode(): v.encode() for k, v in metadata_dict.items()} + ) @classmethod def from_process_result( @@ -669,9 +671,10 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: _ = yield None # Priming yield while True: - batch = yield Output(batch) - if batch is None: + received = yield Output(batch) + if received is None: break + batch = received def finalize(self) -> OutputGenerator | None: """Finalize processing and produce any remaining output. 
@@ -1052,16 +1055,17 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: is_last = i == len(result) - 1 if is_last: # Last batch: receive next input - batch = yield Output(output_batch, has_more=False) + received = yield Output(output_batch, has_more=False) else: # More batches: caller re-sends same input, we ignore it _ = yield Output(output_batch, has_more=True) else: # Single batch: yield output and receive next input - batch = yield Output(result) + received = yield Output(result) - if batch is None: + if received is None: break + batch = received except GeneratorExit: # Save state for distributed processing before generator closes state = self.save_state() diff --git a/vgi/table_in_out_function_patterns.py b/vgi/table_in_out_function_patterns.py index e884a8a..41cd6db 100644 --- a/vgi/table_in_out_function_patterns.py +++ b/vgi/table_in_out_function_patterns.py @@ -39,8 +39,10 @@ """ +from __future__ import annotations + from abc import abstractmethod -from typing import final +from typing import Any, final import pyarrow as pa import pyarrow.compute as pc @@ -370,7 +372,7 @@ def predicate(self, batch: pa.RecordBatch) -> pa.Array: """ @abstractmethod - def predicate(self, batch: pa.RecordBatch) -> pa.Array: + def predicate(self, batch: pa.RecordBatch) -> pa.Array[Any]: """Return boolean array indicating which rows to keep. Args: @@ -381,7 +383,7 @@ def predicate(self, batch: pa.RecordBatch) -> pa.Array: True = keep row, False = drop row. 
Example: - def predicate(self, batch: pa.RecordBatch) -> pa.Array: + def predicate(self, batch: pa.RecordBatch) -> pa.Array[Any]: # Keep rows where 'status' equals 'active' return pc.equal(batch.column("status"), "active") @@ -397,7 +399,8 @@ def transform(self, batch: pa.RecordBatch) -> pa.RecordBatch: """ mask = self.predicate(batch) - result = pc.filter(batch, mask) + # pc.filter supports RecordBatch but pyarrow-stubs don't have the overload + result = pc.filter(batch, mask) # type: ignore[call-overload] # Calculate and log filtering stats # pc.sum on boolean array counts True values @@ -411,11 +414,8 @@ def transform(self, batch: pa.RecordBatch) -> pa.RecordBatch: f"Filtered batch: {kept} rows kept, {dropped} rows dropped", ) - # pc.filter returns a ChunkedArray for RecordBatch, need to handle this - if isinstance(result, pa.ChunkedArray): - # This shouldn't happen with RecordBatch input, but handle it - result = result.combine_chunks() - + # pc.filter returns RecordBatch for RecordBatch input + assert isinstance(result, pa.RecordBatch) return result @@ -487,7 +487,7 @@ def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array]: """ @abstractmethod - def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array]: + def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array[Any]]: """Return dictionary mapping column names to transformed arrays. Only include columns that are being modified. Columns not in the @@ -501,7 +501,7 @@ def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array]: Each array must have the same length as the input batch. 
Example: - def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array]: + def map_columns(self, batch: pa.RecordBatch) -> dict[str, pa.Array[Any]]: return { "value": pc.multiply(batch.column("value"), 2), "name": pc.utf8_lower(batch.column("name")), diff --git a/vgi/testing.py b/vgi/testing.py index e36d061..f2c8520 100644 --- a/vgi/testing.py +++ b/vgi/testing.py @@ -76,10 +76,11 @@ import uuid from collections.abc import Callable, Generator, Iterator -from typing import Any +from typing import Any, cast import pyarrow as pa import structlog +import structlog.stdlib from vgi.function import Arguments, Invocation from vgi.log import Level, Message @@ -145,7 +146,9 @@ def __init__( """ self.function_class = function_class self.logs: list[Message] = [] - self._logger = structlog.get_logger().bind(component="test_client") + self._logger: structlog.stdlib.BoundLogger = structlog.get_logger().bind( + component="test_client" + ) def __enter__(self) -> "FunctionTestClient": """Enter context manager.""" @@ -220,11 +223,14 @@ def table_in_out_function( } ], schema=pa.schema( - [ - pa.field("output_schema", pa.binary()), - pa.field("max_processes", pa.int64()), - pa.field("invocation_id", pa.binary()), - ] + cast( + list[tuple[str, pa.DataType]], + [ + ("output_schema", pa.binary()), + ("max_processes", pa.int64()), + ("invocation_id", pa.binary()), + ], + ) ), ) bind_result_callback(bind_batch) @@ -366,7 +372,9 @@ def __init__( """ self.function_class = function_class self.logs: list[Message] = [] - self._logger = structlog.get_logger().bind(component="table_test_client") + self._logger: structlog.stdlib.BoundLogger = structlog.get_logger().bind( + component="table_test_client" + ) def __enter__(self) -> "TableFunctionTestClient": """Enter context manager.""" @@ -514,8 +522,8 @@ def run_function( """ # Build Arguments from args/kwargs - positional: tuple[pa.Scalar, ...] = () - named: dict[str, pa.Scalar] = {} + positional: tuple[pa.Scalar[Any], ...] 
= () + named: dict[str, pa.Scalar[Any]] = {} if args: positional = tuple(pa.scalar(a) for a in args) @@ -780,8 +788,8 @@ def run_table_function( """ # Build Arguments from args/kwargs - positional: tuple[pa.Scalar, ...] = () - named: dict[str, pa.Scalar] = {} + positional: tuple[pa.Scalar[Any], ...] = () + named: dict[str, pa.Scalar[Any]] = {} if args: positional = tuple(pa.scalar(a) for a in args) diff --git a/vgi/worker.py b/vgi/worker.py index 39e9f3a..eab68a3 100644 --- a/vgi/worker.py +++ b/vgi/worker.py @@ -70,9 +70,12 @@ class MyWorker(Worker): import sys from collections.abc import Sequence from dataclasses import dataclass +from io import IOBase +from typing import cast import pyarrow as pa import structlog +import structlog.stdlib from pyarrow import ipc from vgi.function import ( @@ -247,7 +250,9 @@ def __init__(self) -> None: wrapper_class=structlog.make_filtering_bound_logger(0), logger_factory=structlog.PrintLoggerFactory(file=sys.stderr), ) - self.log = structlog.get_logger().bind(component="worker") + self.log: structlog.stdlib.BoundLogger = structlog.get_logger().bind( + component="worker" + ) def _read_ipc_batch(self, context: str) -> pa.RecordBatch: """Read a schema + record batch pair from stdin. 
@@ -295,8 +300,8 @@ def _process_batches( next(generator) # Prime the run() generator with ( - ipc.new_stream(sys.stdout, instance.output_schema) as writer, - ipc.open_stream(sys.stdin) as data_reader, + ipc.new_stream(cast(IOBase, sys.stdout), instance.output_schema) as writer, + ipc.open_stream(cast(IOBase, sys.stdin)) as data_reader, ): # Validate data stream schema matches expected input schema if data_reader.schema != invocation.in_out_function_input_schema: @@ -334,7 +339,9 @@ def _process_batches( output = generator.send(ProtocolInput(batch=batch, metadata=metadata)) fn_log.debug("batch_processed", output=output) - output_rows = output.batch.num_rows if output.batch else 0 + # After initial priming, batch is always set by the protocol + assert output.batch is not None + output_rows = output.batch.num_rows total_output_rows += output_rows writer.write_batch( output.batch, custom_metadata=output.metadata(invocation) @@ -368,13 +375,15 @@ def _generate_batches( """ generator = instance.run() - with ipc.new_stream(sys.stdout, instance.output_schema) as writer: + with ipc.new_stream(cast(IOBase, sys.stdout), instance.output_schema) as writer: batch_count = 0 total_output_rows = 0 for output in generator: batch_count += 1 - output_rows = output.batch.num_rows if output.batch else 0 + # Table function generator always produces a batch + assert output.batch is not None + output_rows = output.batch.num_rows total_output_rows += output_rows writer.write_batch(