From 519ba40a8cc536108737f84227969790b543c95e Mon Sep 17 00:00:00 2001 From: Rusty Conover Date: Sun, 4 Jan 2026 21:15:59 -0500 Subject: [PATCH] refactor: consolidate _OutputComplete classes into shared module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract the nearly identical _OutputComplete class from scalar_function.py, table_function.py, and table_in_out_function.py into a shared output_complete.py module. This reduces code duplication by consolidating three classes into one: - _ScalarOutputComplete (scalar_function.py) - _OutputComplete (table_function.py) - _OutputComplete (table_in_out_function.py) The unified OutputComplete class handles all use cases with a single from_process_result() method that accepts Output, Message, or None. Also fixes a pre-existing mypy error in vgi/examples/table.py by adding the missing type parameter to pa.Field. Closes: vgi-python-6o0 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .beads/issues.jsonl | 2 +- vgi/examples/table.py | 2 +- vgi/output_complete.py | 72 ++++++++++++++++++++++++++++++++++++ vgi/scalar_function.py | 46 ++++------------------- vgi/table_function.py | 54 ++++----------------------- vgi/table_in_out_function.py | 59 ++++------------------------- 6 files changed, 96 insertions(+), 139 deletions(-) create mode 100644 vgi/output_complete.py diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 39317c8..3793cab 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -4,7 +4,7 @@ {"id":"vgi-python-3fq","title":"Abstract common worker batch processing logic","description":"Worker batch processing methods _process_scalar_batches (377-466), _process_batches (468-550), and _generate_batches (552-593) share significant structure: IPC writer/reader setup, batch counting/logging, main processing loop. Extract common logic to reduce duplication - consider a BatchProcessor helper class or template method pattern.","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:53.350497-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:37.598552-05:00"} {"id":"vgi-python-5er","title":"Extract _should_terminate into shared base class","description":"Identical _should_terminate method is copy-pasted in all three function modules. Implementation is always: check if log_message exists and level is EXCEPTION. Move to shared base class (Function or new ProcessingMixin) to eliminate duplication.","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.190482-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:16.071737-05:00","dependencies":[{"issue_id":"vgi-python-5er","depends_on_id":"vgi-python-6o0","type":"blocks","created_at":"2026-01-04T20:07:49.283865-05:00","created_by":"rusty"}]} {"id":"vgi-python-67w","title":"Create example function using DuckDB settings","description":"Create an example function that demonstrates using DuckDB settings to determine its output.\n\nRequirements:\n- Function declares required_settings in Meta\n- Output schema depends on a setting value (e.g., include extra column based on setting)\n- Clear documentation showing the pattern\n\nExample ideas:\n1. TimezoneAwareFunction: Output includes timezone info based on 'timezone' setting\n2. VerboseOutput: Adds debug columns when 'debug_mode' setting is true\n3. NumericPrecision: Uses 'numeric_precision' to determine output type precision\n\nAdd to vgi/examples/ and register in ExampleWorker.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.503681-05:00","created_by":"rusty","updated_at":"2026-01-04T13:22:23.779895-05:00","closed_at":"2026-01-04T13:22:23.779895-05:00","close_reason":"Added SettingsAwareFunction example","dependencies":[{"issue_id":"vgi-python-67w","depends_on_id":"vgi-python-c2b","type":"blocks","created_at":"2026-01-04T13:06:13.865474-05:00","created_by":"rusty"},{"issue_id":"vgi-python-67w","depends_on_id":"vgi-python-ivf","type":"blocks","created_at":"2026-01-04T13:06:13.890269-05:00","created_by":"rusty"},{"issue_id":"vgi-python-67w","depends_on_id":"vgi-python-bqb","type":"blocks","created_at":"2026-01-04T13:06:13.912531-05:00","created_by":"rusty"},{"issue_id":"vgi-python-67w","depends_on_id":"vgi-python-a99","type":"blocks","created_at":"2026-01-04T13:06:13.936552-05:00","created_by":"rusty"},{"issue_id":"vgi-python-67w","depends_on_id":"vgi-python-j4t","type":"blocks","created_at":"2026-01-04T13:06:13.958494-05:00","created_by":"rusty"}]} -{"id":"vgi-python-6o0","title":"Consolidate _OutputComplete classes into shared module","description":"Three nearly identical _OutputComplete classes exist in scalar_function.py:168-197 (_ScalarOutputComplete), table_function.py:136-175 (_OutputComplete), and table_in_out_function.py:356-400 (_OutputComplete). All are frozen dataclasses with batch field, log_message field, and from_process_result() classmethod. Extract to shared module (e.g., vgi/protocol_types.py) with a single parameterized class.","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:40.893139-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:15.806567-05:00"} +{"id":"vgi-python-6o0","title":"Consolidate _OutputComplete classes into shared module","description":"Three nearly identical _OutputComplete classes exist in scalar_function.py:168-197 (_ScalarOutputComplete), table_function.py:136-175 (_OutputComplete), and table_in_out_function.py:356-400 (_OutputComplete). All are frozen dataclasses with batch field, log_message field, and from_process_result() classmethod. Extract to shared module (e.g., vgi/protocol_types.py) with a single parameterized class.","status":"in_progress","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:40.893139-05:00","created_by":"rusty","updated_at":"2026-01-04T21:08:41.602491-05:00"} {"id":"vgi-python-79e","title":"Unify ProtocolInput classes with shared base","description":"ProtocolInput classes in scalar_function.py:151-166 and table_in_out_function.py:109-142 have similar structure with batch and metadata fields. The table_in_out version adds is_finalize logic. Create shared base ProtocolInput in protocol_types.py with table_in_out extending it.","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.31917-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:16.240397-05:00"} {"id":"vgi-python-a99","title":"Add settings accessor to function base classes","description":"Add a property to access DuckDB settings values in function implementations.\n\nChanges needed:\n- Add 'settings: dict[str, str]' property to Function base class\n- Property should return self.invocation.duckdb_settings or empty dict\n- Add convenience method like 'get_setting(name, default=None)'\n- Update ScalarFunction, TableFunctionGenerator, TableInOutFunction\n\nExample usage in function:\ndef compute(self, batch):\n tz = self.get_setting('timezone', 'UTC')\n # or\n tz = self.settings.get('timezone', 'UTC')","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.221602-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.171991-05:00","closed_at":"2026-01-04T13:20:41.171991-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-a99","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.738212-05:00","created_by":"rusty"}]} {"id":"vgi-python-aad","title":"Design: DuckDB settings/pragmas access for VGI functions","description":"Design how VGI functions can declare required DuckDB settings/pragmas in their Meta class, and how these settings values should be passed during the bind phase.\n\nKey design decisions:\n1. How to declare required settings in function Meta (e.g., required_settings = ['timezone', 'threads'])\n2. How to add settings to Invocation dataclass\n3. How settings values should be accessed in function code\n4. Serialization format for settings in Arrow IPC\n\nRecommendation: Add 'duckdb_settings: dict[str, str] | None' to Invocation and 'required_settings: list[str]' to Meta class.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T13:05:47.619105-05:00","created_by":"rusty","updated_at":"2026-01-04T13:11:13.197139-05:00","closed_at":"2026-01-04T13:11:13.197139-05:00","close_reason":"Design document created at docs/design-duckdb-settings.md"} diff --git a/vgi/examples/table.py b/vgi/examples/table.py index dfdfee3..a790881 100644 --- a/vgi/examples/table.py +++ b/vgi/examples/table.py @@ -694,7 +694,7 @@ def output_schema(self) -> pa.Schema: When vgi_verbose_mode is "true", includes an extra "details" column. This demonstrates how settings can affect the bind result. """ - fields: list[pa.Field] = [ + fields: list[pa.Field[pa.DataType]] = [ pa.field("id", pa.int64()), pa.field("value", pa.float64()), ] diff --git a/vgi/output_complete.py b/vgi/output_complete.py new file mode 100644 index 0000000..7395029 --- /dev/null +++ b/vgi/output_complete.py @@ -0,0 +1,72 @@ +"""Internal output normalization for VGI function generators. + +This module provides the _OutputComplete class used by all function types +to normalize generator yields into a consistent format with guaranteed +non-None batches. + +This is an internal module - users should not import from here directly. +""" + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import pyarrow as pa + +import vgi.log + +if TYPE_CHECKING: + from vgi.table_function import Output as TableOutput + from vgi.table_in_out_function import Output as TableInOutOutput + +__all__ = ["OutputComplete"] + + +@dataclass(frozen=True, slots=True) +class OutputComplete: + """Internal: Output with guaranteed non-None batch. + + Used by the framework to normalize generator yields. When the user yields + None, Output with None batch, or Message, this class ensures we always + have a valid RecordBatch for the protocol. + + Attributes: + batch: Always a valid RecordBatch (never None). + has_more: If True, generator expects another send() call. + Only used by TableInOutGeneratorFunction. + log_message: Present when user yielded Message directly. + + """ + + batch: pa.RecordBatch + has_more: bool = False + log_message: vgi.log.Message | None = None + + @classmethod + def from_process_result( + cls, + source: "vgi.log.Message | TableOutput | TableInOutOutput | None", + empty_batch: pa.RecordBatch, + ) -> "OutputComplete": + """Create from user's yield value. + + Args: + source: What the user yielded (Output, Message, or None). + empty_batch: Empty batch to substitute when needed. + + Returns: + Normalized output with guaranteed non-None batch. + + """ + if source is None: + return cls(batch=empty_batch) + if isinstance(source, vgi.log.Message): + # When yielding a log message, has_more=True so the caller + # re-sends the current input after the message is processed + return cls(batch=empty_batch, has_more=True, log_message=source) + # source is Output (either TableOutput or TableInOutOutput) + # TableOutput doesn't have has_more, TableInOutOutput does + has_more = getattr(source, "has_more", False) + return cls( + batch=source.batch if source.batch is not None else empty_batch, + has_more=has_more, + ) diff --git a/vgi/scalar_function.py b/vgi/scalar_function.py index cfac8ca..c101d47 100644 --- a/vgi/scalar_function.py +++ b/vgi/scalar_function.py @@ -50,6 +50,7 @@ def compute(self, batch: pa.RecordBatch) -> pa.Array: import vgi.function import vgi.log from vgi.exceptions import SchemaValidationError +from vgi.output_complete import OutputComplete from vgi.table_function import Output, ProtocolOutput __all__ = [ @@ -165,37 +166,6 @@ class ProtocolInput: metadata: pa.KeyValueMetadata | None = None -@dataclass(frozen=True, slots=True) -class _ScalarOutputComplete: - """Internal: Output with guaranteed non-None batch for scalar functions.""" - - batch: pa.RecordBatch - log_message: vgi.log.Message | None = None - - @classmethod - def from_process_result( - cls, - source: vgi.log.Message | Output, - empty_batch: pa.RecordBatch, - ) -> _ScalarOutputComplete: - """Create from user's yield value. - - Args: - source: What the user yielded (Output or Message). - empty_batch: Empty batch to substitute when yielding Message. - - Returns: - Normalized output with guaranteed non-None batch. - - """ - if isinstance(source, vgi.log.Message): - return cls(batch=empty_batch, log_message=source) - # source is Output - return cls( - batch=source.batch if source.batch is not None else empty_batch, - ) - - class ScalarFunctionGenerator(vgi.function.Function[vgi.function.FunctionInitInput]): """Generator-based base class for scalar functions. @@ -299,7 +269,7 @@ def _process_and_validate( self, generator: ScalarOutputGenerator, input_batch: pa.RecordBatch, - ) -> _ScalarOutputComplete: + ) -> OutputComplete: """Process a batch and validate schemas and row count. Args: @@ -307,7 +277,7 @@ def _process_and_validate( input_batch: The input RecordBatch to process. Returns: - _ScalarOutputComplete with validated output batch. + OutputComplete with validated output batch. Raises: SchemaValidationError: If input or output batch schema doesn't match. @@ -315,7 +285,7 @@ def _process_and_validate( """ self._validate_input_schema(input_batch) - result: _ScalarOutputComplete = _ScalarOutputComplete.from_process_result( + result: OutputComplete = OutputComplete.from_process_result( generator.send(input_batch), self.empty_output_batch, ) @@ -330,22 +300,22 @@ def _process_with_exception_handling( self, generator: ScalarOutputGenerator, input_batch: pa.RecordBatch, - ) -> _ScalarOutputComplete: + ) -> OutputComplete: """Process a batch with exception handling. Wraps _process_and_validate to catch exceptions and convert them - to _ScalarOutputComplete with an error log message. + to OutputComplete with an error log message. """ try: return self._process_and_validate(generator, input_batch) except Exception as e: - return _ScalarOutputComplete( + return OutputComplete( batch=self.empty_output_batch, log_message=vgi.log.Message.from_exception(e), ) @final - def _should_terminate(self, result: _ScalarOutputComplete) -> bool: + def _should_terminate(self, result: OutputComplete) -> bool: """Check if processing should terminate due to an exception.""" return ( result.log_message is not None diff --git a/vgi/table_function.py b/vgi/table_function.py index 1291cc7..5d778fa 100644 --- a/vgi/table_function.py +++ b/vgi/table_function.py @@ -31,6 +31,7 @@ import vgi.function import vgi.ipc_utils import vgi.log +from vgi.output_complete import OutputComplete __all__ = [ "TableCardinality", @@ -133,47 +134,6 @@ class Output: OutputGenerator = Generator[vgi.log.Message | Output, None, None] -@dataclass(frozen=True, slots=True) -class _OutputComplete: - """Internal: Output with guaranteed non-None batch. - - Used by the framework to normalize generator yields. When the user yields - None, Output with None batch, or Message, this class ensures we always - have a valid RecordBatch for the protocol. - - Attributes: - batch: Always a valid RecordBatch (never None). - log_message: Present when user yielded Message directly. - - """ - - batch: pa.RecordBatch - log_message: vgi.log.Message | None = None - - @classmethod - def from_process_result( - cls, - source: vgi.log.Message | Output, - empty_batch: pa.RecordBatch, - ) -> "_OutputComplete": - """Create from user's yield value. - - Args: - source: What the user yielded (Output or Message). - empty_batch: Empty batch to substitute when needed. - - Returns: - Normalized output with guaranteed non-None batch. - - """ - if isinstance(source, vgi.log.Message): - return cls(batch=empty_batch, log_message=source) - # source is Output - return cls( - batch=source.batch if source.batch is not None else empty_batch, - ) - - @dataclass(frozen=True, slots=True) class ProtocolOutput: """Output yielded by the generator after each send(). @@ -210,7 +170,7 @@ def metadata( ) @classmethod - def from_process_result(cls, process_result: "_OutputComplete") -> "ProtocolOutput": + def from_process_result(cls, process_result: "OutputComplete") -> "ProtocolOutput": """Create a ProtocolOutput from an Output and status. Args: @@ -383,7 +343,7 @@ def process(self) -> OutputGenerator: """ @final - def _process_and_validate(self, generator: OutputGenerator) -> _OutputComplete: + def _process_and_validate(self, generator: OutputGenerator) -> OutputComplete: """Process a batch and validate the output schema. Converts the result of the generator to OutputComplete, and @@ -399,7 +359,7 @@ def _process_and_validate(self, generator: OutputGenerator) -> _OutputComplete: SchemaValidationError: If output batch schema doesn't match. """ - result: _OutputComplete = _OutputComplete.from_process_result( + result: OutputComplete = OutputComplete.from_process_result( generator.send(None), self.empty_output_batch, ) @@ -410,7 +370,7 @@ def _process_and_validate(self, generator: OutputGenerator) -> _OutputComplete: def _process_with_exception_handling( self, generator: OutputGenerator, - ) -> _OutputComplete: + ) -> OutputComplete: """Process a batch with exception handling. Wraps _process_and_validate to catch exceptions and convert them @@ -424,13 +384,13 @@ def _process_with_exception_handling( except StopIteration: raise except Exception as e: - return _OutputComplete( + return OutputComplete( batch=self.empty_output_batch, log_message=vgi.log.Message.from_exception(e), ) @final - def _should_terminate(self, result: _OutputComplete) -> bool: + def _should_terminate(self, result: OutputComplete) -> bool: """Check if processing should terminate due to an exception.""" return ( result.log_message is not None diff --git a/vgi/table_in_out_function.py b/vgi/table_in_out_function.py index 879ca5d..a3d8378 100644 --- a/vgi/table_in_out_function.py +++ b/vgi/table_in_out_function.py @@ -78,6 +78,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator: import vgi.ipc_utils import vgi.log import vgi.table_function +from vgi.output_complete import OutputComplete __all__ = [ "ProtocolInput", @@ -181,7 +182,7 @@ def metadata( @classmethod def from_process_result( - cls, process_result: "_OutputComplete", in_finalize_phase: bool + cls, process_result: "OutputComplete", in_finalize_phase: bool ) -> "ProtocolOutput": """Create a ProtocolOutput from an Output and status. @@ -353,52 +354,6 @@ def wrapper(self: T, first_batch: pa.RecordBatch) -> OutputGenerator: return wrapper -@dataclass(frozen=True, slots=True) -class _OutputComplete: - """Internal: Output with guaranteed non-None batch. - - Used by the framework to normalize generator yields. When the user yields - None, Output with None batch, or Message, this class ensures we always - have a valid RecordBatch for the protocol. - - Attributes: - batch: Always a valid RecordBatch (never None). - has_more: If True, generator expects another send() call. - log_message: Present when user yielded Message directly. - - """ - - batch: pa.RecordBatch - has_more: bool = False - log_message: vgi.log.Message | None = None - - @classmethod - def from_process_result( - cls, - source: vgi.log.Message | Output | None, - empty_batch: pa.RecordBatch, - ) -> "_OutputComplete": - """Create from user's yield value. - - Args: - source: What the user yielded (Output, Message, or None). - empty_batch: Empty batch to substitute when needed. - - Returns: - Normalized output with guaranteed non-None batch. - - """ - if source is None: - return cls(batch=empty_batch) - if isinstance(source, vgi.log.Message): - return cls(batch=empty_batch, has_more=True, log_message=source) - # source is Output - return cls( - batch=source.batch if source.batch is not None else empty_batch, - has_more=source.has_more, - ) - - class TableInOutGeneratorFunction(vgi.table_function.TableFunctionBase): """Base class for streaming table functions that transform Arrow RecordBatches. @@ -588,7 +543,7 @@ def _process_and_validate( self, generator: OutputGenerator, batch: pa.RecordBatch | None, - ) -> _OutputComplete: + ) -> OutputComplete: """Process a batch and validate both input and output schemas. Validates the input batch schema, sends it to the generator, converts @@ -607,7 +562,7 @@ def _process_and_validate( """ if batch is not None: self._validate_input_schema(batch) - result: _OutputComplete = _OutputComplete.from_process_result( + result: OutputComplete = OutputComplete.from_process_result( generator.send(batch), self.empty_output_batch, ) @@ -619,7 +574,7 @@ def _process_with_exception_handling( self, generator: OutputGenerator, batch: pa.RecordBatch | None, - ) -> _OutputComplete: + ) -> OutputComplete: """Process a batch with exception handling. Wraps _process_and_validate to catch exceptions and convert them @@ -628,13 +583,13 @@ def _process_with_exception_handling( try: return self._process_and_validate(generator, batch) except Exception as e: - return _OutputComplete( + return OutputComplete( batch=self.empty_output_batch, log_message=vgi.log.Message.from_exception(e), ) @final - def _should_terminate(self, result: _OutputComplete) -> bool: + def _should_terminate(self, result: OutputComplete) -> bool: """Check if processing should terminate due to an exception.""" return ( result.log_message is not None