Skip to content

feat: add scalar function support#4

Merged
rustyconover merged 6 commits into
mainfrom
feat-scalar-functions
Jan 4, 2026
Merged

feat: add scalar function support#4
rustyconover merged 6 commits into
mainfrom
feat-scalar-functions

Conversation

@rustyconover

Copy link
Copy Markdown
Contributor

Summary

Add support for scalar functions - a new function type that transforms input batches to single-column output with strict 1:1 row mapping.

Key Features

  • Two-tier API like TableInOut functions:
    • ScalarFunctionGenerator: Generator-based with process() for full control
    • ScalarFunction: Callback-based with compute() for simpler use cases
  • Key constraints enforced:
    • Single-column output (validated at construction)
    • Row count must match input (validated at runtime, except for log messages)
    • No finalize phase
  • Logging support: Via yield Message in generator API or self.log() in callback API

New Files

  • vgi/scalar_function.py: Core scalar function classes
  • vgi/examples/scalar.py: Example functions (DoubleColumnFunction, AddColumnsFunction, UpperCaseFunction)
  • tests/scalar/test_function.py: Comprehensive tests

Updated Files

  • vgi/worker.py: Added dispatch for ScalarFunctionGenerator and _process_scalar_batches() method
  • vgi/client/client.py: Added scalar_function() method
  • vgi/client/cli.py: Added --type option (auto/table/table-in-out/scalar)
  • vgi/testing.py: Added ScalarFunctionTestClient, run_scalar_function(), assert_scalar_function_output()
  • vgi/table_function.py: Added RowCountMismatchError exception
  • vgi/__init__.py: Exported new classes

Example Usage

class DoubleColumn(ScalarFunction):
    column = Arg[str](0, doc="Column to double")

    @property
    def output_type(self) -> pa.DataType:
        return self.input_schema.field(self.column).type

    def compute(self, batch: pa.RecordBatch) -> pa.Array:
        return pc.multiply(batch.column(self.column), 2)

CLI:

vgi-client --input data.parquet --function double_column --args '["x"]' --type scalar

Test plan

  • All 465 existing tests pass
  • New scalar function tests pass (9 test cases)
  • Ruff linting passes
  • Mypy type checking passes

🤖 Generated with Claude Code

rustyconover and others added 3 commits January 2, 2026 15:44
Add support for scalar functions - a new function type that transforms
input batches to single-column output with strict 1:1 row mapping.

Key features:
- ScalarFunctionGenerator: Generator-based base class with process()
- ScalarFunction: Callback-based API with compute() method
- Single-column output enforced at construction
- Row count validation (output must match input rows)
- Logging support via yield Message or self.log()
- No finalize phase (ends when input exhausted)

New files:
- vgi/scalar_function.py: Core scalar function classes
- vgi/examples/scalar.py: Example functions (double_column, add_columns, upper_case)
- tests/scalar/test_function.py: Comprehensive tests

Updated:
- Worker dispatch for ScalarFunctionGenerator
- Client.scalar_function() method for invoking scalar functions
- CLI --type option (auto/table/table-in-out/scalar)
- Testing utilities (ScalarFunctionTestClient, run_scalar_function, etc.)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Remove mentions of HAVE_MORE_OUTPUT and FINALIZE from public docstrings
- Remove dead code that handled FINALIZE signal (scalar functions don't use it)
- Simplify run() method to use while True loop (exits via generator.close())
- Clean up internal comments

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@rustyconover rustyconover force-pushed the feat-scalar-functions branch from e64e8f0 to 54db81e Compare January 2, 2026 20:54
rustyconover and others added 3 commits January 2, 2026 16:17
- Add end-to-end tests via Client subprocess (tests/scalar/test_client.py)
  - TestScalarFunctionClient: 9 tests for basic operations, error handling
  - TestScalarFunctionParallel: 4 tests for parallel processing
- Add CLI tests for --type scalar option (tests/client/test_cli.py)
  - 10 tests covering invocation, output formats, validation
- Create scalar-specific ProtocolInput without unused is_finalize field
- Update worker.py to use ScalarProtocolInput

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@rustyconover rustyconover merged commit f2a039b into main Jan 4, 2026
16 checks passed
@rustyconover rustyconover deleted the feat-scalar-functions branch January 4, 2026 04:06
rustyconover added a commit that referenced this pull request May 16, 2026
… tests

Blocker fix from senior review #1. ``TableBufferingFinalizeState``
inherited the no-op ``StreamState.on_cancel`` from the base, so the
user-facing ``TableBufferingFunction.on_cancel`` classmethod was dead
code — documented but never invoked. Streaming peers
(``TableProducerState.on_cancel``) already forwarded their cancel hooks;
the buffered finalize path was the asymmetric outlier.

Wire:
* ``TableBufferingFinalizeState.on_cancel(ctx)`` resolves func_cls +
  params via the same cold-load path as ``produce()`` (no in-process
  cache; HTTP rehydration may land on a different worker process),
  deserializes the user's last-emitted state from ``state_blob``, then
  calls ``cls.on_cancel(params, finalize_state_id, state)``.
* Pre-init cancel (state_initialized=False) short-circuits — no user
  state to forward.
* Implementation lookup failures and user-callback exceptions are
  swallowed via ``contextlib.suppress`` — we're on a teardown path and
  must not double-fault during pipeline unwind.

Tests (tests/test_table_buffering_function.py — 5 new cases):
* Happy-path: cls.on_cancel runs with deserialized state.
* state_initialized=False: no invocation.
* state_initialized=True + empty blob: cls.on_cancel runs with state=None.
* ctx.implementation=None: silent no-op (no crash).
* User on_cancel raising: silent no-op (no propagation).

The tests sit at the protocol layer rather than driving an SQL LIMIT
end-to-end, because the wire cancel is delivered asynchronously by
``VgiCancelDispatcher`` (one thread writes the cancel batch; the
subprocess worker reads it on its own schedule) and racing that against
a probe-file assertion produces flaky tests. The contract that matters
— "when the framework calls state.on_cancel, the user's classmethod
runs with deserialized state" — is fully exercised here.

A ``SlowCancellableBufferingFunction`` fixture is also registered in
the example worker, mirroring the existing ``slow_cancellable`` /
``slow_cancellable_inout`` fixtures, so a future deterministic
integration test has the building block ready when the async-cancel
delivery story stabilizes.

Side improvement: ``SumAllColumnsFunction.combine`` now emits a
``params.client_log`` line under ``logging=true``, mirroring its
process()-side log. Locks in the combine-side unary-RPC client_log
path (review #4) so adding new buffered-table fixtures can't silently
regress that.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant