3 changes: 3 additions & 0 deletions .gitignore
@@ -197,3 +197,6 @@ docs/superpowers/

# User-specific local dev configs; do not commit
CLAUDE.local.md

# Generated dataset cache (created by Dataset.get_dataloader())
dataset_cache/
@@ -17,6 +17,7 @@

import argparse
import asyncio
import logging
from contextlib import AbstractContextManager, nullcontext
from pathlib import Path

@@ -91,10 +92,16 @@ async def main() -> None:

    # Using ternary operator causes errors in MyPy object type coalescing
    # (coalesces to 'object' not 'AbstractContextManager[TokenizePool | None]')
+    pool_cm: AbstractContextManager[TokenizePool | None]
    if args.tokenizer:
-        pool_cm: AbstractContextManager[TokenizePool | None] = TokenizePool(
-            args.tokenizer, n_workers=args.tokenizer_workers
-        )
+        try:
+            pool_cm = TokenizePool(args.tokenizer, n_workers=args.tokenizer_workers)
+        except Exception as e:
+            logging.warning(
[Review Council — Claude] low · error-handling

Tokenizer load failure is silently downgraded to "no token metrics":

except Exception as e:
    logging.warning(
        f"Failed to load tokenizer '{args.tokenizer}': {e}. "
        "ISL/OSL/TPOT token metrics will be unavailable."
    )
    pool_cm = nullcontext()

A user who explicitly passes --tokenizer X and thus expects token metrics will instead see a single WARNING line buried in the log, then a silent benchmark run with empty ISL/OSL/TPOT columns. By the time they check the report, the run is over.

Fix: re-raise unless an explicit --allow-tokenizer-fallback flag is set, OR at least log at ERROR level and surface the missing-tokenizer state in the final report. Also use the module logger (logger = logging.getLogger(__name__)) rather than the root logging module.
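A minimal sketch of the flag-gated variant (the --allow-tokenizer-fallback flag and the logger placement are assumptions, not existing code):

logger = logging.getLogger(__name__)  # module logger instead of the root logger

pool_cm: AbstractContextManager[TokenizePool | None]
if args.tokenizer:
    try:
        pool_cm = TokenizePool(args.tokenizer, n_workers=args.tokenizer_workers)
    except Exception as e:
        if not getattr(args, "allow_tokenizer_fallback", False):  # hypothetical flag
            raise  # the user asked for token metrics; fail loudly
        logger.error(
            f"Failed to load tokenizer '{args.tokenizer}': {e}. "
            "ISL/OSL/TPOT token metrics will be unavailable."
        )
        pool_cm = nullcontext()
else:
    pool_cm = nullcontext()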

f"Failed to load tokenizer '{args.tokenizer}': {e}. "
"ISL/OSL/TPOT token metrics will be unavailable."
)
pool_cm = nullcontext()
else:
pool_cm = nullcontext()

50 changes: 49 additions & 1 deletion src/inference_endpoint/commands/benchmark/execute.py
@@ -27,11 +27,13 @@
import json
import logging
import platform
import random
import shutil
import signal
import tempfile
import uuid
from dataclasses import dataclass, field
from dataclasses import replace as dataclass_replace
from datetime import datetime
from pathlib import Path
from typing import Any
@@ -70,7 +72,7 @@
    TestType,
)
from inference_endpoint.core.types import QueryResult
-from inference_endpoint.dataset_manager.dataset import Dataset
+from inference_endpoint.dataset_manager.dataset import Dataset, SaltedDataset
from inference_endpoint.dataset_manager.factory import DataLoaderFactory
from inference_endpoint.endpoint_client.cpu_affinity import AffinityPlan, pin_loadgen
from inference_endpoint.endpoint_client.http_client import HTTPEndpointClient
@@ -347,6 +349,33 @@ def _build_phases(ctx: BenchmarkContext) -> list[PhaseConfig]:
"""Build the phase list from BenchmarkContext."""
phases: list[PhaseConfig] = []

# Warmup phase (optional, before performance)
warmup_cfg = ctx.config.settings.warmup
if warmup_cfg.enabled:
warmup_dataset: Dataset = (
SaltedDataset(ctx.dataloader) if warmup_cfg.salt else ctx.dataloader
)
warmup_rt = dataclass_replace(
ctx.rt_settings,
min_duration_ms=0,
max_duration_ms=None,
n_samples_from_dataset=ctx.dataloader.num_samples(),
n_samples_to_issue=warmup_cfg.n_requests,
min_sample_count=1,
rng_sched=random.Random(warmup_cfg.warmup_random_seed),
rng_sample_index=random.Random(warmup_cfg.warmup_random_seed + 1),
load_pattern=LoadPattern(type=LoadPatternType.MAX_THROUGHPUT),
)
phases.append(
PhaseConfig(
"warmup",
warmup_rt,
warmup_dataset,
PhaseType.WARMUP,
drain_after=warmup_cfg.drain,
)
)

# Performance phase
phases.append(
PhaseConfig(
@@ -525,12 +554,31 @@ async def _run_benchmark_async(
    phases = _build_phases(ctx)
    report: Report | None = None

    # Global wall-clock timeout covers warmup + performance + accuracy phases
    # combined, and bounds the warmup drain so a dropped request can't hang forever.
    global_timeout_handle = None
    max_duration_ms = ctx.rt_settings.max_duration_ms
    if max_duration_ms is not None:

        def _on_global_timeout() -> None:
            logger.warning(
                "Global experiment timeout reached (%d ms); stopping session.",
                max_duration_ms,
            )
            session.stop()
[Review Council — Claude] low · concurrency

The global timer's callback calls session.stop() directly, with no guard against firing after cleanup has begun:

def _on_global_timeout() -> None:
    logger.warning(
        "Global experiment timeout reached (%d ms); stopping session.",
        max_duration_ms,
    )
    session.stop()

global_timeout_handle.cancel() only runs after entering the finally: block at line 580, so the timer can fire during the gap between await session.run(phases) returning and the cancel() call running — or worse, during the slow cleanup that follows (http_client.shutdown_async(), publisher close, etc.). When that happens, session.stop() flips _stop_requested=True and _drain_event.set() on a session that has already finished. The downstream effect is mostly harmless today (the strategy task is None), but the race is real and easy to make harmful with future refactors.

Suggested fix:

def _on_global_timeout() -> None:
    if session._done:  # or expose a public flag
        return
    logger.warning(...)
    session.stop()

Also consider moving global_timeout_handle.cancel() to immediately after session.run(phases) returns (success path), and only run it in finally to handle the exception path.
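A sketch of the cancel-early variant; it only reorders calls that already exist in this diff:

try:
    result = await session.run(phases)
    if global_timeout_handle is not None:
        global_timeout_handle.cancel()  # success path: cancel before slow cleanup starts
        global_timeout_handle = None
except Exception as e:
    raise ExecutionError(f"Benchmark execution failed: {e}") from e
finally:
    if global_timeout_handle is not None:
        global_timeout_handle.cancel()  # now covers only the exception path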


        global_timeout_handle = loop.call_later(
[Review Council — Both (Codex P1 + Claude)] high · api-contract

The new global timer redefines what max_duration_ms means, but only when warmup is enabled, and the schema description was not updated.

max_duration_ms = ctx.rt_settings.max_duration_ms
if max_duration_ms is not None:
    ...
    global_timeout_handle = loop.call_later(
        max_duration_ms / 1000.0, _on_global_timeout
    )

session.py:_make_stop_check still enforces max_duration_ms per-phase against phase_start_ns, so we now have two clocks running on the same configured value:

  • per-phase, from each phase's start (the original semantic — and still what _build_phases clears to None for warmup/accuracy).
  • global, from loop.call_later scheduling time (warmup + perf + accuracy combined).

For a user with warmup.enabled=True and max_duration_ms=600_000 (10 min), if warmup takes 90s, the perf phase's effective wall-clock budget collapses from 10 min to ~8.5 min — silently. Worse, an accuracy pass in TestMode.BOTH (which sets max_duration_ms=None per-phase, expecting unbounded) can now be truncated mid-run by the global timer.

The schema description still reads "Maximum test duration in ms (0 for no limit)", which is the original per-phase meaning. Existing YAML configs will get shorter perf/accuracy phases once they enable warmup, with no documentation indicating why.

Suggested fixes (any one):

  1. Schedule the timer at the start of the performance phase (e.g., on START_PERFORMANCE_TRACKING) so warmup time doesn't eat the perf budget — preserves the per-phase semantic.
  2. Expose a separate warmup.max_duration_ms to bound only warmup, leave perf-phase max_duration_ms untouched.
  3. If a global cap is genuinely the goal, rename it (experiment_timeout_ms?), keep max_duration_ms per-phase, and document the change in WarmupConfig and release notes.
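A sketch of option 1; the on_performance_start hook is hypothetical (the real wiring would live wherever START_PERFORMANCE_TRACKING is emitted):

def _start_perf_timer() -> None:
    nonlocal global_timeout_handle
    if max_duration_ms is not None:
        # Budget starts when the performance phase starts, not at scheduling time,
        # so warmup no longer eats into it.
        global_timeout_handle = loop.call_later(
            max_duration_ms / 1000.0, _on_global_timeout
        )

session.on_performance_start = _start_perf_timer  # hypothetical callback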

            max_duration_ms / 1000.0, _on_global_timeout
        )

    loop.add_signal_handler(signal.SIGINT, session.stop)
    try:
        result = await session.run(phases)
    except Exception as e:
        raise ExecutionError(f"Benchmark execution failed: {e}") from e
    finally:
        if global_timeout_handle is not None:
            global_timeout_handle.cancel()
        loop.remove_signal_handler(signal.SIGINT)
        logger.info("Cleaning up...")
        try:
25 changes: 25 additions & 0 deletions src/inference_endpoint/config/schema.py
@@ -392,6 +392,30 @@ def _validate_completeness(self) -> Self:
        return self


class WarmupConfig(BaseModel):
    """Warmup phase configuration. Runs before the performance phase; results are not recorded."""

    model_config = ConfigDict(extra="forbid", frozen=True)

    enabled: bool = Field(
        False, description="Enable warmup phase before performance run"
    )
    n_requests: int | None = Field(
        None, gt=0, description="Warmup request count (None = full dataset once)"
    )
    salt: bool = Field(
        False, description="Prepend a unique random hex salt to each warmup prompt"
    )
    drain: bool = Field(
        False,
        description="Drain in-flight warmup requests before starting the performance phase",
    )
    warmup_random_seed: int = Field(
        42,
        description="RNG seed for warmup scheduling and sample ordering",
    )

@cyclopts.Parameter(name="*")
class Settings(BaseModel):
    """Test settings."""
@@ -401,6 +425,7 @@ class Settings(BaseModel):
    runtime: RuntimeConfig = Field(default_factory=RuntimeConfig)
    load_pattern: LoadPattern = Field(default_factory=LoadPattern)
    client: HTTPClientConfig = Field(default_factory=HTTPClientConfig)
    warmup: WarmupConfig = Field(default_factory=WarmupConfig)


class OfflineSettings(Settings):
@@ -68,6 +68,12 @@ settings:
    max_idle_time: 4.0 # Discard connections idle longer than this (seconds)
    min_required_connections: -1 # Min connections to initialize (-1=auto, 0=disabled)
    worker_gc_mode: relaxed # Worker GC strategy | options: disabled, relaxed, system
  warmup:
    enabled: false # Enable warmup phase before performance run
    n_requests: null # Warmup request count (None = full dataset once)
    salt: false # Prepend a unique random hex salt to each warmup prompt
    drain: false # Drain in-flight warmup requests before starting the performance phase
    warmup_random_seed: 42 # RNG seed for warmup scheduling and sample ordering
endpoint_config:
  endpoints: # Endpoint URL(s). Must include scheme, e.g. 'http://host:port'.
    - http://localhost:8000
Two more example config files receive the identical warmup hunk.
73 changes: 71 additions & 2 deletions src/inference_endpoint/dataset_manager/dataset.py
@@ -276,11 +276,12 @@ class Dataset:
    def __init_subclass__(
        cls,
        dataset_id: str | None = None,
+        register: bool = True,
        **kwargs,
    ):
        super().__init_subclass__(**kwargs)

-        if not inspect.isabstract(cls):
+        if register and not inspect.isabstract(cls):
            if dataset_id is None:
                dataset_id = cls.__name__
            cls.DATASET_ID = dataset_id
@@ -411,7 +412,7 @@ def num_samples(self) -> int:
    @classmethod
    def get_dataloader(
        cls,
-        datasets_dir: Path = Path("datasets"),
+        datasets_dir: Path = Path("dataset_cache"),
[Review Council — Claude] high · api-contract

Default datasets_dir changed from Path("datasets") to Path("dataset_cache"):

datasets_dir: Path = Path("dataset_cache"),

This is a silent breaking change. Any existing user who relied on the previous default will, after upgrade, fail to find the cached dataframe under datasets/<DATASET_ID>/<variant> and will silently re-download/re-generate (potentially many GB and minutes of work). The .gitignore change only ignores dataset_cache/, leaving any pre-existing datasets/ cache uncovered.

Suggest: keep Path("datasets") as the default, OR add a fallback that reads from datasets/ if dataset_cache/ is empty, OR document the migration prominently. At minimum, decouple this rename from the warmup PR — it's an orthogonal change.
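A sketch of the fallback option (the shim itself is hypothetical; only the two directory names come from the diff):

from pathlib import Path
import logging

def _resolve_datasets_dir(dataset_id: str, preferred: Path = Path("dataset_cache")) -> Path:
    """Prefer the new cache dir, but fall back to a pre-existing legacy cache."""
    legacy = Path("datasets")
    if not (preferred / dataset_id).exists() and (legacy / dataset_id).exists():
        logging.getLogger(__name__).warning(
            "Using legacy dataset cache at %s; move it to %s to migrate.",
            legacy, preferred,
        )
        return legacy
    return preferred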

        num_repeats: int = 1,
        transforms: list[Transform] | None = None,
        force_regenerate: bool = False,
@@ -431,6 +432,74 @@ def get_dataloader(
        return cls(df, transforms=transforms, repeats=num_repeats)


class SaltedDataset(Dataset, register=False):
    """Wraps a loaded Dataset, prepending a unique random salt to each prompt on load_sample().

    Each call to load_sample() generates a fresh salt, so reused samples (when
    n_requests > dataset size) each receive a distinct salt.
    """

    def __init__(self, inner: Dataset) -> None:
        # Skip Dataset.__init__ — all state is delegated to inner
        self._inner = inner
        self.dataframe = None
        self.transforms = None
        self.repeats = inner.repeats
        self.logger = getLogger(__name__)
Comment on lines +442 to +448
medium

SaltedDataset inherits from Dataset but its __init__ method signature is incompatible with the base class. While it works as a wrapper for an existing instance in the current implementation, it breaks the contract for any code that expects to instantiate a Dataset (or subclass) using the standard signature (e.g., df, transforms, repeats).

Specifically, calling the inherited @classmethod get_dataloader on SaltedDataset will fail with a TypeError because it attempts to call the constructor with arguments that SaltedDataset.__init__ does not accept. Additionally, setting self.dataframe and self.transforms to None might break other inherited methods that expect these attributes to be populated.


    @property  # type: ignore[override]
    def data(self) -> list[Any] | None:
        return self._inner.data

    @data.setter
    def data(self, value: list[Any] | None) -> None:
        self._inner.data = value

    def load(
        self,
        adapter: "HttpRequestAdapter | None" = None,
        api_type: APIType | None = None,
        model_params: ModelParams | None = None,
        force: bool = False,
    ) -> None:
        pass  # Inner dataset already loaded

    def load_sample(self, index: int) -> Any:
[Review Council — Claude] high · bug

SaltedDataset.load_sample() only mutates data["prompt"], but several adapters bypass the prompt entirely. In src/inference_endpoint/sglang/adapter.py:90:

input_tokens = query.data["input_tokens"]
return cls._request_encoder.encode(
    SGLangGenerateRequest(input_ids=input_tokens, ...))

When salt=True is used with a tokenized dataset (any pipeline that pre-tokenizes — common in MLPerf submissions), the salt prefix is added to the (unused) text but input_tokens are unchanged, so the server receives the same token-ID sequence for every warmup request. KV-cache reuse is NOT prevented. The class docstring's claim that "each call to load_sample() generates a fresh salt, so reused samples … each receive a distinct salt" is silently violated.

Fix options: (a) detect input_tokens/token_ids keys and either drop them or re-tokenize after salting; (b) raise an error at warmup setup if salt=True and the inner dataset has tokenized fields; (c) explicitly document the restriction.
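A sketch of option (b) as a guard in _build_phases (probing the first sample and the exact error type are assumptions):

if warmup_cfg.salt:
    probe = ctx.dataloader.load_sample(0)
    if isinstance(probe, dict) and ("input_tokens" in probe or "token_ids" in probe):
        raise ValueError(
            "warmup.salt=True cannot defeat KV-cache reuse for pre-tokenized "
            "datasets: adapters send input_tokens unchanged. Re-tokenize after "
            "salting or disable salt."
        )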

        data = self._inner.load_sample(index)
        if not isinstance(data, dict):
            return data
        if "input_tokens" in data and "prompt" not in data:
            self.logger.warning(
                "SaltedDataset: sample has 'input_tokens' but no 'prompt' — "
                "salt cannot be applied to pre-tokenized input; KV-cache reuse may not be prevented"
            )
            return data
        if "prompt" not in data:
            return data
        prompt = data["prompt"]
        salt = os.urandom(8).hex()
[Review Council — Claude] low · design

salt = os.urandom(8).hex() on every load_sample() ignores the seeded rng_sample_index in RuntimeSettings. Reproducibility-conscious users who set dataloader_random_seed to compare runs will see different salted prompts on each invocation. Warmup metrics aren't reported, but the actual prompts sent to the server (and thus server-side caches/stats) are nondeterministic.

Fix: thread the rng_sample_index (or a derived random.Random) through SaltedDataset.__init__ and use f'{rng.getrandbits(64):016x}' to keep determinism with a non-zero seed.
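A sketch of the seeded variant as a patch to the class above (the rng constructor parameter is an assumption):

def __init__(self, inner: Dataset, rng: random.Random | None = None) -> None:
    self._inner = inner
    self._rng = rng  # None preserves today's os.urandom behavior

def _make_salt(self) -> str:
    if self._rng is not None:
        return f"{self._rng.getrandbits(64):016x}"  # deterministic per seed
    return os.urandom(8).hex()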

        if isinstance(prompt, str):
            return {**data, "prompt": f"[{salt}] {prompt}"}
        if isinstance(prompt, list) and prompt:
            # Find the first text part at any index (image-first prompts place text at index 1+)
            for i, part in enumerate(prompt):
                if isinstance(part, dict) and part.get("type") == "text":
                    salted_parts = [
                        *prompt[:i],
                        {**part, "text": f"[{salt}] {part['text']}"},
                        *prompt[i + 1 :],
                    ]
                    return {**data, "prompt": salted_parts}
            self.logger.warning(
                "SaltedDataset: multimodal prompt has no text part — "
                "salt cannot be applied; KV-cache reuse may not be prevented"
            )
        return data  # unsupported prompt type — skip salting
[Review Council — Claude] medium · bug

When the prompt is a multimodal list where ALL parts are non-text (e.g., all image_url), the loop finds no text part and falls through to return data with no warning:

if isinstance(prompt, list) and prompt:
    for i, part in enumerate(prompt):
        if isinstance(part, dict) and part.get("type") == "text":
            ...
            return {**data, "prompt": salted_parts}
return data  # unsupported prompt type — skip salting (silent!)

Unlike the input_tokens case (which emits a logger.warning), this fallback is completely silent. Users who enable salt=True for image-only multimodal datasets will not see any indication that KV-cache bypass is not functioning. Add a logger.warning consistent with the input_tokens path.



    def num_samples(self) -> int:
        return self._inner.num_samples()


class EmptyDataset(Dataset):
    """Empty dataset to be used as performance dataset when running only accuracy tests."""

26 changes: 11 additions & 15 deletions src/inference_endpoint/load_generator/session.py
@@ -22,7 +22,6 @@

import asyncio
import logging
-import os
import time
import uuid
from collections.abc import Callable
@@ -44,8 +43,6 @@

logger = logging.getLogger(__name__)

-_WARMUP_ENABLED = os.environ.get("ENABLE_WARMUP") == "1"


# ---------------------------------------------------------------------------
# Phase configuration
@@ -68,6 +65,7 @@ class PhaseConfig:
    runtime_settings: RuntimeSettings
    dataset: Dataset
    phase_type: PhaseType = PhaseType.PERFORMANCE
+    drain_after: bool = True
[Review Council — Claude] low · api-contract

drain_after: bool = True was added to PhaseConfig with default True, but for PhaseType.WARMUP the previous behavior was effectively drain_after=False (the old _run_phase skipped drain for warmup unconditionally). Existing callers that construct PhaseConfig(..., PhaseType.WARMUP) directly (e.g. via the Python API or in tests) will now get the opposite drain semantics from before unless they pass drain_after=False explicitly. The PR's _build_phases correctly passes warmup_cfg.drain through, but third-party code paths get a silent behavior flip.

Fix: either default drain_after based on phase_type via a __post_init__ (warmup → False, others → True), OR make drain_after a required keyword for warmup phases.
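A sketch of the __post_init__ approach, using a None sentinel so "unspecified" is distinguishable (field names match the dataclass above; a frozen dataclass would need object.__setattr__ instead):

@dataclass
class PhaseConfig:
    name: str
    runtime_settings: RuntimeSettings
    dataset: Dataset
    phase_type: PhaseType = PhaseType.PERFORMANCE
    drain_after: bool | None = None  # None = derive from phase_type

    def __post_init__(self) -> None:
        if self.drain_after is None:
            # Preserve pre-PR semantics: warmup phases are never drained by default.
            self.drain_after = self.phase_type != PhaseType.WARMUP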



# ---------------------------------------------------------------------------
@@ -242,6 +240,7 @@ def __init__(
        self._stop_requested = False
        self._done = False
        self._current_phase_issuer: PhaseIssuer | None = None
+        self._current_phase_type: PhaseType | None = None
        self._current_strategy: LoadStrategy | None = None
        self._recv_task: asyncio.Task | None = None
        self._strategy_task: asyncio.Task | None = None
@@ -274,12 +273,6 @@ async def run(self, phases: list[PhaseConfig]) -> SessionResult:
        for phase in phases:
[Review Council — Claude] medium · api-contract

The _WARMUP_ENABLED = os.environ.get("ENABLE_WARMUP") == "1" gate (and the ENABLE_WARMUP=1 to enable log message) was removed. Anyone who scripted ENABLE_WARMUP=1 inference-endpoint ... to opt in to warmup will now silently see no behavior change — the env var is ignored, and warmup runs only when the YAML enables it.

Suggested fix: log a one-time warning at session start if ENABLE_WARMUP is set in the environment but the YAML doesn't enable warmup; OR document the rename in release notes. (Either way, the silent no-op is the worst case.)
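A sketch of the one-time warning (placement at session start is an assumption):

import os

if os.environ.get("ENABLE_WARMUP") == "1" and not ctx.config.settings.warmup.enabled:
    logger.warning(
        "ENABLE_WARMUP=1 no longer has any effect; enable warmup via "
        "settings.warmup.enabled in the YAML config instead."
    )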

            if self._stop_requested:
                break
-            if phase.phase_type == PhaseType.WARMUP and not _WARMUP_ENABLED:
-                logger.info(
-                    "Skipping warmup phase %s (set ENABLE_WARMUP=1 to enable)",
-                    phase.name,
-                )
-                continue
            result = await self._run_phase(phase)
            if result is not None:
                phase_results.append(result)
@@ -318,6 +311,7 @@ async def _run_phase(self, phase: PhaseConfig) -> PhaseResult | None:
        )

        self._current_phase_issuer = phase_issuer
+        self._current_phase_type = phase.phase_type
        self._current_strategy = strategy

        # Performance phases get tracking events
@@ -333,8 +327,7 @@ async def _run_phase(self, phase: PhaseConfig) -> PhaseResult | None:
        finally:
            self._strategy_task = None

-        # Drain in-flight (skip for warmup — keep concurrency hot)
-        if phase.phase_type != PhaseType.WARMUP:
+        if phase.drain_after:
            await self._drain_inflight(phase_issuer)

        if phase.phase_type == PhaseType.PERFORMANCE:
@@ -363,9 +356,9 @@ async def _run_phase(self, phase: PhaseConfig) -> PhaseResult | None:
    async def _drain_inflight(self, phase_issuer: PhaseIssuer) -> None:
        """Wait for all in-flight responses from this phase to complete.

-        Currently, there is no timeout for the drain step. In the future,
-        we can possibly add a dynamic timeout based on the rate of completion
-        throughout the current phase."""
+        Bounded by the global experiment timeout: if the caller schedules a
+        loop.call_later that calls stop(), stop() sets _drain_event, unblocking
+        this wait without leaving it hung indefinitely."""
[Review Council — Claude] medium · error-handling

The new docstring on _drain_inflight claims a safety guarantee that only holds in some configurations:

async def _drain_inflight(self, phase_issuer: PhaseIssuer) -> None:
    """Wait for all in-flight responses from this phase to complete.

    Bounded by the global experiment timeout: if the caller schedules a
    loop.call_later that calls stop(), stop() sets _drain_event, unblocking
    this wait without leaving it hung indefinitely."""
    ...
    self._drain_event.clear()
    await self._drain_event.wait()

The loop.call_later is only scheduled in execute.py:561 if ctx.rt_settings.max_duration_ms is not None. runtime_settings.from_config maps max_duration_ms == 0 to None, and the schema default is 0 — so the default offline/online CLI run has no global timer, and _drain_inflight retains its old unbounded await self._drain_event.wait(). A single dropped or hanging request will hang the benchmark forever.

The docstring suggests this case is solved; it isn't.

Suggested fixes:

  • Make the docstring conditional ("if a global timeout is configured…") and add a note that the drain is otherwise unbounded, OR
  • Add an intrinsic drain timeout (a fixed cap like 30–60s, or a multiple of observed completion latency) inside _drain_inflight itself so the safety property is a property of the function, not a contract on the caller.
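A sketch of the intrinsic-timeout option (the 60 s cap is an arbitrary illustration, not a tuned value):

self._drain_event.clear()
try:
    await asyncio.wait_for(self._drain_event.wait(), timeout=60.0)
except asyncio.TimeoutError:
    logger.warning(
        "Drain timed out with %d responses still in flight; continuing.",
        phase_issuer.inflight,
    )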

        if phase_issuer.inflight <= 0 or self._stop_requested:
            return
        logger.info("Draining %d in-flight responses...", phase_issuer.inflight)
@@ -425,7 +418,10 @@ def _handle_response(self, resp: QueryResult | StreamChunk) -> None:
                self._drain_event.set()
            if self._current_strategy:
                self._current_strategy.on_query_complete(query_id)
-            if self._on_sample_complete:
+            if (
+                self._on_sample_complete
+                and self._current_phase_type != PhaseType.WARMUP
[Review Council — Claude] low · data-integrity

The _on_sample_complete callback is filtered for warmup, but the SampleEventType.COMPLETE event publish at line 396 (just above) is NOT filtered:

self._publisher.publish(EventRecord(
    event_type=SampleEventType.COMPLETE, ..., sample_uuid=query_id, ...
))

When drain_after=False (warmup default), late-arriving warmup QueryResults race the start of the perf phase. MetricsTable.update_sample discards events for unknown sample IDs so this doesn't corrupt counters today — but the events still travel over ZMQ, are decoded, and (if event_logger is enabled) get persisted to the JSONL/SQLite event log. Downstream tooling reading the raw event log will see warmup-named samples interleaved with perf samples without any phase tag.

Fix: tag warmup queries with a phase marker on the EventRecord (or the Query) so the event logger can filter or annotate, OR add the _current_phase_type != PhaseType.WARMUP check around the publish at line 396 as well.
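A sketch of the narrow fix, guarding the publish the same way the callback is guarded (EventRecord fields abbreviated as in the quote above):

if self._current_phase_type != PhaseType.WARMUP:
    self._publisher.publish(EventRecord(
        event_type=SampleEventType.COMPLETE,
        sample_uuid=query_id,
        # ... remaining fields unchanged
    ))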

+            ):
                self._on_sample_complete(resp)

        elif isinstance(resp, StreamChunk):