Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 64 additions & 12 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ VGI (Vector Gateway Interface) provides an Apache Arrow-based protocol for conne
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ Worker Process │ │
│ │ SCALAR FUNCTION (ScalarFunction) │ │
│ │ - compute(batch): Transform each row to single output column │ │
│ │ OR │ │
│ │ TABLE FUNCTION (TableFunctionGenerator) │ │
│ │ - process(): Generator yielding output batches (no input) │ │
│ │ OR │ │
Expand All @@ -52,13 +55,15 @@ VGI (Vector Gateway Interface) provides an Apache Arrow-based protocol for conne

| Type | Base Class | Input | Use Case |
|------|------------|-------|----------|
| **Scalar Function** | `ScalarFunction` | Batches | Per-row transforms (1:1 row mapping, single output column) |
| **Table Function** | `TableFunctionGenerator` | None | Generate data (sequences, ranges) |
| **Table-In-Out Function** | `TableInOutFunction` | Batches | Transform, filter, aggregate |

### Key Components

- **Worker** (`vgi/worker.py`): Subprocess that hosts functions
- **Client** (`vgi/client/client.py`): Spawns workers, streams data
- **ScalarFunction** (`vgi/scalar_function.py`): Base for scalar functions
- **TableFunctionGenerator** (`vgi/table_function.py`): Base for table functions
- **TableInOutFunction** (`vgi/table_in_out_function.py`): Base for table-in-out functions

Expand All @@ -67,7 +72,8 @@ VGI (Vector Gateway Interface) provides an Apache Arrow-based protocol for conne
```
vgi/
__init__.py # Package exports
function.py # Invocation, OutputSpec, Arguments, GlobalInitResult
function.py # Invocation, OutputSpec, Arguments, FunctionType
scalar_function.py # ScalarFunction, ScalarFunctionGenerator
table_function.py # TableFunctionGenerator, CardinalityInfo, Output
table_in_out_function.py # TableInOutFunction, TableInOutGeneratorFunction
metadata.py # Function metadata for introspection
Expand All @@ -76,6 +82,7 @@ vgi/
client/
client.py # Client class
examples/
scalar.py # Example scalar functions
table.py # Example table functions
table_in_out.py # Example table-in-out functions
worker.py # ExampleWorker with registry
Expand All @@ -89,6 +96,32 @@ vgi-client --input data.parquet --function echo --server vgi-example-worker
vgi-client --input data.parquet --function sum_all_columns --server vgi-example-worker
```

## Creating a Scalar Function (Per-Row Transform)

```python
import pyarrow as pa
import pyarrow.compute as pc
from vgi import ScalarFunction, Arg

class DoubleColumn(ScalarFunction):
"""Double the value in a specified column."""

column = Arg[str](0, doc="Column to double")

@property
def output_type(self) -> pa.DataType:
# Output type matches input column type
return self.input_schema.field(self.column).type

def compute(self, batch: pa.RecordBatch) -> pa.Array:
return pc.multiply(batch.column(self.column), 2)
```

### Key Constraints for Scalar Functions:
- **1:1 row mapping**: Output must have exactly the same number of rows as input
- **Single column output**: Output schema has exactly one column named "result"
- **No finalize phase**: All processing happens in compute()

## Creating a Table-In-Out Function (Recommended)

```python
Expand Down Expand Up @@ -182,6 +215,9 @@ if __name__ == "__main__":
### Imports

```python
# Scalar Functions (per-row transform)
from vgi import ScalarFunction, Arg, Worker

# Table Functions (no input)
from vgi import TableFunctionGenerator, Output, Arg, Worker

Expand Down Expand Up @@ -221,6 +257,17 @@ output_schema = schema_like(self.input_schema, rename={"old": "new"})

### Method Override Summary

**ScalarFunction:**

| Method | When to Override | Default |
|--------|------------------|---------|
| `output_type` | Define output column type | Required |
| `compute(batch)` | Transform batch to single array | Required |
| `setup()` | Acquire resources | No-op |
| `teardown()` | Release resources | No-op |

**TableInOutFunction:**

| Method | When to Override | Default |
|--------|------------------|---------|
| `output_schema` | Change output columns | Returns input_schema |
Expand All @@ -232,17 +279,22 @@ output_schema = schema_like(self.input_schema, rename={"old": "new"})
### Pattern Decision Tree

```
Need to implement a VGI function?
├─ Does the function receive input data?
│ │
│ ├─ NO → Use TableFunctionGenerator
│ │ Override process() to yield Output batches
│ │
│ └─ YES → Use TableInOutFunction
│ ├─ Transform each batch? → Override transform()
│ ├─ Aggregate results? → Accumulate in transform(), emit in finish()
│ └─ Need generator control? → See docs/generator-api.md
How will your function be used in SQL?

1. SELECT my_func(col1, col2) FROM table
→ SCALAR FUNCTION: Returns one value per input row
→ Use ScalarFunction, override output_type and compute()
→ Example: upper(), abs(), concat()

2. SELECT * FROM my_func(args)
→ TABLE FUNCTION: Generates rows from arguments (no input table)
→ Use TableFunctionGenerator, override process()
→ Example: range(), read_csv(), glob()

3. SELECT * FROM my_func(args, (SELECT * FROM input_table))
→ TABLE-IN-OUT FUNCTION: Transforms input rows to output rows
→ Use TableInOutFunction, override transform() and optionally finish()
→ Example: filtering, enrichment, aggregation
```

## Additional Documentation
Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ warn_unused_ignores = true
module = "structlog.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
# These files have type: ignore comments for ty that mypy doesn't need
module = ["vgi.examples.scalar", "vgi.table_function"]
warn_unused_ignores = false

[tool.ty.environment]
python-version = "3.12"

Expand Down
Loading