Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .beads/issues.jsonl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{"id":"vgi-python-79e","title":"Unify ProtocolInput classes with shared base","description":"ProtocolInput classes in scalar_function.py:151-166 and table_in_out_function.py:109-142 have similar structure with batch and metadata fields. The table_in_out version adds is_finalize logic. Create shared base ProtocolInput in protocol_types.py with table_in_out extending it.","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.31917-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:16.240397-05:00"}
{"id":"vgi-python-a99","title":"Add settings accessor to function base classes","description":"Add a property to access DuckDB settings values in function implementations.\n\nChanges needed:\n- Add 'settings: dict[str, str]' property to Function base class\n- Property should return self.invocation.duckdb_settings or empty dict\n- Add convenience method like 'get_setting(name, default=None)'\n- Update ScalarFunction, TableFunctionGenerator, TableInOutFunction\n\nExample usage in function:\ndef compute(self, batch):\n tz = self.get_setting('timezone', 'UTC')\n # or\n tz = self.settings.get('timezone', 'UTC')","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.221602-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.171991-05:00","closed_at":"2026-01-04T13:20:41.171991-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-a99","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.738212-05:00","created_by":"rusty"}]}
{"id":"vgi-python-aad","title":"Design: DuckDB settings/pragmas access for VGI functions","description":"Design how VGI functions can declare required DuckDB settings/pragmas in their Meta class, and how these settings values should be passed during the bind phase.\n\nKey design decisions:\n1. How to declare required settings in function Meta (e.g., required_settings = ['timezone', 'threads'])\n2. How to add settings to Invocation dataclass\n3. How settings values should be accessed in function code\n4. Serialization format for settings in Arrow IPC\n\nRecommendation: Add 'duckdb_settings: dict[str, str] | None' to Invocation and 'required_settings: list[str]' to Meta class.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T13:05:47.619105-05:00","created_by":"rusty","updated_at":"2026-01-04T13:11:13.197139-05:00","closed_at":"2026-01-04T13:11:13.197139-05:00","close_reason":"Design document created at docs/design-duckdb-settings.md"}
{"id":"vgi-python-bi8","title":"Extract common _process_with_exception_handling into mixin","description":"The _process_with_exception_handling and _process_and_validate methods are duplicated across scalar_function.py:296-346, table_function.py:386-438, and table_in_out_function.py:586-642. All follow same pattern: try _process_and_validate, catch exceptions, return OutputComplete with error message. Extract to ProcessingMixin that all function types inherit from.","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:41.02111-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:15.947758-05:00","dependencies":[{"issue_id":"vgi-python-bi8","depends_on_id":"vgi-python-6o0","type":"blocks","created_at":"2026-01-04T20:07:49.181408-05:00","created_by":"rusty"}]}
{"id":"vgi-python-bi8","title":"Extract common _process_with_exception_handling into mixin","description":"The _process_with_exception_handling and _process_and_validate methods are duplicated across scalar_function.py:296-346, table_function.py:386-438, and table_in_out_function.py:586-642. All follow same pattern: try _process_and_validate, catch exceptions, return OutputComplete with error message. Extract to ProcessingMixin that all function types inherit from.","status":"in_progress","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:41.02111-05:00","created_by":"rusty","updated_at":"2026-01-04T21:44:11.818421-05:00","dependencies":[{"issue_id":"vgi-python-bi8","depends_on_id":"vgi-python-6o0","type":"blocks","created_at":"2026-01-04T20:07:49.181408-05:00","created_by":"rusty"}]}
{"id":"vgi-python-bku","title":"Change cardinality() method to property for consistency with output_schema","description":"Inconsistent access patterns: output_schema is a property but cardinality() is a method. Both return immutable data. Change cardinality() to a property for API consistency. Located in table_function.py:304-314.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:53.211782-05:00","created_by":"rusty","updated_at":"2026-01-04T21:33:10.617152-05:00","closed_at":"2026-01-04T21:33:10.617152-05:00","close_reason":"PR #6 created: https://github.com/Query-farm/vgi-python/pull/6"}
{"id":"vgi-python-bqb","title":"Update worker to handle DuckDB settings during bind","description":"Update vgi/worker.py to process DuckDB settings from Invocation during the bind phase.\n\nChanges needed:\n- Read settings from invocation.duckdb_settings\n- Validate that all required_settings (from Meta) are present in invocation\n- Pass settings to function instance for access\n- Log settings usage for debugging\n\nThe worker should validate settings early in bind to fail fast if required settings are missing.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.04037-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.17079-05:00","closed_at":"2026-01-04T13:20:41.17079-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-bqb","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.714281-05:00","created_by":"rusty"}]}
{"id":"vgi-python-c2b","title":"Add duckdb_settings field to Invocation class","description":"Update vgi/invocation.py to add a duckdb_settings field to the Invocation dataclass.\n\nChanges needed:\n- Add 'duckdb_settings: dict[str, str] | None = None' field to Invocation\n- Update serialize() to include settings in Arrow IPC batch\n- Update deserialize() to read settings from Arrow IPC batch\n- Handle None case (no settings requested)\n\nSerialization: Use a struct field with string key-value pairs or a map type.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:47.765077-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.167817-05:00","closed_at":"2026-01-04T13:20:41.167817-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-c2b","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.664038-05:00","created_by":"rusty"}]}
Expand Down
18 changes: 18 additions & 0 deletions vgi/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@
import structlog

import vgi.ipc_utils
import vgi.log
from vgi.exceptions import ExecutionIdentifierError, SchemaValidationError
from vgi.function_storage import FunctionStorage, FunctionStorageSqlite
from vgi.invocation import InitResult, Invocation
from vgi.metadata import MetadataMixin, ResolvedMetadata
from vgi.output_complete import OutputComplete

__all__ = [
"Function",
Expand Down Expand Up @@ -584,6 +586,22 @@ def _validate_output_schema(self, batch: pa.RecordBatch) -> None:
context=f"output from {type(self).__name__}",
)

@final
def _should_terminate(self, result: OutputComplete) -> bool:
    """Report whether ``result`` signals that processing must stop.

    Returns:
        True when ``result.log_message`` is set and its level equals
        ``vgi.log.Level.EXCEPTION``; False when there is no log message.
    """
    message = result.log_message
    if message is None:
        return False
    return message.level == vgi.log.Level.EXCEPTION

@final
def _create_error_output(self, exception: Exception) -> OutputComplete:
    """Wrap ``exception`` in an ``OutputComplete`` result.

    The result pairs ``self.empty_output_batch`` with a log message built by
    ``vgi.log.Message.from_exception(exception)``.
    """
    # Read the batch before building the message, matching the order in which
    # the original keyword arguments were evaluated.
    empty_batch = self.empty_output_batch
    error_message = vgi.log.Message.from_exception(exception)
    return OutputComplete(batch=empty_batch, log_message=error_message)

@property
def input_schema(self) -> pa.Schema:
"""Return the input schema from the invocation.
Expand Down
13 changes: 2 additions & 11 deletions vgi/scalar_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,9 @@ def _process_with_exception_handling(
try:
return self._process_and_validate(generator, input_batch)
except Exception as e:
return OutputComplete(
batch=self.empty_output_batch,
log_message=vgi.log.Message.from_exception(e),
)
return self._create_error_output(e)

@final
def _should_terminate(self, result: OutputComplete) -> bool:
    """Report whether ``result`` signals that processing must stop.

    Returns:
        True when ``result.log_message`` is set and its level equals
        ``vgi.log.Level.EXCEPTION``; False when there is no log message.
    """
    message = result.log_message
    if message is None:
        return False
    return message.level == vgi.log.Level.EXCEPTION
# _should_terminate inherited from Function

@abstractmethod
def process(self, batch: pa.RecordBatch) -> ScalarOutputGenerator:
Expand Down
13 changes: 2 additions & 11 deletions vgi/table_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,18 +385,9 @@ def _process_with_exception_handling(
except StopIteration:
raise
except Exception as e:
return OutputComplete(
batch=self.empty_output_batch,
log_message=vgi.log.Message.from_exception(e),
)
return self._create_error_output(e)

@final
def _should_terminate(self, result: OutputComplete) -> bool:
    """Report whether ``result`` signals that processing must stop.

    Returns:
        True when ``result.log_message`` is set and its level equals
        ``vgi.log.Level.EXCEPTION``; False when there is no log message.
    """
    message = result.log_message
    if message is None:
        return False
    return message.level == vgi.log.Level.EXCEPTION
# _should_terminate inherited from Function

def process(self) -> OutputGenerator:
"""Process batches during the DATA phase.
Expand Down
13 changes: 2 additions & 11 deletions vgi/table_in_out_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,18 +583,9 @@ def _process_with_exception_handling(
try:
return self._process_and_validate(generator, batch)
except Exception as e:
return OutputComplete(
batch=self.empty_output_batch,
log_message=vgi.log.Message.from_exception(e),
)
return self._create_error_output(e)

@final
def _should_terminate(self, result: OutputComplete) -> bool:
    """Report whether ``result`` signals that processing must stop.

    Returns:
        True when ``result.log_message`` is set and its level equals
        ``vgi.log.Level.EXCEPTION``; False when there is no log message.
    """
    message = result.log_message
    if message is None:
        return False
    return message.level == vgi.log.Level.EXCEPTION
# _should_terminate inherited from Function

def process(self, batch: pa.RecordBatch) -> OutputGenerator:
"""Process input batches during the DATA phase.
Expand Down
Loading