Skip to content

Commit e64e8f0

Browse files
rustyconoverclaude
andcommitted
refactor: clean up scalar function docstrings and remove dead code
- Remove mentions of HAVE_MORE_OUTPUT and FINALIZE from public docstrings - Remove dead code that handled FINALIZE signal (scalar functions don't use it) - Simplify run() method to use while True loop (exits via generator.close()) - Clean up internal comments 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent a952fb4 commit e64e8f0

3 files changed

Lines changed: 9 additions & 18 deletions

File tree

duckdb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit e797eded1bdf44f6c64a749bc57049f59969c6cd

vgi/client/client.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1818,16 +1818,14 @@ def scalar_function(
18181818
"""Invoke a scalar function on the worker and stream results.
18191819
18201820
Scalar functions transform input batches to single-column output with
1821-
1:1 row mapping. Unlike table_in_out_function, scalar functions have
1822-
no finalize phase - processing ends when input is exhausted.
1821+
1:1 row mapping. Processing ends when input is exhausted.
18231822
18241823
Processing flow:
18251824
1. Reads the first input batch to determine the input schema
18261825
2. Sends Invocation to worker and receives bind result
18271826
3. Spawns additional workers if max_processes > 1
18281827
4. Distributes input batches to workers (round-robin for parallel mode)
1829-
5. Collects output batches, handling HAVE_MORE_OUTPUT for log messages
1830-
6. Returns when input is exhausted (no FINALIZE signal)
1828+
5. Collects and yields output batches
18311829
18321830
For parallel processing (max_processes > 1), input batches are distributed
18331831
round-robin across workers using dedicated threads. Output order may not
@@ -1915,7 +1913,6 @@ def _scalar_function_parallel(
19151913
) -> Generator[pa.RecordBatch, None, None]:
19161914
"""Process scalar function batches across one or more workers using threads.
19171915
1918-
Similar to _table_in_out_function_parallel but without finalization.
19191916
Handles both single-worker and multi-worker cases uniformly.
19201917
19211918
Processing flow:
@@ -1925,7 +1922,7 @@ def _scalar_function_parallel(
19251922
4. Signals end-of-input to all workers via None sentinel
19261923
5. Collects all output batches from shared output queue
19271924
6. Waits for worker threads to complete
1928-
7. Closes all workers (no finalize phase)
1925+
7. Closes all workers
19291926
19301927
Args:
19311928
input_batch: The first input batch, already consumed from the
@@ -1937,8 +1934,7 @@ def _scalar_function_parallel(
19371934
19381935
Yields:
19391936
Output RecordBatches from processing, in non-deterministic order for
1940-
multi-worker mode. When multiple batches are returned for a single
1941-
input (HAVE_MORE_OUTPUT for logs), they are combined into one batch.
1937+
multi-worker mode.
19421938
19431939
Raises:
19441940
ClientError: If a worker thread fails with an exception.
@@ -2020,7 +2016,6 @@ def _scalar_function_parallel(
20202016
self._join_threads(threads)
20212017
log.debug("all_scalar_worker_threads_complete")
20222018

2023-
# Close all workers (no finalize for scalar functions)
20242019
# Close data writers to signal EOF to workers
20252020
for worker in all_workers:
20262021
if worker.data_writer is not None:

vgi/scalar_function.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,12 @@ def run(self) -> Generator[ProtocolOutput, ProtocolInput | None, None]:
278278
"""Run the scalar function protocol. Do not override.
279279
280280
This generator implements the SETUP -> DATA -> TEARDOWN lifecycle.
281-
No FINALIZE phase for scalar functions.
281+
The generator is closed by the caller when input is exhausted.
282282
283283
Protocol:
284284
- Caller primes with next() or send(None)
285285
- Caller sends ProtocolInput for each batch
286-
- When input exhausted, generator closes (no FINALIZE signal needed)
286+
- When input exhausted, caller closes the generator
287287
"""
288288
# Priming yield - caller calls next() or send(None)
289289
input: ProtocolInput | None = yield ProtocolOutput(
@@ -300,8 +300,8 @@ def run(self) -> Generator[ProtocolOutput, ProtocolInput | None, None]:
300300
generator.send(None)
301301

302302
try:
303-
# DATA phase - no FINALIZE for scalar functions
304-
while not input.is_finalize:
303+
# DATA phase - process batches until generator is closed
304+
while True:
305305
result = self._process_with_exception_handling(generator, input.batch)
306306

307307
# Determine status based on result
@@ -320,11 +320,6 @@ def run(self) -> Generator[ProtocolOutput, ProtocolInput | None, None]:
320320
raise ValueError("Expected ProtocolInput, got None")
321321
if self._should_terminate(result):
322322
return
323-
324-
# When FINALIZE signal comes, just emit FINISHED (no finalize phase)
325-
yield ProtocolOutput(
326-
batch=self.empty_output_batch, status=_OutputStatus.FINISHED
327-
)
328323
finally:
329324
generator.close()
330325
# Release resources after processing completes

0 commit comments

Comments
 (0)