Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .beads/issues.jsonl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
{"id":"vgi-python-5er","title":"Extract _should_terminate into shared base class","description":"Identical _should_terminate method is copy-pasted in all three function modules. Implementation is always: check if log_message exists and level is EXCEPTION. Move to shared base class (Function or new ProcessingMixin) to eliminate duplication.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.190482-05:00","created_by":"rusty","updated_at":"2026-01-04T21:49:59.765614-05:00","closed_at":"2026-01-04T21:49:59.765614-05:00","close_reason":"Completed as part of PR #8 - _should_terminate moved to Function base class","dependencies":[{"issue_id":"vgi-python-5er","depends_on_id":"vgi-python-6o0","type":"blocks","created_at":"2026-01-04T20:07:49.283865-05:00","created_by":"rusty"}]}
{"id":"vgi-python-67w","title":"Create example function using DuckDB settings","description":"Create an example function that demonstrates using DuckDB settings to determine its output.\n\nRequirements:\n- Function declares required_settings in Meta\n- Output schema depends on a setting value (e.g., include extra column based on setting)\n- Clear documentation showing the pattern\n\nExample ideas:\n1. TimezoneAwareFunction: Output includes timezone info based on 'timezone' setting\n2. VerboseOutput: Adds debug columns when 'debug_mode' setting is true\n3. NumericPrecision: Uses 'numeric_precision' to determine output type precision\n\nAdd to vgi/examples/ and register in ExampleWorker.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.503681-05:00","created_by":"rusty","updated_at":"2026-01-04T13:22:23.779895-05:00","closed_at":"2026-01-04T13:22:23.779895-05:00","close_reason":"Added SettingsAwareFunction example","dependencies":[{"issue_id":"vgi-python-67w","depends_on_id":"vgi-python-c2b","type":"blocks","created_at":"2026-01-04T13:06:13.865474-05:00","created_by":"rusty"},{"issue_id":"vgi-python-67w","depends_on_id":"vgi-python-ivf","type":"blocks","created_at":"2026-01-04T13:06:13.890269-05:00","created_by":"rusty"},{"issue_id":"vgi-python-67w","depends_on_id":"vgi-python-bqb","type":"blocks","created_at":"2026-01-04T13:06:13.912531-05:00","created_by":"rusty"},{"issue_id":"vgi-python-67w","depends_on_id":"vgi-python-a99","type":"blocks","created_at":"2026-01-04T13:06:13.936552-05:00","created_by":"rusty"},{"issue_id":"vgi-python-67w","depends_on_id":"vgi-python-j4t","type":"blocks","created_at":"2026-01-04T13:06:13.958494-05:00","created_by":"rusty"}]}
{"id":"vgi-python-6o0","title":"Consolidate _OutputComplete classes into shared module","description":"Three nearly identical _OutputComplete classes exist in scalar_function.py:168-197 (_ScalarOutputComplete), table_function.py:136-175 (_OutputComplete), and table_in_out_function.py:356-400 (_OutputComplete). All are frozen dataclasses with batch field, log_message field, and from_process_result() classmethod. Extract to shared module (e.g., vgi/protocol_types.py) with a single parameterized class.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:40.893139-05:00","created_by":"rusty","updated_at":"2026-01-04T21:18:34.529683-05:00","closed_at":"2026-01-04T21:18:34.529683-05:00","close_reason":"PR #5 created: https://github.com/Query-farm/vgi-python/pull/5"}
{"id":"vgi-python-79e","title":"Unify ProtocolInput classes with shared base","description":"ProtocolInput classes in scalar_function.py:151-166 and table_in_out_function.py:109-142 have similar structure with batch and metadata fields. The table_in_out version adds is_finalize logic. Create shared base ProtocolInput in protocol_types.py with table_in_out extending it.","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.31917-05:00","created_by":"rusty","updated_at":"2026-01-04T21:53:26.965345-05:00","closed_at":"2026-01-04T21:53:26.965345-05:00","close_reason":"PR #9 created - unified ProtocolInput with shared base in protocol_types.py"}
{"id":"vgi-python-79e","title":"Unify ProtocolInput classes with shared base","description":"ProtocolInput classes in scalar_function.py:151-166 and table_in_out_function.py:109-142 have similar structure with batch and metadata fields. The table_in_out version adds is_finalize logic. Create shared base ProtocolInput in protocol_types.py with table_in_out extending it.","status":"in_progress","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.31917-05:00","created_by":"rusty","updated_at":"2026-01-04T21:51:05.655788-05:00"}
{"id":"vgi-python-a99","title":"Add settings accessor to function base classes","description":"Add a property to access DuckDB settings values in function implementations.\n\nChanges needed:\n- Add 'settings: dict[str, str]' property to Function base class\n- Property should return self.invocation.duckdb_settings or empty dict\n- Add convenience method like 'get_setting(name, default=None)'\n- Update ScalarFunction, TableFunctionGenerator, TableInOutFunction\n\nExample usage in function:\ndef compute(self, batch):\n tz = self.get_setting('timezone', 'UTC')\n # or\n tz = self.settings.get('timezone', 'UTC')","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.221602-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.171991-05:00","closed_at":"2026-01-04T13:20:41.171991-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-a99","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.738212-05:00","created_by":"rusty"}]}
{"id":"vgi-python-aad","title":"Design: DuckDB settings/pragmas access for VGI functions","description":"Design how VGI functions can declare required DuckDB settings/pragmas in their Meta class, and how these settings values should be passed during the bind phase.\n\nKey design decisions:\n1. How to declare required settings in function Meta (e.g., required_settings = ['timezone', 'threads'])\n2. How to add settings to Invocation dataclass\n3. How settings values should be accessed in function code\n4. Serialization format for settings in Arrow IPC\n\nRecommendation: Add 'duckdb_settings: dict[str, str] | None' to Invocation and 'required_settings: list[str]' to Meta class.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T13:05:47.619105-05:00","created_by":"rusty","updated_at":"2026-01-04T13:11:13.197139-05:00","closed_at":"2026-01-04T13:11:13.197139-05:00","close_reason":"Design document created at docs/design-duckdb-settings.md"}
{"id":"vgi-python-bi8","title":"Extract common _process_with_exception_handling into mixin","description":"The _process_with_exception_handling and _process_and_validate methods are duplicated across scalar_function.py:296-346, table_function.py:386-438, and table_in_out_function.py:586-642. All follow same pattern: try _process_and_validate, catch exceptions, return OutputComplete with error message. Extract to ProcessingMixin that all function types inherit from.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T20:06:41.02111-05:00","created_by":"rusty","updated_at":"2026-01-04T21:49:46.320532-05:00","closed_at":"2026-01-04T21:49:46.320532-05:00","close_reason":"PR #8 created - extracted _should_terminate and added _create_error_output to Function base class","dependencies":[{"issue_id":"vgi-python-bi8","depends_on_id":"vgi-python-6o0","type":"blocks","created_at":"2026-01-04T20:07:49.181408-05:00","created_by":"rusty"}]}
Expand Down
25 changes: 25 additions & 0 deletions vgi/protocol_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Shared protocol types for VGI function communication.

This module provides base classes for protocol messages used across
different function types.
"""

from dataclasses import dataclass

import pyarrow as pa

__all__ = ["ProtocolInput"]


@dataclass(frozen=True, slots=True)
class ProtocolInput:
"""Base input sent to function generators via send().

Attributes:
batch: The input RecordBatch to process.
metadata: Optional metadata from the IPC stream.

"""

batch: pa.RecordBatch
metadata: pa.KeyValueMetadata | None = None
18 changes: 2 additions & 16 deletions vgi/scalar_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def compute(self, batch: pa.RecordBatch) -> pa.Array:

from abc import abstractmethod
from collections.abc import Generator
from dataclasses import dataclass
from typing import Any, final

import pyarrow as pa
Expand All @@ -51,6 +50,7 @@ def compute(self, batch: pa.RecordBatch) -> pa.Array:
import vgi.log
from vgi.exceptions import SchemaValidationError
from vgi.output_complete import OutputComplete
from vgi.protocol_types import ProtocolInput
from vgi.table_function import Output, ProtocolOutput

__all__ = [
Expand Down Expand Up @@ -149,21 +149,7 @@ def _build_detailed_message(
ScalarOutputGenerator = Generator[vgi.log.Message | Output, pa.RecordBatch | None, None]


@dataclass(frozen=True, slots=True)
class ProtocolInput:
"""Input sent to the scalar function generator via send().

Contains an input batch and optional metadata. The scalar function
processes each batch and returns an output batch with the same row count.

Attributes:
batch: The input RecordBatch to process.
metadata: Optional metadata from the IPC stream.

"""

batch: pa.RecordBatch
metadata: pa.KeyValueMetadata | None = None
# ProtocolInput imported from vgi.protocol_types


class ScalarFunctionGenerator(vgi.function.Function[vgi.function.FunctionInitInput]):
Expand Down
11 changes: 6 additions & 5 deletions vgi/table_in_out_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator:
import vgi.log
import vgi.table_function
from vgi.output_complete import OutputComplete
from vgi.protocol_types import ProtocolInput as ProtocolInputBase

__all__ = [
"ProtocolInput",
Expand Down Expand Up @@ -107,10 +108,13 @@ class _OutputStatus(Enum):
FINISHED = "FINISHED"


@dataclass(frozen=True, slots=True)
class ProtocolInput:
@dataclass(frozen=True)
class ProtocolInput(ProtocolInputBase):
"""Input sent to the generator via send().

Extends ProtocolInputBase with finalize phase signaling for table-in-out
functions.

Attributes:
batch: The input RecordBatch to process.
metadata: Optional metadata; used to signal the FINALIZE phase.
Expand All @@ -120,9 +124,6 @@ class ProtocolInput:
# pa.KeyValueMetadata uses bytes so we define signals as bytes
_FINALIZE_SIGNAL: ClassVar[bytes] = b"FINALIZE"

batch: pa.RecordBatch
metadata: pa.KeyValueMetadata | None = None

@property
def is_finalize(self) -> bool:
"""Check if this input signals the FINALIZE phase."""
Expand Down
Loading