From 998ddf4835f715464259e4810ceb059cad79a6b6 Mon Sep 17 00:00:00 2001 From: Rusty Conover Date: Sun, 4 Jan 2026 21:43:28 -0500 Subject: [PATCH] refactor: rename TableInOutGeneratorFunction to TableInOutGenerator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename for naming consistency with TableFunctionGenerator. The naming pattern was inconsistent: - TableFunctionGenerator (Generator suffix) - TableInOutGeneratorFunction (GeneratorFunction suffix) Now both use the *Generator suffix pattern. Closes: vgi-python-kz4 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .beads/issues.jsonl | 2 +- CLAUDE.md | 2 +- docs/generator-api.md | 12 ++++----- tests/table_in_out/generator/__init__.py | 2 +- .../table_in_out/test_streaming_decorator.py | 12 ++++----- vgi/__init__.py | 18 ++++++------- vgi/client/client.py | 2 +- vgi/examples/table_in_out.py | 10 +++---- vgi/examples/worker.py | 4 +-- vgi/output_complete.py | 2 +- vgi/table_function.py | 10 +++---- vgi/table_in_out_function.py | 26 +++++++++---------- vgi/testing.py | 10 +++---- vgi/worker.py | 20 +++++++------- 14 files changed, 66 insertions(+), 66 deletions(-) diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 72ce516..1df6b31 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -16,7 +16,7 @@ {"id":"vgi-python-e9q","title":"Unify ProtocolOutput classes with shared base","description":"ProtocolOutput classes in table_function.py:177-224 and table_in_out_function.py:144-207 share similar metadata() method and from_process_result() classmethod. The table_in_out version adds status field. Create shared base with table_in_out extending it for status support.","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.45014-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:16.371419-05:00"} {"id":"vgi-python-ivf","title":"Add required_settings to function Meta class","description":"Update function metadata to support declaring required DuckDB settings.\n\nChanges needed:\n- Add 'required_settings: list[str]' to FunctionMeta in vgi/metadata.py\n- Update Meta class resolution in vgi/function.py\n- Add validation that required_settings is a list of strings\n- Make it available via get_metadata() for introspection\n\nExample usage:\nclass MyFunction(TableInOutFunction):\n class Meta:\n required_settings = ['timezone', 'threads']","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:47.903747-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.169516-05:00","closed_at":"2026-01-04T13:20:41.169516-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-ivf","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.690253-05:00","created_by":"rusty"}]} {"id":"vgi-python-j4t","title":"Update client to pass DuckDB settings in Invocation","description":"Update vgi/client/client.py to support passing DuckDB settings.\n\nChanges needed:\n- Add 'duckdb_settings: dict[str, str] | None = None' parameter to relevant methods\n- Include settings in Invocation creation\n- Add helper to query function's required_settings from metadata\n\nThe client needs to know what settings to pass. Options:\n1. Client queries worker for function metadata first\n2. Settings passed explicitly by caller\n3. Client introspects function class if available locally","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.358656-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.173178-05:00","closed_at":"2026-01-04T13:20:41.173178-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-j4t","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.761572-05:00","created_by":"rusty"}]} -{"id":"vgi-python-kz4","title":"Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency","description":"Naming inconsistency: TableFunctionGenerator uses *Generator suffix, but TableInOutGeneratorFunction uses *GeneratorFunction suffix. Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency. Also consider renaming ScalarFunctionGenerator if needed.","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.581028-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:16.506499-05:00"} +{"id":"vgi-python-kz4","title":"Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency","description":"Naming inconsistency: TableFunctionGenerator uses *Generator suffix, but TableInOutGeneratorFunction uses *GeneratorFunction suffix. Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency. Also consider renaming ScalarFunctionGenerator if needed.","status":"in_progress","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.581028-05:00","created_by":"rusty","updated_at":"2026-01-04T21:39:29.83686-05:00"} {"id":"vgi-python-odi","title":"Change max_processes from method to property in Function hierarchy","description":"Refactor max_processes from a method to a property across the Function class hierarchy (Function, ScalarFunction, TableFunctionGenerator, TableInOutFunction, etc.). This makes the API more consistent since max_processes is effectively a constant per function class and properties are more idiomatic for such values.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T11:25:29.750648-05:00","created_by":"rusty","updated_at":"2026-01-04T11:50:57.566545-05:00","closed_at":"2026-01-04T11:50:57.566545-05:00","close_reason":"Closed"} {"id":"vgi-python-p91","title":"Move exception classes from function.py to own file","description":"Move InitIdentifierError and SchemaValidationError from vgi/function.py to a new vgi/exceptions.py file. Update imports in function.py and any other files that reference these exceptions.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T09:12:28.058227-05:00","created_by":"rusty","updated_at":"2026-01-04T09:17:52.477661-05:00","closed_at":"2026-01-04T09:17:52.477661-05:00","close_reason":"Closed"} {"id":"vgi-python-r3t","title":"Consolidate test client infrastructure in testing.py","description":"testing.py has three test client classes (FunctionTestClient, TableFunctionTestClient, ScalarFunctionTestClient) with shared infrastructure patterns. Extend _BaseTestClient pattern to reduce code duplication. Consider using a single unified client with method dispatch based on function type.","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:53.913912-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:38.132591-05:00"} diff --git a/CLAUDE.md b/CLAUDE.md index 66479f4..36b5d9b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -84,7 +84,7 @@ vgi/ function.py # Invocation, OutputSpec, Arguments, FunctionType scalar_function.py # ScalarFunction, ScalarFunctionGenerator table_function.py # TableFunctionGenerator, TableCardinality, Output - table_in_out_function.py # TableInOutFunction, TableInOutGeneratorFunction + table_in_out_function.py # TableInOutFunction, TableInOutGenerator metadata.py # Function metadata for introspection schema_utils.py # Schema builder helpers (schema, schema_like) worker.py # Worker base class diff --git a/docs/generator-api.md b/docs/generator-api.md index c329acc..3b1d065 100644 --- a/docs/generator-api.md +++ b/docs/generator-api.md @@ -170,7 +170,7 @@ def process(self): ## Table-In-Out Generator Function (Advanced) -For advanced streaming control with input data, use `TableInOutGeneratorFunction`. Most users should prefer `TableInOutFunction` instead. +For advanced streaming control with input data, use `TableInOutGenerator`. Most users should prefer `TableInOutFunction` instead. ### When to Use Generator API @@ -182,9 +182,9 @@ For advanced streaming control with input data, use `TableInOutGeneratorFunction ```python import pyarrow as pa -from vgi import TableInOutGeneratorFunction, Output, OutputGenerator, Arg +from vgi import TableInOutGenerator, Output, OutputGenerator, Arg -class MyFunction(TableInOutGeneratorFunction): +class MyFunction(TableInOutGenerator): """One-line description.""" @property @@ -208,13 +208,13 @@ class MyFunction(TableInOutGeneratorFunction): **Passthrough (Echo):** ```python -class EchoFunction(TableInOutGeneratorFunction): +class EchoFunction(TableInOutGenerator): pass # Default process() passes input unchanged ``` **Aggregation (emit on finalize):** ```python -class SumFunction(TableInOutGeneratorFunction): +class SumFunction(TableInOutGenerator): @property def output_schema(self): return pa.schema([pa.field("sum", pa.int64())]) @@ -270,7 +270,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: ## Common Mistakes -### 1. Forgetting the priming yield (TableInOutGeneratorFunction only) +### 1. Forgetting the priming yield (TableInOutGenerator only) ```python # ❌ WRONG - will raise TypeError on first send() diff --git a/tests/table_in_out/generator/__init__.py b/tests/table_in_out/generator/__init__.py index bf0db85..48f134e 100644 --- a/tests/table_in_out/generator/__init__.py +++ b/tests/table_in_out/generator/__init__.py @@ -1 +1 @@ -"""Tests for TableInOutGeneratorFunction implementations.""" +"""Tests for TableInOutGenerator implementations.""" diff --git a/tests/table_in_out/test_streaming_decorator.py b/tests/table_in_out/test_streaming_decorator.py index 0d15a04..9a600a2 100644 --- a/tests/table_in_out/test_streaming_decorator.py +++ b/tests/table_in_out/test_streaming_decorator.py @@ -8,14 +8,14 @@ Output, OutputGenerator, StreamingGenerator, - TableInOutGeneratorFunction, + TableInOutGenerator, streaming, ) from vgi.log import Level, Message from vgi.testing import FunctionTestClient, batch -class EchoStreamingFunction(TableInOutGeneratorFunction): +class EchoStreamingFunction(TableInOutGenerator): """Simple echo function using the @streaming decorator.""" @streaming @@ -26,7 +26,7 @@ def process(self, b: pa.RecordBatch) -> StreamingGenerator: current = yield Output(current) -class CountingStreamingFunction(TableInOutGeneratorFunction): +class CountingStreamingFunction(TableInOutGenerator): """Function that counts batches using @streaming.""" def __init__( @@ -45,7 +45,7 @@ def process(self, b: pa.RecordBatch) -> StreamingGenerator: current = yield Output(current) -class AccumulatingStreamingFunction(TableInOutGeneratorFunction): +class AccumulatingStreamingFunction(TableInOutGenerator): """Function that accumulates and outputs empty batches during process.""" def __init__( @@ -80,7 +80,7 @@ def finalize(self) -> OutputGenerator: ) -class LoggingStreamingFunction(TableInOutGeneratorFunction): +class LoggingStreamingFunction(TableInOutGenerator): """Function that logs using the @streaming decorator.""" @streaming @@ -171,7 +171,7 @@ class TestStreamingDecoratorComparedToManual: def test_equivalent_output(self) -> None: """@streaming decorated function should produce same output as manual.""" - class ManualEcho(TableInOutGeneratorFunction): + class ManualEcho(TableInOutGenerator): def process(self, b: pa.RecordBatch) -> OutputGenerator: """Manual process without decorator.""" _ = yield None diff --git a/vgi/__init__.py b/vgi/__init__.py index c438ba5..9d9b1dd 100644 --- a/vgi/__init__.py +++ b/vgi/__init__.py @@ -16,13 +16,13 @@ def transform(self, batch: pa.RecordBatch) -> pa.RecordBatch: # Transform each batch here return batch -For advanced streaming control, use TableInOutGeneratorFunction with the +For advanced streaming control, use TableInOutGenerator with the @streaming decorator: - from vgi import TableInOutGeneratorFunction, Output, StreamingGenerator, streaming + from vgi import TableInOutGenerator, Output, StreamingGenerator, streaming import pyarrow as pa - class MyFunction(TableInOutGeneratorFunction): + class MyFunction(TableInOutGenerator): @streaming def process(self, batch: pa.RecordBatch) -> StreamingGenerator: # No priming yield needed! @@ -31,10 +31,10 @@ def process(self, batch: pa.RecordBatch) -> StreamingGenerator: Or without the decorator (more verbose): - from vgi import TableInOutGeneratorFunction, Output, OutputGenerator + from vgi import TableInOutGenerator, Output, OutputGenerator import pyarrow as pa - class MyFunction(TableInOutGeneratorFunction): + class MyFunction(TableInOutGenerator): def process(self, batch: pa.RecordBatch) -> OutputGenerator: _ = yield None # Required priming yield while True: @@ -58,7 +58,7 @@ class MyWorker(Worker): Classes and functions exported from this module: TableInOutFunction - Callback-based API (recommended) - TableInOutGeneratorFunction - Generator-based API (advanced) + TableInOutGenerator - Generator-based API (advanced) ScalarFunction - Scalar function with compute() (single-column output) ScalarFunctionGenerator - Scalar function with generator protocol Output - Output batch from process()/finalize() @@ -114,7 +114,7 @@ class Meta: vgi.function.Function - Base (max_processes, invocation_id) ├─ vgi.table_function.TableFunctionBase - Adds cardinality hints, projection │ ├─ TableFunctionGenerator - Generate output without input - │ └─ TableInOutGeneratorFunction - Full streaming (process/finalize) + │ └─ TableInOutGenerator - Full streaming (process/finalize) │ └─ TableInOutFunction - Callback API (transform/finish) │ ├─ AggregationFunction - Reduce to summary │ ├─ FilterFunction - Row filtering @@ -159,7 +159,7 @@ class Meta: OutputGenerator, StreamingGenerator, TableInOutFunction, - TableInOutGeneratorFunction, + TableInOutGenerator, streaming, ) from vgi.table_in_out_function_patterns import ( @@ -195,7 +195,7 @@ class Meta: "ScalarOutputGenerator", "StreamingGenerator", "TableInOutFunction", - "TableInOutGeneratorFunction", + "TableInOutGenerator", "TableInput", "TableInputValidationError", "Worker", diff --git a/vgi/client/client.py b/vgi/client/client.py index b249c8d..ba39a42 100644 --- a/vgi/client/client.py +++ b/vgi/client/client.py @@ -38,7 +38,7 @@ ------- client.start() : Start the worker subprocess client.stop() : Stop the worker subprocess -client.table_in_out_function() : Invoke a TableInOutGeneratorFunction and stream results +client.table_in_out_function() : Invoke a TableInOutGenerator and stream results client.table_function() : Invoke a TableFunctionGenerator and stream results client.scalar_function() : Invoke a ScalarFunction and stream results client.get_worker_stderr() : Get captured stderr from worker diff --git a/vgi/examples/table_in_out.py b/vgi/examples/table_in_out.py index 2f8624b..f32459b 100644 --- a/vgi/examples/table_in_out.py +++ b/vgi/examples/table_in_out.py @@ -39,7 +39,7 @@ Output, OutputGenerator, TableInOutFunction, - TableInOutGeneratorFunction, + TableInOutGenerator, ) __all__ = [ @@ -55,7 +55,7 @@ ] -class EchoFunction(TableInOutGeneratorFunction): +class EchoFunction(TableInOutGenerator): """Passthrough function that emits each input batch unchanged. USE CASE @@ -90,7 +90,7 @@ class Meta: data: TableInput = Arg[TableInput](0, doc="Input table") # type: ignore[assignment] -class BufferInputFunction(TableInOutGeneratorFunction): +class BufferInputFunction(TableInOutGenerator): """Buffering function that collects all input and emits during finalization. USE CASE @@ -166,7 +166,7 @@ def finalize(self) -> OutputGenerator: yield Output(b, has_more) -class RepeatInputsFunction(TableInOutGeneratorFunction): +class RepeatInputsFunction(TableInOutGenerator): """Explosion function that duplicates each input batch N times. USE CASE @@ -262,7 +262,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: batch = received -class SumAllColumnsFunction(TableInOutGeneratorFunction): +class SumAllColumnsFunction(TableInOutGenerator): """Aggregation function that computes column-wise sums across all batches. USE CASE diff --git a/vgi/examples/worker.py b/vgi/examples/worker.py index f0c49be..e213da1 100644 --- a/vgi/examples/worker.py +++ b/vgi/examples/worker.py @@ -5,7 +5,7 @@ each class's metadata (Meta.name or snake_case of class name). The worker supports: -- TableInOutGeneratorFunction: Transforms input batches to output batches +- TableInOutGenerator: Transforms input batches to output batches - TableFunctionGenerator: Generates output batches without input - ScalarFunctionGenerator: Transforms input to single-column output (1:1 rows) @@ -47,7 +47,7 @@ class ExampleWorker(Worker): """Example worker with built-in test functions.""" functions = [ - # TableInOutGeneratorFunction - transform input batches + # TableInOutGenerator - transform input batches EchoFunction, BufferInputFunction, RepeatInputsFunction, diff --git a/vgi/output_complete.py b/vgi/output_complete.py index 7395029..98561fc 100644 --- a/vgi/output_complete.py +++ b/vgi/output_complete.py @@ -32,7 +32,7 @@ class OutputComplete: Attributes: batch: Always a valid RecordBatch (never None). has_more: If True, generator expects another send() call. - Only used by TableInOutGeneratorFunction. + Only used by TableInOutGenerator. log_message: Present when user yielded Message directly. """ diff --git a/vgi/table_function.py b/vgi/table_function.py index c742726..4dff9a3 100644 --- a/vgi/table_function.py +++ b/vgi/table_function.py @@ -14,11 +14,11 @@ Function (vgi.function) └── TableFunctionBase └── TableFunctionGenerator (simple generator, no input via send) - └── TableInOutGeneratorFunction (full protocol with input batches) + └── TableInOutGenerator (full protocol with input batches) TableFunctionGenerator is useful for functions that don't need to receive input batches via yield - they just produce output batches in a loop until done. -For functions that transform input batches, use TableInOutGeneratorFunction. +For functions that transform input batches, use TableInOutGenerator. """ from collections.abc import Generator @@ -230,7 +230,7 @@ class TableFunctionBase(vgi.function.Function[TableFunctionInitInput]): This class is not meant to be used directly. Subclass either: - TableFunctionGenerator: For simple generators that produce output - - TableInOutGeneratorFunction: For functions that transform input batches + - TableInOutGenerator: For functions that transform input batches Attributes: init_input: TableFunctionInitInput with projection info (set after init) @@ -238,7 +238,7 @@ class TableFunctionBase(vgi.function.Function[TableFunctionInitInput]): See Also: TableFunctionGenerator: Simple generator base class - TableInOutGeneratorFunction: Full streaming with input batches + TableInOutGenerator: Full streaming with input batches """ @@ -305,7 +305,7 @@ class TableFunctionGenerator(TableFunctionBase): - Produce a fixed sequence of output batches - Don't need the full DATA/FINALIZE protocol - For functions that transform input batches, use TableInOutGeneratorFunction. + For functions that transform input batches, use TableInOutGenerator. LIFECYCLE --------- diff --git a/vgi/table_in_out_function.py b/vgi/table_in_out_function.py index a3d8378..e522b46 100644 --- a/vgi/table_in_out_function.py +++ b/vgi/table_in_out_function.py @@ -10,7 +10,7 @@ 3. FINALIZE: The finalize() generator is called to flush buffered data Key Components: - TableInOutGeneratorFunction: Base class to subclass for custom functions. + TableInOutGenerator: Base class to subclass for custom functions. Output: Return type for process()/finalize() with batch and has_more flag. OutputGenerator: Type alias for the process()/finalize() return type. ProtocolInput/ProtocolOutput: Protocol messages for the run() generator. @@ -19,7 +19,7 @@ The process() method uses a generator pattern. Always use this explicit loop structure for clarity: - class MyFunction(TableInOutGeneratorFunction): + class MyFunction(TableInOutGenerator): def process(self, batch: pa.RecordBatch) -> OutputGenerator: # 1. REQUIRED: Priming yield (framework advances past this) _ = yield None @@ -62,7 +62,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: if batch is None: break -See TableInOutGeneratorFunction docstring for comprehensive documentation and examples. +See TableInOutGenerator docstring for comprehensive documentation and examples. """ from collections.abc import Callable, Generator @@ -87,7 +87,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: "OutputGenerator", "StreamingGenerator", "streaming", - "TableInOutGeneratorFunction", + "TableInOutGenerator", "TableInOutFunction", ] @@ -275,7 +275,7 @@ def streaming[T]( Using the @streaming decorator (recommended for new code): ```python - class MyFunction(TableInOutGeneratorFunction): + class MyFunction(TableInOutGenerator): @streaming def process(self, batch: pa.RecordBatch) -> StreamingGenerator: # No priming yield needed! @@ -286,7 +286,7 @@ def process(self, batch: pa.RecordBatch) -> StreamingGenerator: Equivalent without decorator (more verbose): ```python - class MyFunction(TableInOutGeneratorFunction): + class MyFunction(TableInOutGenerator): def process(self, batch: pa.RecordBatch) -> OutputGenerator: _ = yield None # Required priming yield @@ -300,7 +300,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: With logging: ```python - class LoggingFunction(TableInOutGeneratorFunction): + class LoggingFunction(TableInOutGenerator): @streaming def process(self, batch: pa.RecordBatch) -> StreamingGenerator: while batch is not None: @@ -311,7 +311,7 @@ def process(self, batch: pa.RecordBatch) -> StreamingGenerator: Aggregation pattern: ```python - class SumFunction(TableInOutGeneratorFunction): + class SumFunction(TableInOutGenerator): @streaming def process(self, batch: pa.RecordBatch) -> StreamingGenerator: while batch is not None: @@ -354,7 +354,7 @@ def wrapper(self: T, first_batch: pa.RecordBatch) -> OutputGenerator: return wrapper -class TableInOutGeneratorFunction(vgi.table_function.TableFunctionBase): +class TableInOutGenerator(vgi.table_function.TableFunctionBase): """Base class for streaming table functions that transform Arrow RecordBatches. This class handles functions that receive arguments and a streaming table input, @@ -453,7 +453,7 @@ class TableInOutGeneratorFunction(vgi.table_function.TableFunctionBase): ------------------- Functions can use setup/teardown for resource cleanup: - class MyDbFunction(TableInOutGeneratorFunction): + class MyDbFunction(TableInOutGenerator): def setup(self) -> None: self.conn = sqlite3.connect("my.db") @@ -472,7 +472,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: CALLER PROTOCOL --------------- - To use a TableInOutGeneratorFunction, the caller must: + To use a TableInOutGenerator, the caller must: 1. Create the bind result: invocation = vgi.invocation.Invocation( @@ -511,7 +511,7 @@ def __init__( if invocation.input_schema is None: raise ValueError( f"{type(self).__name__} requires an input schema, but none was " - f"provided. TableInOutGeneratorFunction processes input batches and " + f"provided. TableInOutGenerator processes input batches and " f"requires input_schema to be set in the Invocation. " f"If your function generates output without input, inherit from " f"TableFunctionGenerator instead." @@ -727,7 +727,7 @@ def run(self) -> Generator[ProtocolOutput, ProtocolInput | None, None]: self.teardown() -class TableInOutFunction(TableInOutGeneratorFunction): +class TableInOutFunction(TableInOutGenerator): """Simplified base class using callbacks instead of generators. This class provides a simpler API for common use cases where you don't need diff --git a/vgi/testing.py b/vgi/testing.py index 547f411..a0ca91f 100644 --- a/vgi/testing.py +++ b/vgi/testing.py @@ -103,7 +103,7 @@ ProtocolInput, ProtocolOutput, TableInOutFunction, - TableInOutGeneratorFunction, + TableInOutGenerator, _OutputStatus, ) @@ -328,7 +328,7 @@ class FunctionTestClient(_BaseTestClient): def __init__( self, - function_class: type[TableInOutGeneratorFunction] | type[TableInOutFunction], + function_class: type[TableInOutGenerator] | type[TableInOutFunction], ) -> None: """Initialize the TestClient. @@ -634,7 +634,7 @@ def batch(__schema: pa.Schema | None = None, **columns: list[Any]) -> pa.RecordB def run_function( - function: type[TableInOutGeneratorFunction] | type[TableInOutFunction], + function: type[TableInOutGenerator] | type[TableInOutFunction], input_batches: list[pa.RecordBatch], args: tuple[Any, ...] | None = None, kwargs: dict[str, Any] | None = None, @@ -677,7 +677,7 @@ def run_function( def assert_function_output( - function: type[TableInOutGeneratorFunction] | type[TableInOutFunction], + function: type[TableInOutGenerator] | type[TableInOutFunction], input: list[pa.RecordBatch], expected: list[pa.RecordBatch], args: tuple[Any, ...] | None = None, @@ -754,7 +754,7 @@ def assert_function_output( def assert_function_logs( - function: type[TableInOutGeneratorFunction] | type[TableInOutFunction], + function: type[TableInOutGenerator] | type[TableInOutFunction], input: list[pa.RecordBatch], expected_logs: list[dict[str, Any]], args: tuple[Any, ...] | None = None, diff --git a/vgi/worker.py b/vgi/worker.py index d98baee..9c4883c 100644 --- a/vgi/worker.py +++ b/vgi/worker.py @@ -10,7 +10,7 @@ 1. ScalarFunctionGenerator: Transforms input batches to single-column output with 1:1 row mapping. Use for per-row computations like add(), upper(), etc. -2. TableInOutGeneratorFunction: Reads input batches, produces output batches. +2. TableInOutGenerator: Reads input batches, produces output batches. Use for transforming, filtering, or aggregating input data. 3. TableFunctionGenerator: Generates output batches without reading input. @@ -22,14 +22,14 @@ from vgi.worker import Worker from vgi.scalar_function import ScalarFunction - from vgi.table_in_out_function import TableInOutGeneratorFunction + from vgi.table_in_out_function import TableInOutGenerator from vgi.table_function import TableFunctionGenerator class DoubleColumn(ScalarFunction): # Single-column output with 1:1 row mapping ... - class EchoFunction(TableInOutGeneratorFunction): + class EchoFunction(TableInOutGenerator): # Transforms input batches ... @@ -54,7 +54,7 @@ class MyWorker(Worker): 4. Stream: read input batches -> compute -> write single-column output batches (ends when input exhausted, no FINALIZE phase) -PROTOCOL FLOW (TableInOutGeneratorFunction) +PROTOCOL FLOW (TableInOutGenerator) ------------------------------------------- 1. Read Invocation: function name, arguments, input schema 2. Write OutputSpec: output schema, max_processes, invocation_id @@ -106,7 +106,7 @@ class MyWorker(Worker): from vgi.table_function import TableFunctionGenerator from vgi.table_in_out_function import ( ProtocolInput, - TableInOutGeneratorFunction, + TableInOutGenerator, ) @@ -467,7 +467,7 @@ def _process_scalar_batches( def _process_batches( self, - instance: TableInOutGeneratorFunction, + instance: TableInOutGenerator, invocation: Invocation, fn_log: structlog.stdlib.BoundLogger, ) -> WorkerStats: @@ -667,13 +667,13 @@ def run(self) -> None: # Dispatch to appropriate processing method based on function type. # ScalarFunctionGenerator processes input batches to single-column output. - # TableInOutGeneratorFunction reads input batches and produces output. + # TableInOutGenerator reads input batches and produces output. # TableFunctionGenerator generates output without input batches. # Note: Check ScalarFunctionGenerator first since it doesn't inherit from - # TableInOutGeneratorFunction, then TableInOutGeneratorFunction. + # TableInOutGenerator, then TableInOutGenerator. if isinstance(instance, ScalarFunctionGenerator): stats = self._process_scalar_batches(instance, invocation, fn_log) - elif isinstance(instance, TableInOutGeneratorFunction): + elif isinstance(instance, TableInOutGenerator): stats = self._process_batches(instance, invocation, fn_log) elif isinstance(instance, TableFunctionGenerator): stats = self._generate_batches(instance, invocation, fn_log) @@ -681,7 +681,7 @@ def run(self) -> None: raise TypeError( f"Unsupported function type: {type(instance).__name__}. " f"Functions must inherit from ScalarFunctionGenerator (for " - f"scalar functions), TableInOutGeneratorFunction (for functions " + f"scalar functions), TableInOutGenerator (for functions " f"that process input batches), or TableFunctionGenerator (for " f"functions that generate output without input). " f"See vgi.scalar_function, vgi.table_in_out_function, and "