Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .beads/issues.jsonl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{"id":"vgi-python-e9q","title":"Unify ProtocolOutput classes with shared base","description":"ProtocolOutput classes in table_function.py:177-224 and table_in_out_function.py:144-207 share similar metadata() method and from_process_result() classmethod. The table_in_out version adds status field. Create shared base with table_in_out extending it for status support.","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.45014-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:16.371419-05:00"}
{"id":"vgi-python-ivf","title":"Add required_settings to function Meta class","description":"Update function metadata to support declaring required DuckDB settings.\n\nChanges needed:\n- Add 'required_settings: list[str]' to FunctionMeta in vgi/metadata.py\n- Update Meta class resolution in vgi/function.py\n- Add validation that required_settings is a list of strings\n- Make it available via get_metadata() for introspection\n\nExample usage:\nclass MyFunction(TableInOutFunction):\n class Meta:\n required_settings = ['timezone', 'threads']","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:47.903747-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.169516-05:00","closed_at":"2026-01-04T13:20:41.169516-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-ivf","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.690253-05:00","created_by":"rusty"}]}
{"id":"vgi-python-j4t","title":"Update client to pass DuckDB settings in Invocation","description":"Update vgi/client/client.py to support passing DuckDB settings.\n\nChanges needed:\n- Add 'duckdb_settings: dict[str, str] | None = None' parameter to relevant methods\n- Include settings in Invocation creation\n- Add helper to query function's required_settings from metadata\n\nThe client needs to know what settings to pass. Options:\n1. Client queries worker for function metadata first\n2. Settings passed explicitly by caller\n3. Client introspects function class if available locally","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T13:05:48.358656-05:00","created_by":"rusty","updated_at":"2026-01-04T13:20:41.173178-05:00","closed_at":"2026-01-04T13:20:41.173178-05:00","close_reason":"Implementation complete, all tests pass","dependencies":[{"issue_id":"vgi-python-j4t","depends_on_id":"vgi-python-aad","type":"blocks","created_at":"2026-01-04T13:06:13.761572-05:00","created_by":"rusty"}]}
{"id":"vgi-python-kz4","title":"Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency","description":"Naming inconsistency: TableFunctionGenerator uses *Generator suffix, but TableInOutGeneratorFunction uses *GeneratorFunction suffix. Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency. Also consider renaming ScalarFunctionGenerator if needed.","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.581028-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:16.506499-05:00"}
{"id":"vgi-python-kz4","title":"Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency","description":"Naming inconsistency: TableFunctionGenerator uses *Generator suffix, but TableInOutGeneratorFunction uses *GeneratorFunction suffix. Rename TableInOutGeneratorFunction to TableInOutGenerator for consistency. Also consider renaming ScalarFunctionGenerator if needed.","status":"in_progress","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:41.581028-05:00","created_by":"rusty","updated_at":"2026-01-04T21:39:29.83686-05:00"}
{"id":"vgi-python-odi","title":"Change max_processes from method to property in Function hierarchy","description":"Refactor max_processes from a method to a property across the Function class hierarchy (Function, ScalarFunction, TableFunctionGenerator, TableInOutFunction, etc.). This makes the API more consistent since max_processes is effectively a constant per function class and properties are more idiomatic for such values.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T11:25:29.750648-05:00","created_by":"rusty","updated_at":"2026-01-04T11:50:57.566545-05:00","closed_at":"2026-01-04T11:50:57.566545-05:00","close_reason":"Closed"}
{"id":"vgi-python-p91","title":"Move exception classes from function.py to own file","description":"Move InitIdentifierError and SchemaValidationError from vgi/function.py to a new vgi/exceptions.py file. Update imports in function.py and any other files that reference these exceptions.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T09:12:28.058227-05:00","created_by":"rusty","updated_at":"2026-01-04T09:17:52.477661-05:00","closed_at":"2026-01-04T09:17:52.477661-05:00","close_reason":"Closed"}
{"id":"vgi-python-r3t","title":"Consolidate test client infrastructure in testing.py","description":"testing.py has three test client classes (FunctionTestClient, TableFunctionTestClient, ScalarFunctionTestClient) with shared infrastructure patterns. Extend _BaseTestClient pattern to reduce code duplication. Consider using a single unified client with method dispatch based on function type.","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-04T20:06:53.913912-05:00","created_by":"rusty","updated_at":"2026-01-04T20:07:38.132591-05:00"}
Expand Down
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ vgi/
function.py # Invocation, OutputSpec, Arguments, FunctionType
scalar_function.py # ScalarFunction, ScalarFunctionGenerator
table_function.py # TableFunctionGenerator, TableCardinality, Output
table_in_out_function.py # TableInOutFunction, TableInOutGeneratorFunction
table_in_out_function.py # TableInOutFunction, TableInOutGenerator
metadata.py # Function metadata for introspection
schema_utils.py # Schema builder helpers (schema, schema_like)
worker.py # Worker base class
Expand Down
12 changes: 6 additions & 6 deletions docs/generator-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def process(self):

## Table-In-Out Generator Function (Advanced)

For advanced streaming control with input data, use `TableInOutGeneratorFunction`. Most users should prefer `TableInOutFunction` instead.
For advanced streaming control with input data, use `TableInOutGenerator`. Most users should prefer `TableInOutFunction` instead.

### When to Use Generator API

Expand All @@ -182,9 +182,9 @@ For advanced streaming control with input data, use `TableInOutGeneratorFunction

```python
import pyarrow as pa
from vgi import TableInOutGeneratorFunction, Output, OutputGenerator, Arg
from vgi import TableInOutGenerator, Output, OutputGenerator, Arg

class MyFunction(TableInOutGeneratorFunction):
class MyFunction(TableInOutGenerator):
"""One-line description."""

@property
Expand All @@ -208,13 +208,13 @@ class MyFunction(TableInOutGeneratorFunction):

**Passthrough (Echo):**
```python
class EchoFunction(TableInOutGeneratorFunction):
class EchoFunction(TableInOutGenerator):
pass # Default process() passes input unchanged
```

**Aggregation (emit on finalize):**
```python
class SumFunction(TableInOutGeneratorFunction):
class SumFunction(TableInOutGenerator):
@property
def output_schema(self):
return pa.schema([pa.field("sum", pa.int64())])
Expand Down Expand Up @@ -270,7 +270,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator:

## Common Mistakes

### 1. Forgetting the priming yield (TableInOutGeneratorFunction only)
### 1. Forgetting the priming yield (TableInOutGenerator only)

```python
# ❌ WRONG - will raise TypeError on first send()
Expand Down
2 changes: 1 addition & 1 deletion tests/table_in_out/generator/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"""Tests for TableInOutGeneratorFunction implementations."""
"""Tests for TableInOutGenerator implementations."""
12 changes: 6 additions & 6 deletions tests/table_in_out/test_streaming_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
Output,
OutputGenerator,
StreamingGenerator,
TableInOutGeneratorFunction,
TableInOutGenerator,
streaming,
)
from vgi.log import Level, Message
from vgi.testing import FunctionTestClient, batch


class EchoStreamingFunction(TableInOutGeneratorFunction):
class EchoStreamingFunction(TableInOutGenerator):
"""Simple echo function using the @streaming decorator."""

@streaming
Expand All @@ -26,7 +26,7 @@ def process(self, b: pa.RecordBatch) -> StreamingGenerator:
current = yield Output(current)


class CountingStreamingFunction(TableInOutGeneratorFunction):
class CountingStreamingFunction(TableInOutGenerator):
"""Function that counts batches using @streaming."""

def __init__(
Expand All @@ -45,7 +45,7 @@ def process(self, b: pa.RecordBatch) -> StreamingGenerator:
current = yield Output(current)


class AccumulatingStreamingFunction(TableInOutGeneratorFunction):
class AccumulatingStreamingFunction(TableInOutGenerator):
"""Function that accumulates and outputs empty batches during process."""

def __init__(
Expand Down Expand Up @@ -80,7 +80,7 @@ def finalize(self) -> OutputGenerator:
)


class LoggingStreamingFunction(TableInOutGeneratorFunction):
class LoggingStreamingFunction(TableInOutGenerator):
"""Function that logs using the @streaming decorator."""

@streaming
Expand Down Expand Up @@ -171,7 +171,7 @@ class TestStreamingDecoratorComparedToManual:
def test_equivalent_output(self) -> None:
"""@streaming decorated function should produce same output as manual."""

class ManualEcho(TableInOutGeneratorFunction):
class ManualEcho(TableInOutGenerator):
def process(self, b: pa.RecordBatch) -> OutputGenerator:
"""Manual process without decorator."""
_ = yield None
Expand Down
18 changes: 9 additions & 9 deletions vgi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ def transform(self, batch: pa.RecordBatch) -> pa.RecordBatch:
# Transform each batch here
return batch

For advanced streaming control, use TableInOutGeneratorFunction with the
For advanced streaming control, use TableInOutGenerator with the
@streaming decorator:

from vgi import TableInOutGeneratorFunction, Output, StreamingGenerator, streaming
from vgi import TableInOutGenerator, Output, StreamingGenerator, streaming
import pyarrow as pa

class MyFunction(TableInOutGeneratorFunction):
class MyFunction(TableInOutGenerator):
@streaming
def process(self, batch: pa.RecordBatch) -> StreamingGenerator:
# No priming yield needed!
Expand All @@ -31,10 +31,10 @@ def process(self, batch: pa.RecordBatch) -> StreamingGenerator:

Or without the decorator (more verbose):

from vgi import TableInOutGeneratorFunction, Output, OutputGenerator
from vgi import TableInOutGenerator, Output, OutputGenerator
import pyarrow as pa

class MyFunction(TableInOutGeneratorFunction):
class MyFunction(TableInOutGenerator):
def process(self, batch: pa.RecordBatch) -> OutputGenerator:
_ = yield None # Required priming yield
while True:
Expand All @@ -58,7 +58,7 @@ class MyWorker(Worker):
Classes and functions exported from this module:

TableInOutFunction - Callback-based API (recommended)
TableInOutGeneratorFunction - Generator-based API (advanced)
TableInOutGenerator - Generator-based API (advanced)
ScalarFunction - Scalar function with compute() (single-column output)
ScalarFunctionGenerator - Scalar function with generator protocol
Output - Output batch from process()/finalize()
Expand Down Expand Up @@ -114,7 +114,7 @@ class Meta:
vgi.function.Function - Base (max_processes, invocation_id)
├─ vgi.table_function.TableFunctionBase - Adds cardinality hints, projection
│ ├─ TableFunctionGenerator - Generate output without input
│ └─ TableInOutGeneratorFunction - Full streaming (process/finalize)
│ └─ TableInOutGenerator - Full streaming (process/finalize)
│ └─ TableInOutFunction - Callback API (transform/finish)
│ ├─ AggregationFunction - Reduce to summary
│ ├─ FilterFunction - Row filtering
Expand Down Expand Up @@ -159,7 +159,7 @@ class Meta:
OutputGenerator,
StreamingGenerator,
TableInOutFunction,
TableInOutGeneratorFunction,
TableInOutGenerator,
streaming,
)
from vgi.table_in_out_function_patterns import (
Expand Down Expand Up @@ -195,7 +195,7 @@ class Meta:
"ScalarOutputGenerator",
"StreamingGenerator",
"TableInOutFunction",
"TableInOutGeneratorFunction",
"TableInOutGenerator",
"TableInput",
"TableInputValidationError",
"Worker",
Expand Down
2 changes: 1 addition & 1 deletion vgi/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
-------
client.start() : Start the worker subprocess
client.stop() : Stop the worker subprocess
client.table_in_out_function() : Invoke a TableInOutGeneratorFunction and stream results
client.table_in_out_function() : Invoke a TableInOutGenerator and stream results
client.table_function() : Invoke a TableFunctionGenerator and stream results
client.scalar_function() : Invoke a ScalarFunction and stream results
client.get_worker_stderr() : Get captured stderr from worker
Expand Down
10 changes: 5 additions & 5 deletions vgi/examples/table_in_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
Output,
OutputGenerator,
TableInOutFunction,
TableInOutGeneratorFunction,
TableInOutGenerator,
)

__all__ = [
Expand All @@ -55,7 +55,7 @@
]


class EchoFunction(TableInOutGeneratorFunction):
class EchoFunction(TableInOutGenerator):
"""Passthrough function that emits each input batch unchanged.

USE CASE
Expand Down Expand Up @@ -90,7 +90,7 @@ class Meta:
data: TableInput = Arg[TableInput](0, doc="Input table") # type: ignore[assignment]


class BufferInputFunction(TableInOutGeneratorFunction):
class BufferInputFunction(TableInOutGenerator):
"""Buffering function that collects all input and emits during finalization.

USE CASE
Expand Down Expand Up @@ -166,7 +166,7 @@ def finalize(self) -> OutputGenerator:
yield Output(b, has_more)


class RepeatInputsFunction(TableInOutGeneratorFunction):
class RepeatInputsFunction(TableInOutGenerator):
"""Explosion function that duplicates each input batch N times.

USE CASE
Expand Down Expand Up @@ -262,7 +262,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator:
batch = received


class SumAllColumnsFunction(TableInOutGeneratorFunction):
class SumAllColumnsFunction(TableInOutGenerator):
"""Aggregation function that computes column-wise sums across all batches.

USE CASE
Expand Down
4 changes: 2 additions & 2 deletions vgi/examples/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
each class's metadata (Meta.name or snake_case of class name).

The worker supports:
- TableInOutGeneratorFunction: Transforms input batches to output batches
- TableInOutGenerator: Transforms input batches to output batches
- TableFunctionGenerator: Generates output batches without input
- ScalarFunctionGenerator: Transforms input to single-column output (1:1 rows)

Expand Down Expand Up @@ -47,7 +47,7 @@ class ExampleWorker(Worker):
"""Example worker with built-in test functions."""

functions = [
# TableInOutGeneratorFunction - transform input batches
# TableInOutGenerator - transform input batches
EchoFunction,
BufferInputFunction,
RepeatInputsFunction,
Expand Down
2 changes: 1 addition & 1 deletion vgi/output_complete.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class OutputComplete:
Attributes:
batch: Always a valid RecordBatch (never None).
has_more: If True, generator expects another send() call.
Only used by TableInOutGeneratorFunction.
Only used by TableInOutGenerator.
log_message: Present when user yielded Message directly.

"""
Expand Down
10 changes: 5 additions & 5 deletions vgi/table_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
Function (vgi.function)
└── TableFunctionBase
└── TableFunctionGenerator (simple generator, no input via send)
└── TableInOutGeneratorFunction (full protocol with input batches)
└── TableInOutGenerator (full protocol with input batches)

TableFunctionGenerator is useful for functions that don't need to receive
input batches via yield - they just produce output batches in a loop until done.
For functions that transform input batches, use TableInOutGeneratorFunction.
For functions that transform input batches, use TableInOutGenerator.
"""

from collections.abc import Generator
Expand Down Expand Up @@ -230,15 +230,15 @@ class TableFunctionBase(vgi.function.Function[TableFunctionInitInput]):

This class is not meant to be used directly. Subclass either:
- TableFunctionGenerator: For simple generators that produce output
- TableInOutGeneratorFunction: For functions that transform input batches
- TableInOutGenerator: For functions that transform input batches

Attributes:
init_input: TableFunctionInitInput with projection info (set after init)
empty_output_batch: Cached empty batch conforming to output_schema

See Also:
TableFunctionGenerator: Simple generator base class
TableInOutGeneratorFunction: Full streaming with input batches
TableInOutGenerator: Full streaming with input batches

"""

Expand Down Expand Up @@ -305,7 +305,7 @@ class TableFunctionGenerator(TableFunctionBase):
- Produce a fixed sequence of output batches
- Don't need the full DATA/FINALIZE protocol

For functions that transform input batches, use TableInOutGeneratorFunction.
For functions that transform input batches, use TableInOutGenerator.

LIFECYCLE
---------
Expand Down
Loading
Loading