@@ -1098,6 +1098,16 @@ def _duration_expired():
while not result_queue.empty():
client_metrics.append(result_queue.get())
pbar.update(1)
# Fallback: detect dead children whose TERM_SIGNAL was lost
dead = sum(1 for c in clients if not c.is_alive())
if dead >= bench_args.num_clients:
lost = bench_args.num_clients - num_clients_finished
logger.warning(
f"{Color.YELLOW}All client processes are dead but "
f"{lost} TERM_SIGNALs were lost — breaking out of "
f"main loop{Color.RESET}"
)
break
continue

# Collect results (measurements)
Comment on lines 1098 to 1113
Contributor

🟣 Pre-existing bug: The asyncio.wait_for(loop.run_in_executor(None, conv_queue.get), timeout=1.0) pattern is the root cause of the lost TERM_SIGNALs that the dead-children fallback works around. When wait_for times out, it cancels the asyncio Future but cannot stop the underlying thread still blocked on queue.get(), so stale threads accumulate and silently consume queue items. Replace with loop.run_in_executor(None, lambda: conv_queue.get(timeout=1.0)) and catch queue.Empty to eliminate the race.

Extended reasoning...

The anti-pattern

The code at line 1092-1095 uses asyncio.wait_for(loop.run_in_executor(None, conv_queue.get), timeout=1.0). This is a well-known Python anti-pattern when combining asyncio.wait_for with run_in_executor on a blocking call that has no internal timeout.

How it fails

When wait_for times out after 1 second, it cancels the asyncio Future wrapper. However, the underlying thread spawned by run_in_executor is already running and blocked on conv_queue.get(). concurrent.futures.Future.cancel() returns False for a running task, so the thread continues blocking indefinitely. On the next loop iteration, run_in_executor spawns a new thread, which also blocks on conv_queue.get(). After N timeouts, there are N+1 threads competing for queue items.

Step-by-step proof of item loss

  1. Iteration 1: run_in_executor spawns Thread-A blocking on conv_queue.get(). After 1s, wait_for times out and cancels asyncio Future-A.
  2. Iteration 2: run_in_executor spawns Thread-B blocking on conv_queue.get(). Now Thread-A and Thread-B both wait on the queue.
  3. A client puts (TERM_SIGNAL, TERM_SIGNAL) on conv_queue.
  4. Thread-A wins the race and calls conv_queue.get(), consuming the item. It sets the result on its concurrent.futures.Future.
  5. The bridge callback (_chain_future / _copy_future_state in CPython asyncio) checks the destination asyncio Future-A — it was already cancelled in step 1. The callback calls if dest.cancelled(): return, silently discarding the result.
  6. The TERM_SIGNAL is consumed from the queue but never seen by the main loop. num_clients_finished is never incremented for that client.
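The steps above can be reproduced in a minimal, self-contained sketch (using a plain queue.Queue in place of the benchmark's conv_queue; the names here are illustrative, not from the PR):

```python
import asyncio
import queue

async def demo() -> None:
    q: queue.Queue = queue.Queue()
    loop = asyncio.get_running_loop()

    # Iteration 1: spawn an executor thread blocked on q.get(), then time out.
    try:
        await asyncio.wait_for(loop.run_in_executor(None, q.get), timeout=0.2)
    except asyncio.TimeoutError:
        # wait_for cancelled the asyncio Future, but the executor thread
        # is still blocked inside q.get() and cannot be cancelled.
        pass

    # A producer now puts one item on the queue...
    q.put("TERM_SIGNAL")
    await asyncio.sleep(0.3)  # give the stale thread time to consume it

    # ...and the stale thread wins the race: the item is gone, but its
    # result lands on an already-cancelled Future and is discarded.
    assert q.empty()
    print("item lost to stale executor thread")

asyncio.run(demo())
```

Running this prints the final message with the queue empty even though the main coroutine never received the item, which is exactly the lost-TERM_SIGNAL behavior described above.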

This exactly explains the PR description observation: all 32 clients logged "is done" (printed after conv_queue.put((TERM_SIGNAL, TERM_SIGNAL)) at line 818-822), but only 24 TERM_SIGNALs were received by the main loop — the other 8 were consumed by stale executor threads.

Impact

Without the dead-children fallback added by this PR, the main loop would hang forever waiting for TERM_SIGNALs that were already consumed and discarded. The fallback correctly works around the symptom, but the root cause remains: queue items can be silently lost, and stale threads accumulate over the benchmark lifetime.

Fix

Replace the wait_for + run_in_executor pattern with a thread-level timeout:

try:
    conv_id, messages = await loop.run_in_executor(
        None, lambda: conv_queue.get(timeout=1.0)
    )
except queue.Empty:
    # handle timeout
    continue

This way the thread itself times out cleanly via queue.get(timeout=1.0), no items are silently consumed, and no stale threads accumulate. The dead-children fallback could then be kept as a safety net rather than being required for correctness.

@@ -2011,3 +2021,7 @@ async def main() -> None:

if __name__ == "__main__":
asyncio.run(main())
# Force exit to avoid hanging on multiprocessing Queue feeder threads
# during interpreter shutdown (the queues may still have thousands of
# undelivered items whose feeder threads deadlock on GC).
os._exit(0)
Comment on lines +2024 to +2027
Contributor

🟡 os._exit(0) bypasses Python buffer flushing, which can silently discard the benchmark statistics output when stdout is piped (e.g., python benchmark.py | tee output.log). Add sys.stdout.flush() and sys.stderr.flush() before the os._exit(0) call.

Extended reasoning...

Analysis

os._exit(0) calls the C-level _exit() which does not flush Python stdio buffers and does not call Python atexit handlers. This is intentional for avoiding the Queue feeder thread deadlock, but it introduces a data loss risk for buffered output.

Code path

The process_statistics() function (lines 1331-1387) outputs the benchmark statistics summary via multiple print() calls to stdout. This is the primary human-readable output of the benchmark tool. After main() returns, the script immediately calls os._exit(0) at line 2027.

Why this manifests in practice

When stdout is connected to a terminal, Python uses line buffering, so each print() (which ends with a newline) is flushed immediately. However, when stdout is piped — e.g., python benchmark.py ... | tee output.log or python benchmark.py ... > output.log — Python switches to block buffering with an ~8KB buffer. The statistics summary from process_statistics() is relatively small and may fit entirely within the block buffer. If the buffer has not been flushed by the OS by the time os._exit(0) runs, all of that output is silently discarded. This is a very common usage pattern for benchmark scripts, where users typically redirect output to a log file.

Step-by-step proof

  1. User runs: python benchmark_serving_multi_turn.py ... | tee benchmark_results.log
  2. Python detects stdout is a pipe and enables block buffering (~8KB buffer)
  3. asyncio.run(main()) executes the benchmark, calling process_statistics() which issues ~20-30 print() calls with the statistics summary
  4. These print() calls write to Python's BufferedWriter for stdout; since the total output is under 8KB, it may remain in the buffer
  5. os._exit(0) is called, which invokes C _exit() — Python's BufferedWriter.flush() is never called
  6. The benchmark statistics (the most important output of the tool) are silently lost from the log file
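The loss is easy to reproduce with child processes whose stdout goes to a pipe; the tiny -c payloads below are hypothetical stand-ins for the benchmark script:

```python
import os
import subprocess
import sys

# Strip PYTHONUNBUFFERED so the children use the default block buffering
# that Python applies when stdout is a pipe rather than a terminal.
env = {k: v for k, v in os.environ.items() if k != "PYTHONUNBUFFERED"}

lost = subprocess.run(
    [sys.executable, "-c", "import os; print('stats'); os._exit(0)"],
    capture_output=True, text=True, env=env,
)
kept = subprocess.run(
    [sys.executable, "-c",
     "import os, sys; print('stats'); sys.stdout.flush(); os._exit(0)"],
    capture_output=True, text=True, env=env,
)

assert lost.stdout == ""         # buffered output discarded by os._exit(0)
assert kept.stdout == "stats\n"  # explicit flush preserves it
```

The first child's print() never reaches the pipe because os._exit(0) skips the BufferedWriter flush; the second child flushes first, so its output survives.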

Fix

The fix is trivial — add explicit flushes before the os._exit(0) call:

if __name__ == "__main__":
    asyncio.run(main())
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(0)

This preserves the deadlock-avoidance benefit of os._exit(0) while ensuring all output is written.