Feat/tool sequences #285
Conversation
MLCommons CLA bot: All contributors have signed the MLCommons CLA ✍️ ✅
Code Review
This pull request introduces a comprehensive multi-turn conversation benchmarking framework, including a new MultiTurnScheduler, ConversationManager, and MultiTurnDataset. These additions enable benchmarking of conversational AI workloads with turn sequencing, conversation history management, and optional concurrency control. My review identified potential issues regarding the usage of sentinel objects in the scheduler and the robustness of the timeout logic in the conversation manager.
| f"Turn {turn} of {conv_id} timed out waiting for prev turn" | ||
| ) | ||
| break # Skip remaining turns to avoid cascade timeouts | ||
| ready.put((idx, 0)) |
There was a problem hiding this comment.
The _PIPELINE_DONE sentinel is pushed to the queue, but the consumption logic in the finally block or the iterator loop does not explicitly handle it as a termination signal for individual threads, relying instead on the queue being drained. Ensure this is the intended design to avoid potential race conditions.
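For reference, a minimal sketch of the explicit-sentinel pattern this comment suggests (the `_PIPELINE_DONE` and `ready` names follow the snippet above; the worker loop itself is hypothetical):

```python
import queue

_PIPELINE_DONE = object()  # unique sentinel marking pipeline shutdown

def worker(ready: queue.Queue) -> None:
    while True:
        item = ready.get()
        if item is _PIPELINE_DONE:
            # Re-enqueue the sentinel so sibling worker threads also
            # observe shutdown, then exit explicitly instead of relying
            # on the queue being drained.
            ready.put(_PIPELINE_DONE)
            break
        idx, turn = item
        # ... process (idx, turn) ...
```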
arekay-nv
left a comment
Partial review - will complete later.
> - Plain chat: `user → assistant → user → ...`
> - Agentic: `user → assistant (with tool_calls) → tool → [tool | assistant (with tool_calls)]* → assistant → user → ...`
> 2. First turn must be "user" role
> 3. Turn numbers must be sequential (1, 2, 3, ...)
Should all the turns be ordered and consecutive in the dataset? For instance, if I have turn 1 of all conversations followed by turn 2 of all conversations - is that a valid format? Would it not be easier to have a conversation as a top-level object which contains a list of messages and associated metadata inside?
The challenge here would be reading in all the conversations to collect all the turns - unless we enforce some constraints.
Yes, all turns are now ordered and consecutive.
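For illustration, a hypothetical flat-row JSONL satisfying this constraint: all rows of a conversation are consecutive, and turn numbers restart at 1 per conversation (field names follow the discussion above; the authoritative schema lives in scripts/multi_turn_dataset_schema.json):

```jsonl
{"conversation_id": "conv-1", "turn": 1, "role": "user", "content": "Hi"}
{"conversation_id": "conv-1", "turn": 2, "role": "assistant", "content": "Hello! How can I help?"}
{"conversation_id": "conv-1", "turn": 3, "role": "user", "content": "What's 2+2?"}
{"conversation_id": "conv-2", "turn": 1, "role": "user", "content": "Ping"}
{"conversation_id": "conv-2", "turn": 2, "role": "assistant", "content": "Pong"}
```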
```yaml
type: performance
# Replace with the path where you saved the converted flat-row JSONL.
# Run: python scripts/convert_agentic_snapshot.py <input.jsonl> <output_flat.jsonl> --verify
path: /model/agentic_coding_flat.jsonl
```
Ideally, these should be relative and follow the readme example. So if you follow the readme example, this config should work out of the box.
- Remove sequential conversation mode (redundant with target_concurrency=1)
- Remove `enabled` field from MultiTurnConfig; presence of multi_turn: block implies enabled
- Add conversation grouping validation to MultiTurnDataset (raises InputValidationError if rows for a conversation_id are not consecutive)
- Update YAML example configs: model placeholder, relative dataset paths, removed redundant metrics.collect
- Move MULTI_TURN_QUICKSTART.md to docs/
- Update all documentation to remove sequential mode references

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…wer comments

- Remove dead constant BLOCK_ON_PREVIOUS_TURN = -1 from scheduler.py
- Remove redundant outer with state.condition: in mark_turn_complete
- Remove ConversationMode import and explicit mode= args from integration tests
- Fix format: jsonl → format: ".jsonl" in example YAMLs and docs
- Add target_concurrency: 1 clarification to quickstart (preserves turn ordering)
- Remove broken HYBRID_SCHEDULER_GUIDE.md reference from quickstart

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Force-pushed d8dfd64 to b127845.
Add MultiTurnDataset, MultiTurnConfig schema, tool-calling types, Query.metadata transport field, adapter tools= kwarg, and multi-turn factory routing.
Add per-conversation asyncio.Event sequencing (ConversationManager), async turn pipeline (MultiTurnStrategy), and benchmark execution wiring (execute.py, session.py PhaseIssuer data_override).
Add unit tests for MultiTurnDataset, ConversationManager, and MultiTurnStrategy; add integration tests including tool-use scenarios and large-concurrency stress tests.
Consolidate multi-turn dataset with single-turn transform pipeline, fix prior-row extraction, live-history mode, system prompt injection, tool_calls preservation, and asyncio.Event-based sequencing.
Add MULTI_TURN_QUICKSTART.md, examples/09_MultiTurn/ configs and sample data, scripts/convert_agentic_snapshot.py, and README clarifications including conversion script output destination.
…ring

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…efault path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Force-pushed b127845 to 0a7ad37.
arekay-nv
left a comment
Review Council — Multi-AI Code Review
Reviewed by: Claude (Codex review failed with a CLI config error — invalid 'features' requirement 'browser_use' from cloud requirements) | Depth: thorough
Found 15 issues (0 critical, 2 high, 5 medium, 8 low). 14 posted as inline comments, 1 in summary table only (line outside diff hunk).
```diff
 id=result_id or response.id,
-response_output=TextModelOutput(output=response.choices[0].message.content),
+response_output=TextModelOutput(output=choice.message.content or ""),
 metadata=metadata if metadata else None,
```
🟠 HIGH · bug · [Claude]
from_endpoint_response violates the QueryResult.metadata type contract by passing metadata=None when no tool_calls/finish_reason/reasoning_content are present. QueryResult.metadata is typed dict[str, Any] with default_factory=dict. The constructed result is then passed to worker._handle_non_streaming_body which calls result.with_metadata(req.query_metadata). with_metadata short-circuits on empty additional_metadata (current callers pass {}), but as soon as any caller routes a non-empty Query.metadata (e.g., the conversation_id round-trip explicitly mentioned in Query.metadata's docstring), dict(self.metadata) will raise TypeError: 'NoneType' object is not iterable. Likewise, any code that does result.metadata.get(...) on this result will raise AttributeError. Fix:
```python
return QueryResult(
    id=result_id or response.id,
    response_output=TextModelOutput(output=choice.message.content or ""),
    metadata=metadata,  # always pass dict; let omit_defaults handle empty
)
```
Fixed. `metadata` is now initialized to an empty dict.
```python
    for conv_id, turns in conv_samples.items()
]

await asyncio.gather(*tasks, return_exceptions=True)
```
🟠 HIGH · error-handling · [Claude]
await asyncio.gather(*tasks, return_exceptions=True) silently swallows every exception raised by a _conv_pipeline coroutine. The result list is discarded — there is no for r in results: if isinstance(r, Exception): .... Programmer bugs (KeyError on missing metadata key, AttributeError on result.metadata, dataset metadata-format drift, etc.) become invisible: the strategy returns phase_issuer.issued_count as if everything succeeded, and the only sign of failure is missing samples. AGENTS.md explicitly flags swallowed exceptions as forbidden. Fix: collect the result list and re-raise the first non-CancelledError exception, or at least log each one before discarding:
```python
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError):
        logger.error("conv_pipeline crashed: %r", r, exc_info=r)
```
Fixed. Errors are collected and logged.
```python
logger.warning(
    f"Turn {turn} of {conv_id} timed out waiting for previous turn"
)
state.failed_turns += 1
```
🟡 MEDIUM · bug · [Claude]
When a turn times out waiting for the previous turn's response, the pipeline does state.failed_turns += 1 but never increments state.completed_turns, then breaks out of the loop. As a result, ConversationState.is_complete() (completed_turns >= expected_client_turns) returns False forever, the conversation never logs completion, and downstream _log_if_complete reporting silently drops it. Worse, if the late response arrives later, mark_turn_complete runs on the abandoned state and bumps completed_turns once, leaving completed_turns=1, failed_turns=1 for what was a multi-turn conversation, with the remaining turns never issued — but no error is raised. Fix: bump both counters on timeout (or call self._conv_manager.mark_turn_failed(...)), so the failure is properly accounted, and consider also draining/discarding the now-orphan _inflight entries for this conversation so a late response doesn't mutate state for a dead pipeline.
```python
except TimeoutError:
    logger.warning(...)
    state.completed_turns += 1
    state.failed_turns += 1
    break
```
Fixed. Timeout calls mark_turn_failed() which bumps the counters.
```python
query_id = uuid.uuid4().hex
data = self._dataset.load_sample(sample_index)
if data_override is not None:
    data = {**data, **data_override}
```
🟡 MEDIUM · data-integrity · [Claude]
PromptData.text = data.get("prompt") and token_ids = data.get("input_tokens") or data.get("token_ids") — neither is set on multi-turn samples. MultiTurnDataset.load() writes the conversation into sample["messages"] (a list of message dicts) and does not populate prompt, input_tokens, or token_ids. The result is that every multi-turn ISSUED event records PromptData(text=None, token_ids=None), so the metrics aggregator computes ISL from zero or just the bare current-turn user text instead of the full prompt-with-history actually sent over the wire. ISL/throughput/TPOT-derived numbers reported for any multi-turn benchmark will therefore be wrong. Either serialize the merged messages into a synthetic prompt for ISL accounting, or extend PromptData to carry the OpenAI messages list and have the aggregator handle it. At minimum, document the limitation prominently and emit a one-shot warning when running multi-turn without a tokenizer.
Fixed. Message contents are now joined into prompt_text.
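A minimal sketch of that fix as described (per the follow-up commit, contents are joined with newline separators; `messages` is assumed to be an OpenAI-style list of role/content dicts):

```python
def flatten_messages_for_isl(messages: list[dict]) -> str:
    """Join message contents with newlines so ISL can be estimated
    from the full prompt-with-history, not just the current turn."""
    parts = []
    for msg in messages:
        content = msg.get("content")
        if isinstance(content, str) and content:
            parts.append(content)
    return "\n".join(parts)
```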
```python
if val and isinstance(val, str):
    system_content = val
    break
system_prompts_by_conv[str(conv_id)] = system_content
```
🟡 MEDIUM · bug · [Claude]
system_prompts_by_conv is keyed by str(conv_id) (line 212), but pre_built_messages_by_key and current_turn_messages_by_key are keyed by the raw conv_id returned from groupby (lines 266–267), and samples stores the raw value too. MultiTurnStrategy.execute() then looks up sys_prompts.get(conv_id) with the raw value (multi_turn_strategy.py line 116), so the system prompt is silently dropped whenever a JSONL has a non-string conversation_id (integer ids, mixed types after pandas type inference, etc.). Pick one canonicalization (preferably str(conv_id)) and apply it consistently across all dictionaries — including the samples entries and the _conv_states/_inflight keys in the strategy.
Fixed. All keys now use `str(conv_id)`.
```python
messages.append({"role": "system", "content": system_content})

# All dataset rows strictly before this client turn (includes
# assistant rows and prior tool results).
```
🔵 LOW · bug · [Claude]
When iterating prior_rows to build pre-built messages, a tool row whose tool_results field is an empty list (e.g. tool_results: []) is collected into msg, then _expand_tool_results(msg) returns [] (line 244 falls through), and the bare malformed msg (role='tool', no tool_call_id, no content, just the empty tool_results) is appended to the message history at line 246. This produces an OpenAI-invalid tool message. Either skip rows with empty tool_results entirely, or treat an empty list as a validation error in _validate_conversation_structure.
Fixed. Empty tool results now return `[]`.
```python
@pytest.mark.integration
@pytest.mark.asyncio
async def test_tools_field_forwarded_to_endpoint(echo_server):
```
🔵 LOW · testing · [Claude]
The "tool use" integration tests verify that tools, tool_calls, and tool_results are forwarded to the endpoint and that all client turns are issued, but they only run against an EchoServer that does not validate tool_call_id pairing or generate fresh tool_call ids. As a result:
- The live-history
tool_call_idmismatch issue (see related issue) cannot be detected by these tests. - There is no test covering pipeline error propagation (today, a crash inside
_conv_pipelineis silently swallowed bygather(return_exceptions=True)). - There is no concurrent-conversation stress test (>10 conversations under a non-trivial
target_concurrency) that exercises the semaphore + timeout interaction.
Add at least: (1) a test using a fake server that asserts each tool message's tool_call_id matches a prior assistant tool_calls id under live-history mode (currently expected to fail), (2) a test asserting that an exception in a pipeline propagates out of MultiTurnStrategy.execute, and (3) a stress test with 50+ concurrent conversations and target_concurrency set, asserting all complete and the semaphore reaches its rest value.
There was a problem hiding this comment.
Added a test with concurrent conversations and multiple turns.
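A hedged sketch of what such a stress test could look like; `make_conversations` and `run_strategy` are hypothetical helpers standing in for the project's fixtures:

```python
import pytest

@pytest.mark.integration
@pytest.mark.asyncio
async def test_many_concurrent_conversations(echo_server):
    # make_conversations / run_strategy are hypothetical fixtures:
    # build 50 multi-turn conversations and drive them through the
    # strategy with bounded concurrency against the echo server.
    conversations = make_conversations(n=50, turns=3)
    stats = await run_strategy(
        conversations, target_concurrency=8, server=echo_server
    )
    # Every conversation must finish and no turn may fail.
    assert stats.completed_conversations == 50
    assert stats.failed_turns == 0
```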
| "tool": {"assistant", "user"}, | ||
| } | ||
|
|
||
| for conv_id, group in self.dataframe.groupby("conversation_id"): |
There was a problem hiding this comment.
🔵 LOW · performance · [Claude]
_validate_conversation_structure, _validate_turn_numbering, and _build_metadata each call self.dataframe.groupby("conversation_id") independently (lines 142, 164, 201, 280). For a dataset with thousands of conversations, these are repeated O(N log N) operations on top of a to_dict(orient="records") walk in _validate_conversation_grouping. Cache the groupby once at the top of __init__ and pass the grouped object into each helper. This is dataset-load-time only (cold path), so it's a low-priority cleanup, but the redundancy is currently visible in the code.
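A sketch of the suggested cleanup, assuming the validators are refactored to accept the precomputed grouping:

```python
# In __init__: group once, in file order, and reuse everywhere.
self._conv_groups = {
    conv_id: group
    for conv_id, group in self.dataframe.groupby("conversation_id", sort=False)
}
self._validate_conversation_structure(self._conv_groups)
self._validate_turn_numbering(self._conv_groups)
self.conversation_metadata = self._build_metadata(self._conv_groups)
```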
| """ | ||
|
|
||
| conversation_id: str | ||
| turn_done: asyncio.Event = field(default_factory=asyncio.Event) |
There was a problem hiding this comment.
🔵 LOW · concurrency · [Claude]
turn_done: asyncio.Event = field(default_factory=asyncio.Event) is constructed at ConversationState instantiation time, before MultiTurnStrategy.execute runs. In Python 3.10+ this no longer requires a running event loop at construction time, but the resulting Event is still bound to whichever loop first awaits/sets it. MultiTurnStrategy.__init__ similarly constructs asyncio.Semaphore(target_concurrency) (multi_turn_strategy.py line 84). Today _run_benchmark_async happens to construct both inside the eventual benchmark loop, but that's fragile — anyone who instantiates a MultiTurnStrategy outside the loop (tests, CLI helpers, future REPL) will silently get cross-loop primitives that hang on wait(). Defer creation to execute() (where the running loop is guaranteed to exist) or assert asyncio.get_running_loop() in __init__.
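A sketch of the lazy-creation alternative; the property defers `asyncio.Event` construction until a loop is actually running:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class ConversationState:
    conversation_id: str
    # Created lazily so the Event binds to the loop that runs the
    # benchmark, not whichever loop was current at construction time.
    _turn_done: asyncio.Event | None = field(default=None, repr=False)

    @property
    def turn_done(self) -> asyncio.Event:
        if self._turn_done is None:
            asyncio.get_running_loop()  # raises if no loop is running
            self._turn_done = asyncio.Event()
        return self._turn_done
```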
```python
)

# Maps query_id -> conversation_id for routing completions.
self._inflight: dict[str, str] = {}
```
🔵 LOW · design · [Claude]
self._inflight: dict[str, str] = {} is mutated from two execution contexts (the per-conversation pipeline tasks at line 190, and on_sample_complete invoked from the recv task at line 218) and is never bounded. While Python's GIL makes the dict ops atomic and single-threaded asyncio prevents true concurrent access, every entry that doesn't get a response (timeouts, dropped responses on shutdown, etc.) leaks for the lifetime of the strategy. For a long-running benchmark with many timeouts, this is a silent memory/diagnosability issue. Consider clearing _inflight entries explicitly in the timeout branch (line 165) and on session-stop, and add a debug-time assertion that _inflight is empty when execute returns.
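A sketch of the suggested bookkeeping; `_discard_inflight_for` is a hypothetical helper:

```python
def _discard_inflight_for(self, conv_id: str) -> None:
    """Drop orphaned query_id entries for a conversation whose
    pipeline has been abandoned (e.g., after a turn timeout)."""
    stale = [qid for qid, cid in self._inflight.items() if cid == conv_id]
    for qid in stale:
        del self._inflight[qid]

# At the end of execute(), verify nothing leaked:
assert not self._inflight, f"leaked in-flight entries: {self._inflight}"
```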
Review Council — Multi-AI Code Review

Reviewed by: Claude | Depth: thorough | Commit: 14 inline comments posted in review #4184417968. 1 finding (openai_adapter.py:131) is summary-only because the line falls outside the diff hunk. Found 15 issues (0 critical, 2 high, 5 medium, 8 low). Codex review failed with a CLI config error (invalid 'features' requirement 'browser_use' from cloud requirements).

🟠 Must Fix (high): issues that will cause incorrect behavior users will hit in normal usage.
🟡 Should Fix (medium): real issues that trigger under specific conditions, or design flaws that will compound.
🔵 Consider (low): valid improvements that could be follow-ups.
…tation

Fix 15 review issues across severity levels:
- HIGH: metadata=None crash in msgspec adapter, silent exception swallowing in gather
- MEDIUM: timeout state consistency, conv_id canonicalization, PromptData fallback, conv_id guard
- LOW: enum comparison, frozen config, empty tool_results warning, adapter metadata extraction, groupby deduplication, live-history tool warning, asyncio.Event docs, test TODO

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use newline separators (instead of spaces) when flattening messages to text for ISL estimation, and add a 12-conversation concurrent stress test.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Hi @arekay-nv, I've addressed the comments. Appreciate it if you could take another look.
Pull request overview
This PR updates the benchmarking system to support multi-turn conversational workloads (including tool-calling sequences), adds a dedicated multi-turn dataset format + conversion/validation tooling, and wires a new multi-turn load strategy into the benchmarking session and OpenAI adapters.
Changes:
- Add `MultiTurnDataset` (flat-row JSONL format) with validation, metadata precomputation, and adapter-default handling for per-turn parameters/tools.
- Add `MultiTurnStrategy` + `ConversationManager` to enforce per-conversation turn sequencing with optional global concurrency limiting, and integrate it into `BenchmarkSession`.
- Extend OpenAI request/response handling for `messages`, `tools`, tool-call metadata, and streaming tool-call accumulation; add extensive unit/integration tests and multi-turn docs/examples/scripts.
Reviewed changes
Copilot reviewed 43 out of 44 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| tests/unit/openai/test_openai_adapter.py | New unit tests for OpenAIAdapter tool serialization and tools forwarding. |
| tests/unit/openai/test_msgspec_adapter.py | New unit tests for msgspec OpenAI adapter tool-call fields and message dict conversion. |
| tests/unit/load_generator/test_multi_turn_strategy.py | New unit tests for turn sequencing, concurrency semaphore behavior, and metadata propagation. |
| tests/unit/load_generator/test_multi_turn_conversation_manager.py | New unit tests for conversation state bookkeeping and event gating. |
| tests/unit/dataset_manager/test_transforms.py | Add coverage for the new AddDefaultColumns transform. |
| tests/unit/dataset_manager/test_multi_turn_dataset.py | Comprehensive tests for MultiTurnDataset, including tool sequences and metadata correctness. |
| tests/unit/core/test_types.py | Add tests for QueryResult.with_metadata() and Query.metadata round-tripping. |
| tests/unit/config/test_schema.py | Add tests for multi-turn config validation and multi-turn sample counting logic. |
| tests/integration/test_multi_turn.py | End-to-end integration tests exercising dataset-history/live-history modes and tool-use conversations. |
| src/inference_endpoint/openai/types.py | Extend msgspec OpenAI types to include tool_calls, tool_call_id, and tools. |
| src/inference_endpoint/openai/openai_msgspec_adapter.py | Support messages input, tool-call fields, tools forwarding, and richer response metadata. |
| src/inference_endpoint/openai/openai_adapter.py | Support messages input, tools forwarding, and return richer response metadata. |
| src/inference_endpoint/openai/accumulator.py | Accumulate streamed tool_calls + finish_reason into final QueryResult.metadata. |
| src/inference_endpoint/load_generator/strategy.py | Extend PhaseIssuerProtocol.issue() to accept an optional data_override. |
| src/inference_endpoint/load_generator/session.py | Allow injecting a per-phase strategy; support data overrides in sample issuance. |
| src/inference_endpoint/load_generator/multi_turn_strategy.py | New multi-turn strategy implementing per-conversation sequencing + global concurrency limiting. |
| src/inference_endpoint/load_generator/conversation_manager.py | New synchronous conversation state manager used by multi-turn strategy. |
| src/inference_endpoint/endpoint_client/worker.py | Propagate Query.metadata through requests and merge into results. |
| src/inference_endpoint/endpoint_client/http.py | Add query_metadata field to InFlightRequest. |
| src/inference_endpoint/endpoint_client/adapter_protocol.py | Generalize SSE decoding/parse APIs to return adapter-specific chunk objects. |
| src/inference_endpoint/dataset_manager/transforms.py | Add AddDefaultColumns (fill-missing-only) transform. |
| src/inference_endpoint/dataset_manager/multi_turn_dataset.py | New multi-turn dataset implementation with tool-sequence handling and metadata building. |
| src/inference_endpoint/dataset_manager/factory.py | Select MultiTurnDataset when dataset config includes multi_turn; skip prompt-based transforms for it. |
| `src/inference_endpoint/dataset_manager/__init__.py` | Export MultiTurnDataset and AddDefaultColumns. |
| src/inference_endpoint/core/types.py | Add Query.metadata and QueryResult.with_metadata(). |
| src/inference_endpoint/config/templates/online_template_full.yaml | Expose multi_turn dataset block and multi_turn load pattern option in template. |
| src/inference_endpoint/config/templates/online_template.yaml | Expose multi_turn load pattern option in template. |
| src/inference_endpoint/config/templates/offline_template_full.yaml | Expose multi_turn dataset block and load pattern option in template. |
| src/inference_endpoint/config/templates/concurrency_template_full.yaml | Expose multi_turn dataset block and load pattern option in template. |
| src/inference_endpoint/config/templates/concurrency_template.yaml | Expose multi_turn load pattern option in template. |
| src/inference_endpoint/config/schema.py | Add multi-turn schema objects and cross-validate dataset.multi_turn ↔ load_pattern.type. |
| src/inference_endpoint/config/runtime_settings.py | Make multi-turn sample count issue all dataset client turns (min-sample-count aware). |
| src/inference_endpoint/commands/benchmark/execute.py | Instantiate and wire MultiTurnStrategy automatically when using MultiTurnDataset. |
| scripts/validate_jsonl_schema.py | New CLI script to validate multi-turn JSONL rows against schema. |
| scripts/multi_turn_dataset_schema.json | New JSON Schema for multi-turn flat-row JSONL datasets. |
| scripts/convert_agentic_snapshot.py | New conversion+verification script from snapshot-style agentic datasets to flat-row JSONL. |
| examples/09_MultiTurn/multi_turn_with_concurrency.yaml | Example config: multi-turn with global concurrency limiting. |
| examples/09_MultiTurn/multi_turn_benchmark.yaml | Example config: basic multi-turn benchmark. |
| examples/09_MultiTurn/datasets/.gitkeep | Placeholder for converted example datasets. |
| examples/09_MultiTurn/customer_support_conversations.jsonl | Example multi-turn dataset. |
| examples/09_MultiTurn/agentic_workflow_benchmark.yaml | Example config for converted agentic workflow dataset. |
| examples/09_MultiTurn/agentic_coding_benchmark.yaml | Example config for converted agentic coding dataset. |
| examples/09_MultiTurn/README.md | Multi-turn feature documentation and agentic conversion guidance. |
| docs/MULTI_TURN_QUICKSTART.md | Quickstart guide for running multi-turn benchmarks. |
```python
samples.append(
    {
        "index": idx,
        "conversation_id": str_conv_id,
        "turn": t_n,
    }
)
```
_build_metadata() currently appends sample entries with "index": idx where idx is the DataFrame row index from iterrows(). That index is not guaranteed to be a dense 0..N-1 mapping to Dataset.load_sample() indices (especially after filtering to client turns). To avoid incorrect sample issuance, store a dense sample_index that matches the position in self.data (client_turn_samples) and have the scheduler use that for PhaseIssuer.issue().
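A sketch of what the suggestion could look like; `client_turn_rows` is a hypothetical frame already filtered to client turns:

```python
# Record the dense position within the client-turn list, which is what
# Dataset.load_sample() indexes, instead of the raw DataFrame index.
for sample_index, (_, row) in enumerate(client_turn_rows.iterrows()):
    samples.append(
        {
            "sample_index": sample_index,  # dense 0..N-1
            "conversation_id": str(row["conversation_id"]),
            "turn": int(row["turn"]),
        }
    )
```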
```python
conv_samples: dict[str, list[tuple[int, int]]] = defaultdict(list)
for sample_index, sample_meta in enumerate(self._dataset_metadata["samples"]):
    conv_id = sample_meta["conversation_id"]
    conv_samples[conv_id].append((sample_index, sample_meta["turn"]))
```
execute() uses enumerate(dataset_metadata["samples"]) to derive sample_index for phase_issuer.issue(). This assumes the metadata list order exactly matches the dataset's load_sample() indexing. With MultiTurnDataset, that coupling is fragile (and can break if metadata ordering differs from self.data). Prefer reading an explicit sample_index field from metadata (or another stable mapping) that is guaranteed to refer to the dataset sample index.
```python
return [
    {
        "role": "tool",
        "tool_call_id": result.get("tool_call_id"),
        "content": result.get("content"),
    }
    for result in tool_results
]
```
_expand_tool_results() will emit tool messages even if a tool_results entry is missing tool_call_id or content (they become None). That produces invalid OpenAI wire-format messages and will likely fail downstream. Since tool rows are required to have non-empty tool_results with required fields (per schema/docs), consider validating each entry and raising InputValidationError when required keys are absent.
Suggested change:

```diff
-return [
-    {
-        "role": "tool",
-        "tool_call_id": result.get("tool_call_id"),
-        "content": result.get("content"),
-    }
-    for result in tool_results
-]
+expanded_results: list[dict] = []
+for index, result in enumerate(tool_results):
+    if not isinstance(result, dict):
+        raise InputValidationError(
+            "Each tool_results entry must be an object "
+            f"(conversation_id={row.get('conversation_id')}, turn={row.get('turn')}, "
+            f"tool_result_index={index})."
+        )
+    if "tool_call_id" not in result or result["tool_call_id"] is None:
+        raise InputValidationError(
+            "Each tool_results entry must include a non-null 'tool_call_id' "
+            f"(conversation_id={row.get('conversation_id')}, turn={row.get('turn')}, "
+            f"tool_result_index={index})."
+        )
+    if "content" not in result or result["content"] is None:
+        raise InputValidationError(
+            "Each tool_results entry must include non-null 'content' "
+            f"(conversation_id={row.get('conversation_id')}, turn={row.get('turn')}, "
+            f"tool_result_index={index})."
+        )
+    expanded_results.append(
+        {
+            "role": "tool",
+            "tool_call_id": result["tool_call_id"],
+            "content": result["content"],
+        }
+    )
+return expanded_results
```
```python
super().__init__(dataframe, **kwargs)
assert self.dataframe is not None, "Dataframe must be initialized"
self._conv_groups = dict(list(self.dataframe.groupby("conversation_id")))
self._validate_conversation_grouping()
self._validate_conversation_structure()
self._validate_turn_numbering()
self.conversation_metadata = self._build_metadata()
```
MultiTurnDataset builds self._conv_groups via dataframe.groupby("conversation_id"), which defaults to sorting keys. That can reorder conversations compared to file order, while load() later builds self.data in raw row order. If conversation_ids are not already lexicographically sorted, conversation_metadata["samples"] ordering can diverge from load_sample() indices, causing MultiTurnStrategy to issue the wrong samples/turns. Consider using groupby(..., sort=False) and/or explicitly constructing a dense sample_index in file order that is guaranteed to match self.data indices.
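A one-line sketch of the `sort=False` variant, which preserves file order (the follow-up commit below reports adopting this):

```python
self._conv_groups = dict(
    list(self.dataframe.groupby("conversation_id", sort=False))
)
```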
arekay-nv
left a comment
There is some confusion around the issue policy, let's sync today.
> **Sizing guide**:
>
> - Small (< 50 convs): `target_concurrency: 32`
> - Medium (50-500 convs): `target_concurrency: 64`
> - Large (500+ convs): `target_concurrency: 96` or higher
What are the metrics for these suggestions? For Small (< 50 convs) does a concurrency of 32 give better performance? I would assume the model size/server capability also plays a role in the concurrency settings.
There was a problem hiding this comment.
Removed this part. Only keeping concurrency definition.
> `workers: 16  # More workers for parallel conversations`
>
> ### Long Conversations
Clarify with "conversations with a large number of turns" - a long conversation can also mean a small number of long turns, as end-to-end time could also be large there.
Same as above. Removed this exact phrasing.
> **Problem**: Your dataset doesn't follow a valid role sequence.
>
> **Fix**: Check your JSONL. Valid sequences:
Is it possible to add a utility that parses the dataset to make sure it is compliant, so devs can use it instead of running the benchmark for testing?
Added documentation to use scripts/validate_jsonl_schema.py for validation.
```python
)

# Maps query_id -> conversation_id for routing completions.
self._inflight: dict[str, str] = {}
```
```python
tasks = [
    asyncio.create_task(
        self._conv_pipeline(conv_id, turns, phase_issuer),
        name=f"mt-pipeline-{conv_id}",
    )
    for conv_id, turns in conv_samples.items()
]
```
So one task for each conversation?
```python
def test_encode_request_produces_valid_json_bytes():
    """encode_request returns bytes that msgspec can decode back."""
    messages = [{"role": "user", "content": "Hello"}]
    query = Query(
        id="q2",
        data={
            "model": "m",
            "messages": messages,
            "max_completion_tokens": 64,
            "stream": False,
        },
    )
    request = OpenAIAdapter.to_endpoint_request(query)
    encoded = OpenAIAdapter.encode_request(request)

    assert isinstance(encoded, bytes)
    decoded = msgspec.json.decode(encoded)
    assert decoded["messages"][0]["role"] == "user"
```
Is this related to the multi-turn testing?
Yes, more specifically for tool-call handling.
```python
# Acquire concurrency slot before issuing.
if self._sem is not None:
    await self._sem.acquire()
```
This is something that needs fixing/clarification. As-is, this would mean that we have multiple conversations open, with N of them (N being the concurrency) sending requests. If I understand correctly, we only want to have N conversations open, with a turn from each currently being processed.
Launching tasks for each conversation will have this drawback - we cannot schedule tasks from a subset of conversations as all conversations will be available to run.
Refactored to advance turn on on_sample_complete. Only N conversations are kept open, and a new conversation only starts when an existing one finishes.
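A sketch of that worker-queue shape, assuming `_target_concurrency` and `_conv_pipeline` attributes as in the snippets above:

```python
import asyncio

async def _run_conversations(self, conv_samples: dict, phase_issuer) -> None:
    # Only N conversations are active at once; a worker picks up the
    # next conversation only after finishing its current one.
    todo: asyncio.Queue = asyncio.Queue()
    for conv_id, turns in conv_samples.items():
        todo.put_nowait((conv_id, turns))

    async def worker() -> None:
        while not todo.empty():
            conv_id, turns = todo.get_nowait()
            await self._conv_pipeline(conv_id, turns, phase_issuer)

    n = min(self._target_concurrency, len(conv_samples))
    await asyncio.gather(*(worker() for _ in range(n)))
```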
```python
if self._sem is not None:
    self._sem.release()
```
If we are going to issue turns from a conversation before moving onto a new conversation, this needs to move to the sample_complete request.
Refactored. Same as above.
```diff
-@dataclass(frozen=True, slots=True)
+@dataclass(frozen=True)
```
…Strategy

target_concurrency now limits active conversations (not in-flight requests). N worker tasks pull from asyncio.Queue, each processing one full conversation before taking the next. Also adds slots=True back to PhaseConfig and sort=False to groupby for file-order preservation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n implementation

- openai_adapter: normalize null content to "" instead of literal "None" to avoid polluting conversation history in tool-calling responses
- multi_turn_dataset: validate tool_results entries have required tool_call_id and content fields; raise InputValidationError at load time
- multi_turn_dataset: remove unused "index" field from samples metadata
- multi_turn_strategy: wrap mark_turn_complete/mark_turn_failed in try/except KeyError in on_sample_complete
- multi_turn_strategy: clear _inflight at end of execute() with warning if entries remain (transport failure or session abort)
- docs: remove prescriptive concurrency sizing guide; replace with definition of what target_concurrency controls
- docs: rename "Long Conversations" to "Conversations with Many Turns"
- docs: add dataset validation utility reference in Troubleshooting

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fix refusal field set to literal string "None" instead of "" in openai_adapter.py — made downstream refusal checks incorrectly truthy
- Add test_pipeline_error_propagated to verify execute() re-raises worker exceptions instead of swallowing them via gather(return_exceptions=True)
- Clarify MultiTurnStrategy docstring and MULTI_TURN_QUICKSTART.md: target_concurrency = simultaneous conversations (not requests); each active conversation has exactly 1 in-flight turn at a time
- Remove unjustified "Common Configurations" section from quickstart
- Correct misleading "workers = concurrent conversations" tip; clarify client.workers and target_concurrency are independent layers

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ategy

Rewrites MultiTurnStrategy to issue subsequent turns synchronously inside on_sample_complete() (zero event-loop delay), removing pre-spawned worker tasks and per-conversation asyncio.Event waiting. ConversationState no longer holds an asyncio.Event; sequencing is driven entirely by the strategy. Addresses PR mlcommons#285 reviewer request to move turn issuance into the sample-complete handler.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 43 out of 44 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (2)
src/inference_endpoint/endpoint_client/adapter_protocol.py:134

`parse_sse_chunk` now appends whatever `decode_sse_message` returns, including `None` (e.g., when an SSE message has no choices). This contradicts the docstring ("Silently ignores non-content SSE messages") and forces downstream accumulators to defensively handle `None`. Consider filtering out `None` return values here (and/or handling exceptions per JSON doc) so call sites only see meaningful chunk objects.
```python
def parse_sse_chunk(cls, buffer: bytes, end_pos: int) -> list[Any]:
    """
    Parse SSE chunk and extract all chunk objects.

    Extracts JSON documents from SSE stream and decodes them to chunk objects.
    Silently ignores non-content SSE messages (role, finish_reason, etc).

    Args:
        buffer: Byte buffer containing SSE data
        end_pos: End position in buffer to parse up to

    Returns:
        List of chunk objects extracted from the SSE chunk
    """
    json_docs = cls.SSE_DATA_PATTERN.findall(buffer[:end_pos])
    parsed_contents = []
    try:
        for json_doc in json_docs:
            content = cls.decode_sse_message(json_doc)
            parsed_contents.append(content)
    except Exception:
        # Normal for non-content SSE messages (role, finish_reason, etc)
        pass
    return parsed_contents
```
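A sketch of the suggested fix: decode per document so one bad message doesn't abort the rest, and drop `None` results before they reach accumulators:

```python
@classmethod
def parse_sse_chunk(cls, buffer: bytes, end_pos: int) -> list:
    json_docs = cls.SSE_DATA_PATTERN.findall(buffer[:end_pos])
    parsed_contents = []
    for json_doc in json_docs:
        try:
            content = cls.decode_sse_message(json_doc)
        except Exception:
            continue  # normal for non-content SSE messages
        if content is not None:  # e.g., SSE message with no choices
            parsed_contents.append(content)
    return parsed_contents
```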
src/inference_endpoint/core/types.py:242

The `Query` docstring's `gc=False` note only mentions `data`/`headers`, but `Query` now also has a mutable `metadata` dict. To avoid future misuse (and to match the more explicit `QueryResult` guidance), consider updating this note to include `metadata` as well and/or adding an AT-RISK (gc=False) warning that `data`/`metadata`/`headers` must not be mutated to introduce cycles.
```python
Attributes:
    id: Unique identifier for this query (auto-generated UUID).
    data: Request payload as a dictionary (typically contains prompt, model, etc.).
    metadata: Internal metadata that round-trips through transport (e.g., conversation_id).
    headers: HTTP headers to include in the request (e.g., authorization).
    created_at: Timestamp when query was created (seconds since epoch).

Example:
    >>> query = Query(
    ...     data={"prompt": "Hello", "model": "Qwen/Qwen3-8B", "max_tokens": 100},
    ...     headers={"Authorization": "Bearer token123"},
    ... )

Note:
    gc=False: Safe because data/headers are simple key-value pairs without cycles.
    Do NOT store self-referential or cyclic structures in data/headers fields.
```
```python
class SSEDelta(msgspec.Struct, frozen=True, kw_only=True, omit_defaults=True, gc=False):  # type: ignore[call-arg]
    """SSE delta object containing content."""

    content: str = ""
    reasoning: str = ""
    tool_calls: list[dict[str, Any]] | None = None
```
```python
raise ValueError(
    f"Conversation {conv_id} has invalid role sequence at turn "
    f"{row['turn']}: got '{role}' after state '{state}'"
)
```
Hi @arekay-nv, I've addressed the comments. The main change is refactoring to an event-based model and limiting the number of active conversations to the concurrency. Appreciate it if you could take another look.
arekay-nv
left a comment
Almost there - the only issue is the stickiness of the client-turns.
```python
def _on_sample_complete(result: QueryResult) -> None:
    if multi_turn_strategy is not None:
        multi_turn_strategy.on_sample_complete(result)
    collector.on_complete_hook(result)
```
Can we create two versions of this and pass it to the benchmark session conditionally? This way, we are not checking the strategy on each sample-complete event.
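A sketch of what the reviewer suggests, based on the quoted hook:

```python
# Select the hook once at wiring time instead of branching per sample.
if multi_turn_strategy is not None:
    def _on_sample_complete(result: QueryResult) -> None:
        multi_turn_strategy.on_sample_complete(result)
        collector.on_complete_hook(result)
else:
    _on_sample_complete = collector.on_complete_hook
```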
```python
class ConversationMode(str, Enum):
    """Multi-turn conversation scheduling modes."""

    INDEPENDENT = "independent"  # Per-conv pipelines; no cross-conv turn barrier
```
Is this needed? Can we remove it until we have two different modes?
```python
class AddDefaultColumns(Transform):
    """Add columns only where values are missing (NaN or absent).

    Unlike AddStaticColumns which unconditionally overwrites, this preserves
    existing non-null values — dataset per-row overrides take precedence over
    the supplied defaults.
    """

    def __init__(self, data: dict[str, Any]):
        """Initialize the AddDefaultColumns transform."""
        self.data = data

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        """Fill missing columns with defaults without overwriting existing values."""
        for key, value in self.data.items():
            if value is None:
                continue
            if key in df.columns:
                df[key] = df[key].where(pd.notna(df[key]), value)
            else:
                df[key] = value
        return df
```
Can this be implemented by using AddStaticColumns and a boolean indicating whether to overwrite or not?
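A sketch of that consolidation, assuming the existing AddStaticColumns simply assigns each column unconditionally:

```python
from typing import Any
import pandas as pd

class AddStaticColumns(Transform):
    """Add columns, either overwriting (static) or fill-missing-only (default)."""

    def __init__(self, data: dict[str, Any], overwrite: bool = True):
        self.data = data
        self.overwrite = overwrite

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        for key, value in self.data.items():
            if value is None:
                continue
            if self.overwrite or key not in df.columns:
                df[key] = value
            else:
                # Preserve existing non-null values; fill only gaps.
                df[key] = df[key].where(pd.notna(df[key]), value)
        return df
```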
```python
    else:
        return "<EMPTY>"

def with_metadata(
```
Is this needed for anything other than the conversation id? Can we use the query-id for this instead of having to wire the metadata through the whole round-trip?
```python
@pytest.mark.unit
def test_multi_turn_valid_config(self):
    config = BenchmarkConfig(**self._make_online_multi_turn(concurrency=16))
    from inference_endpoint.config.schema import LoadPatternType

@pytest.mark.unit
def test_multi_turn_uses_dataset_size_ignoring_duration(self):
    from inference_endpoint.config.runtime_settings import RuntimeSettings
```
@viraatc can you check the perf implications - there might be non-negligible overhead for non-agentic/multi-turn workloads.
What does this PR do?
Updated multi-turn implementation for #232. Added tool sequencing, fixed scheduler for concurrent requests.
Type of change
Related issues
Testing
Checklist