diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 1df6b31..f3a1047 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -8,7 +8,7 @@ {"id":"vgi-python-79e","title":"Unify ProtocolInput classes with shared base","description":"ProtocolInput classes in scalar_function.py:151-166 and table_in_out_function.py:109-142 have similar structure with batch and metadata fields. The table_in_out version adds is_finalize logic. Create shared base ProtocolInput in protocol_types.py with table_in_out extending it.","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.31917-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:16.240397-05:00"} {"id":"vgi-python-a99","title":"Add settings accessor to function base classes","description":"Add a property to access DuckDB settings values in function implementations.\n\nChanges needed:\n- Add 'settings: dict[str, str]' property to Function base class\n- Property should return self.invocation.duckdb_settings or empty dict\n- Add convenience method like 'get_setting(name, default=None)'\n- Update ScalarFunction, TableFunctionGenerator, TableInOutFunction\n\nExample usage in function:\ndef compute(self, batch):\n tz = self.get_setting('timezone', 'UTC')\n # or\n tz = self.settings.get('timezone', 'UTC')","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.221602-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.171991-05:00","closed_at":"2026-01-04T13:20:41.171991-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-a99","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.738212-05:00","created_by":"rusty"}]} {"id":"vgi-python-aad","title":"Design: DuckDB settings/pragmas access for VGI functions","description":"Design how VGI functions can declare required DuckDB settings/pragmas in their Meta class, and how these settings values should be passed during the bind phase.\n\nKey design decisions:\n1. How to declare required settings in function Meta (e.g., required_settings = ['timezone', 'threads'])\n2. How to add settings to Invocation dataclass\n3. How settings values should be accessed in function code\n4. Serialization format for settings in Arrow IPC\n\nRecommendation: Add 'duckdb_settings: dict[str, str] | None' to Invocation and 'required_settings: list[str]' to Meta class.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T13:05:47.619105-05:00","created_by":"rusty","updated_at":"2026-01-04T13:11:13.197139-05:00","closed_at":"2026-01-04T13:11:13.197139-05:00","close_reason":"Design document created at docs/design-duckdb-settings.md"} -{"id":"vgi-python-bi8","title":"Extract common _process_with_exception_handling into mixin","description":"The _process_with_exception_handling and _process_and_validate methods are duplicated across scalar_function.py:296-346, table_function.py:386-438, and table_in_out_function.py:586-642. All follow same pattern: try _process_and_validate, catch exceptions, return OutputComplete with error message. Extract to ProcessingMixin that all function types inherit from.","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:41.02111-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:15.947758-05:00","dependencies":[{"issue_id":"vgi-python-bi8","depends_on_id":"vgi-python-6o0","type":"blocks","created_at":"2026-01-04T20:07:49.181408-05:00","created_by":"rusty"}]} +{"id":"vgi-python-bi8","title":"Extract common _process_with_exception_handling into mixin","description":"The _process_with_exception_handling and _process_and_validate methods are duplicated across scalar_function.py:296-346, table_function.py:386-438, and table_in_out_function.py:586-642. All follow same pattern: try _process_and_validate, catch exceptions, return OutputComplete with error message. Extract to ProcessingMixin that all function types inherit from.","status":"in_progress","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:41.02111-05:00","created_by":"rusty","updated_at":"2026-01-04T21:44:11.818421-05:00","dependencies":[{"issue_id":"vgi-python-bi8","depends_on_id":"vgi-python-6o0","type":"blocks","created_at":"2026-01-04T20:07:49.181408-05:00","created_by":"rusty"}]} {"id":"vgi-python-bku","title":"Change cardinality() method to property for consistency with output_schema","description":"Inconsistent access patterns: output_schema is a property but cardinality() is a method. Both return immutable data. Change cardinality() to a property for API consistency. Located in table_function.py:304-314.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:53.211782-05:00","created_by":"rusty","updated_at":"2026-01-04T21:33:10.617152-05:00","closed_at":"2026-01-04T21:33:10.617152-05:00","close_reason":"PR #6 created: https://github.com/Query-farm/vgi-python/pull/6"} {"id":"vgi-python-bqb","title":"Update worker to handle DuckDB settings during bind","description":"Update vgi/worker.py to process DuckDB settings from Invocation during the bind phase.\n\nChanges needed:\n- Read settings from invocation.duckdb_settings\n- Validate that all required_settings (from Meta) are present in invocation\n- Pass settings to function instance for access\n- Log settings usage for debugging\n\nThe worker should validate settings early in bind to fail fast if required settings are missing.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.04037-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.17079-05:00","closed_at":"2026-01-04T13:20:41.17079-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-bqb","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.714281-05:00","created_by":"rusty"}]} {"id":"vgi-python-c2b","title":"Add duckdb_settings field to Invocation class","description":"Update vgi/invocation.py to add a duckdb_settings field to the Invocation dataclass.\n\nChanges needed:\n- Add 'duckdb_settings: dict[str, str] | None = None' field to Invocation\n- Update serialize() to include settings in Arrow IPC batch\n- Update deserialize() to read settings from Arrow IPC batch\n- Handle None case (no settings requested)\n\nSerialization: Use a struct field with string key-value pairs or a map type.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:47.765077-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.167817-05:00","closed_at":"2026-01-04T13:20:41.167817-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-c2b","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.664038-05:00","created_by":"rusty"}]} diff --git a/vgi/function.py b/vgi/function.py index e174eef..19d7621 100644 --- a/vgi/function.py +++ b/vgi/function.py @@ -42,10 +42,12 @@ import structlog import vgi.ipc_utils +import vgi.log from vgi.exceptions import ExecutionIdentifierError, SchemaValidationError from vgi.function_storage import FunctionStorage, FunctionStorageSqlite from vgi.invocation import InitResult, Invocation from vgi.metadata import MetadataMixin, ResolvedMetadata +from vgi.output_complete import OutputComplete __all__ = [ "Function", @@ -584,6 +586,22 @@ def _validate_output_schema(self, batch: pa.RecordBatch) -> None: context=f"output from {type(self).__name__}", ) + @final + def _should_terminate(self, result: OutputComplete) -> bool: + """Check if processing should terminate due to an exception.""" + return ( + result.log_message is not None + and result.log_message.level == vgi.log.Level.EXCEPTION + ) + + @final + def _create_error_output(self, exception: Exception) -> OutputComplete: + """Create an OutputComplete with error message from exception.""" + return OutputComplete( + batch=self.empty_output_batch, + log_message=vgi.log.Message.from_exception(exception), + ) + @property def input_schema(self) -> pa.Schema: """Return the input schema from the invocation. diff --git a/vgi/scalar_function.py b/vgi/scalar_function.py index c101d47..2ee9f2c 100644 --- a/vgi/scalar_function.py +++ b/vgi/scalar_function.py @@ -309,18 +309,9 @@ def _process_with_exception_handling( try: return self._process_and_validate(generator, input_batch) except Exception as e: - return OutputComplete( - batch=self.empty_output_batch, - log_message=vgi.log.Message.from_exception(e), - ) + return self._create_error_output(e) - @final - def _should_terminate(self, result: OutputComplete) -> bool: - """Check if processing should terminate due to an exception.""" - return ( - result.log_message is not None - and result.log_message.level == vgi.log.Level.EXCEPTION - ) + # _should_terminate inherited from Function @abstractmethod def process(self, batch: pa.RecordBatch) -> ScalarOutputGenerator: diff --git a/vgi/table_function.py b/vgi/table_function.py index 4dff9a3..f1ecae5 100644 --- a/vgi/table_function.py +++ b/vgi/table_function.py @@ -385,18 +385,9 @@ def _process_with_exception_handling( except StopIteration: raise except Exception as e: - return OutputComplete( - batch=self.empty_output_batch, - log_message=vgi.log.Message.from_exception(e), - ) + return self._create_error_output(e) - @final - def _should_terminate(self, result: OutputComplete) -> bool: - """Check if processing should terminate due to an exception.""" - return ( - result.log_message is not None - and result.log_message.level == vgi.log.Level.EXCEPTION - ) + # _should_terminate inherited from Function def process(self) -> OutputGenerator: """Process batches during the DATA phase. diff --git a/vgi/table_in_out_function.py b/vgi/table_in_out_function.py index e522b46..e71cb3f 100644 --- a/vgi/table_in_out_function.py +++ b/vgi/table_in_out_function.py @@ -583,18 +583,9 @@ def _process_with_exception_handling( try: return self._process_and_validate(generator, batch) except Exception as e: - return OutputComplete( - batch=self.empty_output_batch, - log_message=vgi.log.Message.from_exception(e), - ) + return self._create_error_output(e) - @final - def _should_terminate(self, result: OutputComplete) -> bool: - """Check if processing should terminate due to an exception.""" - return ( - result.log_message is not None - and result.log_message.level == vgi.log.Level.EXCEPTION - ) + # _should_terminate inherited from Function def process(self, batch: pa.RecordBatch) -> OutputGenerator: """Process input batches during the DATA phase.