feat: add scalar function support #4
Merged
Add support for scalar functions - a new function type that transforms input batches to single-column output with strict 1:1 row mapping.

Key features:
- ScalarFunctionGenerator: Generator-based base class with process()
- ScalarFunction: Callback-based API with compute() method
- Single-column output enforced at construction
- Row count validation (output must match input rows)
- Logging support via yield Message or self.log()
- No finalize phase (ends when input exhausted)

New files:
- vgi/scalar_function.py: Core scalar function classes
- vgi/examples/scalar.py: Example functions (double_column, add_columns, upper_case)
- tests/scalar/test_function.py: Comprehensive tests

Updated:
- Worker dispatch for ScalarFunctionGenerator
- Client.scalar_function() method for invoking scalar functions
- CLI --type option (auto/table/table-in-out/scalar)
- Testing utilities (ScalarFunctionTestClient, run_scalar_function, etc.)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Remove mentions of HAVE_MORE_OUTPUT and FINALIZE from public docstrings
- Remove dead code that handled FINALIZE signal (scalar functions don't use it)
- Simplify run() method to use while True loop (exits via generator.close())
- Clean up internal comments

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
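The `while True` loop mentioned in this commit relies on standard Python generator semantics: calling `close()` raises `GeneratorExit` at the suspended `yield`, which ends the loop without any finalize signal. A minimal sketch of that pattern, assuming batches are plain lists; `double_values` and `run` are hypothetical names for illustration and are not the actual `vgi` code:

```python
def double_values():
    """Hypothetical scalar generator: one output batch per input batch."""
    output = None
    while True:
        # Suspends here. close() raises GeneratorExit at this yield,
        # which unwinds the loop with no explicit finalize step.
        batch = yield output
        output = [value * 2 for value in batch]


def run(batches):
    """Drive the generator until the input is exhausted, then close it."""
    gen = double_values()
    next(gen)  # prime the generator: advance to the first yield
    results = [gen.send(batch) for batch in batches]
    gen.close()  # input exhausted, so the loop ends here
    return results


print(run([[1, 2], [3]]))
```

Each `send()` delivers one input batch and returns exactly one output batch, which matches the 1:1 batch flow the commit describes.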
e64e8f0 to 54db81e
- Add end-to-end tests via Client subprocess (tests/scalar/test_client.py)
  - TestScalarFunctionClient: 9 tests for basic operations, error handling
  - TestScalarFunctionParallel: 4 tests for parallel processing
- Add CLI tests for --type scalar option (tests/client/test_cli.py)
  - 10 tests covering invocation, output formats, validation
- Create scalar-specific ProtocolInput without unused is_finalize field
- Update worker.py to use ScalarProtocolInput

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
rustyconover added a commit that referenced this pull request May 16, 2026
… tests

Blocker fix from senior review #1. ``TableBufferingFinalizeState`` inherited the no-op ``StreamState.on_cancel`` from the base, so the user-facing ``TableBufferingFunction.on_cancel`` classmethod was dead code: documented but never invoked. Streaming peers (``TableProducerState.on_cancel``) already forwarded their cancel hooks; the buffered finalize path was the asymmetric outlier.

Wire:
* ``TableBufferingFinalizeState.on_cancel(ctx)`` resolves func_cls + params via the same cold-load path as ``produce()`` (no in-process cache; HTTP rehydration may land on a different worker process), deserializes the user's last-emitted state from ``state_blob``, then calls ``cls.on_cancel(params, finalize_state_id, state)``.
* Pre-init cancel (state_initialized=False) short-circuits: there is no user state to forward.
* Implementation lookup failures and user-callback exceptions are swallowed via ``contextlib.suppress``; we're on a teardown path and must not double-fault during pipeline unwind.

Tests (tests/test_table_buffering_function.py, 5 new cases):
* Happy-path: cls.on_cancel runs with deserialized state.
* state_initialized=False: no invocation.
* state_initialized=True + empty blob: cls.on_cancel runs with state=None.
* ctx.implementation=None: silent no-op (no crash).
* User on_cancel raising: silent no-op (no propagation).

The tests sit at the protocol layer rather than driving an SQL LIMIT end-to-end, because the wire cancel is delivered asynchronously by ``VgiCancelDispatcher`` (one thread writes the cancel batch; the subprocess worker reads it on its own schedule) and racing that against a probe-file assertion produces flaky tests. The contract that matters, "when the framework calls state.on_cancel, the user's classmethod runs with deserialized state", is fully exercised here.

A ``SlowCancellableBufferingFunction`` fixture is also registered in the example worker, mirroring the existing ``slow_cancellable`` / ``slow_cancellable_inout`` fixtures, so a future deterministic integration test has the building block ready when the async-cancel delivery story stabilizes.

Side improvement: ``SumAllColumnsFunction.combine`` now emits a ``params.client_log`` line under ``logging=true``, mirroring its process()-side log. Locks in the combine-side unary-RPC client_log path (review #4) so adding new buffered-table fixtures can't silently regress that.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
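The cancel-forwarding behavior described in this commit message can be sketched roughly as follows. This is an illustration under stated assumptions, not the framework's code: `forward_cancel` and `ExampleFunction` are hypothetical names, the state is assumed to be JSON-encoded (the real serialization format is not given here), and the implementation lookup is reduced to a `func_cls` argument that may be `None`.

```python
import contextlib
import json


class ExampleFunction:
    """Hypothetical user class that records what on_cancel receives."""

    calls = []

    @classmethod
    def on_cancel(cls, params, finalize_state_id, state):
        cls.calls.append((params, finalize_state_id, state))


def forward_cancel(func_cls, params, finalize_state_id, state_initialized, state_blob):
    """Forward a cancel to the user's classmethod without ever raising."""
    if not state_initialized:
        return  # pre-init cancel: no user state to forward
    # Teardown path: swallow lookup, decode, and callback errors alike.
    with contextlib.suppress(Exception):
        # Assumption: JSON stands in for the real state encoding.
        state = json.loads(state_blob) if state_blob else None
        func_cls.on_cancel(params, finalize_state_id, state)


forward_cancel(ExampleFunction, {"limit": 10}, "state-1", True, '{"rows_seen": 3}')
print(ExampleFunction.calls)
```

Passing `func_cls=None` or a callback that raises leaves the caller unaffected, which mirrors the "silent no-op" cases listed in the tests above.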
Summary
Add support for scalar functions - a new function type that transforms input batches to single-column output with strict 1:1 row mapping.
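To make the 1:1 row mapping concrete, here is a minimal sketch of the contract in plain Python. It assumes a batch is a dict of equal-length column lists with at least one column, and that the single output column is a list. `apply_scalar` and `add_columns` are hypothetical helpers, and `RowCountMismatchError` is a local stand-in defined here rather than imported from `vgi`.

```python
class RowCountMismatchError(ValueError):
    """Local stand-in for the exception named in this PR."""


def apply_scalar(compute, batch):
    """Run compute on a batch and check that output rows match input rows."""
    # Assumption: every column in the batch has the same length.
    input_rows = len(next(iter(batch.values())))
    output = compute(batch)  # a single output column, modeled as a list
    if len(output) != input_rows:
        raise RowCountMismatchError(
            f"expected {input_rows} rows, got {len(output)}"
        )
    return output


def add_columns(batch):
    """Hypothetical scalar function: element-wise sum of columns a and b."""
    return [a + b for a, b in zip(batch["a"], batch["b"])]


print(apply_scalar(add_columns, {"a": [1, 2], "b": [10, 20]}))
```

A function that filters or duplicates rows would fail the length check, which is the distinction between a scalar function and a table function in this design.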
Key Features
- `ScalarFunctionGenerator`: Generator-based with `process()` for full control
- `ScalarFunction`: Callback-based with `compute()` for simpler use cases
- Logging via `yield Message` in generator API or `self.log()` in callback API
New Files
- `vgi/scalar_function.py`: Core scalar function classes
- `vgi/examples/scalar.py`: Example functions (`DoubleColumnFunction`, `AddColumnsFunction`, `UpperCaseFunction`)
- `tests/scalar/test_function.py`: Comprehensive tests
Updated Files
- `vgi/worker.py`: Added dispatch for `ScalarFunctionGenerator` and `_process_scalar_batches()` method
- `vgi/client/client.py`: Added `scalar_function()` method
- `vgi/client/cli.py`: Added `--type` option (auto/table/table-in-out/scalar)
- `vgi/testing.py`: Added `ScalarFunctionTestClient`, `run_scalar_function()`, `assert_scalar_function_output()`
- `vgi/table_function.py`: Added `RowCountMismatchError` exception
- `vgi/__init__.py`: Exported new classes
Example Usage
CLI:
vgi-client --input data.parquet --function double_column --args '["x"]' --type scalar
Test plan
🤖 Generated with Claude Code