Skip to content

Feat/tool sequences#285

Open
tianmu-li wants to merge 13 commits intomlcommons:mainfrom
tianmu-li:feat/tool_sequences
Open

Feat/tool sequences#285
tianmu-li wants to merge 13 commits intomlcommons:mainfrom
tianmu-li:feat/tool_sequences

Conversation

@tianmu-li
Copy link
Copy Markdown
Collaborator

What does this PR do?

Updated multi-turn implementation for #232. Added tool sequencing, fixed scheduler for concurrent requests.

Type of change

  • Bug fix
  • New feature
  • Documentation update
  • Refactor/cleanup

Related issues

Testing

  • Tests added/updated
  • All tests pass locally
  • Manual testing completed

Checklist

  • Code follows project style
  • Pre-commit hooks pass
  • Documentation updated (if needed)

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 17, 2026

MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive multi-turn conversation benchmarking framework, including a new MultiTurnScheduler, ConversationManager, and MultiTurnDataset. These additions enable benchmarking of conversational AI workloads with turn sequencing, conversation history management, and optional concurrency control. My review identified potential issues regarding the usage of sentinel objects in the scheduler and the robustness of the timeout logic in the conversation manager.

Comment thread src/inference_endpoint/load_generator/scheduler.py Outdated
f"Turn {turn} of {conv_id} timed out waiting for prev turn"
)
break # Skip remaining turns to avoid cascade timeouts
ready.put((idx, 0))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _PIPELINE_DONE sentinel is pushed to the queue, but the consumption logic in the finally block or the iterator loop does not explicitly handle it as a termination signal for individual threads, relying instead on the queue being drained. Ensure this is the intended design to avoid potential race conditions.

Comment thread src/inference_endpoint/load_generator/conversation_manager.py Outdated
Copy link
Copy Markdown
Collaborator

@arekay-nv arekay-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review - will complete later.

Comment thread examples/09_MultiTurn/README.md Outdated
- Plain chat: `user → assistant → user → ...`
- Agentic: `user → assistant (with tool_calls) → tool → [tool | assistant (with tool_calls)]* → assistant → user → ...`
2. First turn must be "user" role
3. Turn numbers must be sequential (1, 2, 3, ...)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should all the turns be ordered and consecutive in the dataset? For instance if i have turn 1 of all conversations followed by turn 2 of all conversations - is that a valid format. Would it not be easier to have a conversation as a top level object which contains a list of messages and associated metadata inside?
The challenge here would be reading in all the conversations to collect all the turns - unless we enforce some constraints.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, all turns are now ordered and consecutive.

Comment thread examples/09_MultiTurn/README.md Outdated
Comment thread examples/09_MultiTurn/README.md Outdated
Comment thread examples/09_MultiTurn/agentic_coding_benchmark.yaml Outdated
type: performance
# Replace with the path where you saved the converted flat-row JSONL.
# Run: python scripts/convert_agentic_snapshot.py <input.jsonl> <output_flat.jsonl> --verify
path: /model/agentic_coding_flat.jsonl
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, these should be relative and follow the readme example. So if you follow the readme example, this config should work out of the box.

Comment thread examples/09_MultiTurn/agentic_workflow_benchmark.yaml
Comment thread docs/MULTI_TURN_QUICKSTART.md
Comment thread MULTI_TURN_QUICKSTART.md Outdated
Comment thread docs/MULTI_TURN_QUICKSTART.md Outdated
Comment thread MULTI_TURN_QUICKSTART.md Outdated
tianmu-li added a commit to tianmu-li/endpoints that referenced this pull request Apr 22, 2026
- Remove sequential conversation mode (redundant with target_concurrency=1)
- Remove `enabled` field from MultiTurnConfig; presence of multi_turn: block implies enabled
- Add conversation grouping validation to MultiTurnDataset (raises InputValidationError if rows for a conversation_id are not consecutive)
- Update YAML example configs: model placeholder, relative dataset paths, removed redundant metrics.collect
- Move MULTI_TURN_QUICKSTART.md to docs/
- Update all documentation to remove sequential mode references

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
tianmu-li added a commit to tianmu-li/endpoints that referenced this pull request Apr 22, 2026
…wer comments

- Remove dead constant BLOCK_ON_PREVIOUS_TURN = -1 from scheduler.py
- Remove redundant outer with state.condition: in mark_turn_complete
- Remove ConversationMode import and explicit mode= args from integration tests
- Fix format: jsonl → format: ".jsonl" in example YAMLs and docs
- Add target_concurrency: 1 clarification to quickstart (preserves turn ordering)
- Remove broken HYBRID_SCHEDULER_GUIDE.md reference from quickstart

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@tianmu-li tianmu-li force-pushed the feat/tool_sequences branch from d8dfd64 to b127845 Compare April 25, 2026 22:53
tianmu-li and others added 7 commits April 27, 2026 11:44
Add MultiTurnDataset, MultiTurnConfig schema, tool-calling types,
Query.metadata transport field, adapter tools= kwarg, and multi-turn
factory routing.
Add per-conversation asyncio.Event sequencing (ConversationManager),
async turn pipeline (MultiTurnStrategy), and benchmark execution wiring
(execute.py, session.py PhaseIssuer data_override).
Add unit tests for MultiTurnDataset, ConversationManager, and
MultiTurnStrategy; add integration tests including tool-use scenarios
and large-concurrency stress tests.
Consolidate multi-turn dataset with single-turn transform pipeline,
fix prior-row extraction, live-history mode, system prompt injection,
tool_calls preservation, and asyncio.Event-based sequencing.
Add MULTI_TURN_QUICKSTART.md, examples/09_MultiTurn/ configs and sample
data, scripts/convert_agentic_snapshot.py, and README clarifications
including conversion script output destination.
…ring

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…efault path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@tianmu-li tianmu-li force-pushed the feat/tool_sequences branch from b127845 to 0a7ad37 Compare April 27, 2026 20:09
@arekay-nv
Copy link
Copy Markdown
Collaborator

arekay-nv commented Apr 27, 2026

⚠️ Superseded. This comment was posted while a pending review was blocking inline-comment delivery. The review has since been re-posted with all 14 inline comments attached: see review #4184417968 and the updated summary. The findings are unchanged.

Copy link
Copy Markdown
Collaborator

@arekay-nv arekay-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Council — Multi-AI Code Review

Reviewed by: Claude (Codex review failed with a CLI config error — invalid 'features' requirement 'browser_use' from cloud requirements) | Depth: thorough

Found 15 issues (0 critical, 2 high, 5 medium, 8 low). 14 posted as inline comments, 1 in summary table only (line outside diff hunk).

id=result_id or response.id,
response_output=TextModelOutput(output=response.choices[0].message.content),
response_output=TextModelOutput(output=choice.message.content or ""),
metadata=metadata if metadata else None,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 HIGH · bug · [Claude]

from_endpoint_response violates the QueryResult.metadata type contract by passing metadata=None when no tool_calls/finish_reason/reasoning_content are present. QueryResult.metadata is typed dict[str, Any] with default_factory=dict. The constructed result is then passed to worker._handle_non_streaming_body which calls result.with_metadata(req.query_metadata). with_metadata short-circuits on empty additional_metadata (current callers pass {}), but as soon as any caller routes a non-empty Query.metadata (e.g., the conversation_id round-trip explicitly mentioned in Query.metadata's docstring), dict(self.metadata) will raise TypeError: 'NoneType' object is not iterable. Likewise, any code that does result.metadata.get(...) on this result will raise AttributeError. Fix:

return QueryResult(
    id=result_id or response.id,
    response_output=TextModelOutput(output=choice.message.content or ""),
    metadata=metadata,  # always pass dict; let omit_defaults handle empty
)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. metadata initialized to empty dict

for conv_id, turns in conv_samples.items()
]

await asyncio.gather(*tasks, return_exceptions=True)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 HIGH · error-handling · [Claude]

await asyncio.gather(*tasks, return_exceptions=True) silently swallows every exception raised by a _conv_pipeline coroutine. The result list is discarded — there is no for r in results: if isinstance(r, Exception): .... Programmer bugs (KeyError on missing metadata key, AttributeError on result.metadata, dataset metadata-format drift, etc.) become invisible: the strategy returns phase_issuer.issued_count as if everything succeeded, and the only sign of failure is missing samples. AGENTS.md explicitly flags swallowed exceptions as forbidden. Fix: collect the result list and re-raise the first non-CancelledError exception, or at least log each one before discarding:

results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError):
        logger.error("conv_pipeline crashed: %r", r, exc_info=r)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Errors are collected and logged.

logger.warning(
f"Turn {turn} of {conv_id} timed out waiting for previous turn"
)
state.failed_turns += 1
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 MEDIUM · bug · [Claude]

When a turn times out waiting for the previous turn's response, the pipeline does state.failed_turns += 1 but never increments state.completed_turns, then breaks out of the loop. As a result, ConversationState.is_complete() (completed_turns >= expected_client_turns) returns False forever, the conversation never logs completion, and downstream _log_if_complete reporting silently drops it. Worse, if the late response arrives later, mark_turn_complete runs on the abandoned state and bumps completed_turns once, leaving completed_turns=1, failed_turns=1 for what was a multi-turn conversation, with the remaining turns never issued — but no error is raised. Fix: bump both counters on timeout (or call self._conv_manager.mark_turn_failed(...)), so the failure is properly accounted, and consider also draining/discarding the now-orphan _inflight entries for this conversation so a late response doesn't mutate state for a dead pipeline.

except TimeoutError:
    logger.warning(...)
    state.completed_turns += 1
    state.failed_turns += 1
    break

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Timeout calls mark_turn_failed() which bumps the counters.

query_id = uuid.uuid4().hex
data = self._dataset.load_sample(sample_index)
if data_override is not None:
data = {**data, **data_override}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 MEDIUM · data-integrity · [Claude]

PromptData.text = data.get("prompt") and token_ids = data.get("input_tokens") or data.get("token_ids") — neither is set on multi-turn samples. MultiTurnDataset.load() writes the conversation into sample["messages"] (a list of message dicts) and does not populate prompt, input_tokens, or token_ids. The result is that every multi-turn ISSUED event records PromptData(text=None, token_ids=None), so the metrics aggregator computes ISL from zero or just the bare current-turn user text instead of the full prompt-with-history actually sent over the wire. ISL/throughput/TPOT-derived numbers reported for any multi-turn benchmark will therefore be wrong. Either serialize the merged messages into a synthetic prompt for ISL accounting, or extend PromptData to carry the OpenAI messages list and have the aggregator handle it. At minimum, document the limitation prominently and emit a one-shot warning when running multi-turn without a tokenizer.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Contents are jointed into prompt_text

if val and isinstance(val, str):
system_content = val
break
system_prompts_by_conv[str(conv_id)] = system_content
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 MEDIUM · bug · [Claude]

system_prompts_by_conv is keyed by str(conv_id) (line 212), but pre_built_messages_by_key and current_turn_messages_by_key are keyed by the raw conv_id returned from groupby (lines 266–267), and samples stores the raw value too. MultiTurnStrategy.execute() then looks up sys_prompts.get(conv_id) with the raw value (multi_turn_strategy.py line 116), so the system prompt is silently dropped whenever a JSONL has a non-string conversation_id (integer ids, mixed types after pandas type inference, etc.). Pick one canonicalization (preferably str(conv_id)) and apply it consistently across all dictionaries — including the samples entries and the _conv_states/_inflight keys in the strategy.

Copy link
Copy Markdown
Collaborator Author

@tianmu-li tianmu-li Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. All keys now use str(conv_id)

messages.append({"role": "system", "content": system_content})

# All dataset rows strictly before this client turn (includes
# assistant rows and prior tool results).
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 LOW · bug · [Claude]

When iterating prior_rows to build pre-built messages, a tool row whose tool_results field is an empty list (e.g. tool_results: []) is collected into msg, then _expand_tool_results(msg) returns [] (line 244 falls through), and the bare malformed msg (role='tool', no tool_call_id, no content, just the empty tool_results) is appended to the message history at line 246. This produces an OpenAI-invalid tool message. Either skip rows with empty tool_results entirely, or treat an empty list as a validation error in _validate_conversation_structure.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Empty tool results return []


@pytest.mark.integration
@pytest.mark.asyncio
async def test_tools_field_forwarded_to_endpoint(echo_server):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 LOW · testing · [Claude]

The "tool use" integration tests verify that tools, tool_calls, and tool_results are forwarded to the endpoint and that all client turns are issued, but they only run against an EchoServer that does not validate tool_call_id pairing or generate fresh tool_call ids. As a result:

  • The live-history tool_call_id mismatch issue (see related issue) cannot be detected by these tests.
  • There is no test covering pipeline error propagation (today, a crash inside _conv_pipeline is silently swallowed by gather(return_exceptions=True)).
  • There is no concurrent-conversation stress test (>10 conversations under a non-trivial target_concurrency) that exercises the semaphore + timeout interaction.

Add at least: (1) a test using a fake server that asserts each tool message's tool_call_id matches a prior assistant tool_calls id under live-history mode (currently expected to fail), (2) a test asserting that an exception in a pipeline propagates out of MultiTurnStrategy.execute, and (3) a stress test with 50+ concurrent conversations and target_concurrency set, asserting all complete and the semaphore reaches its rest value.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test with concurrent conversations and multiple turns.

"tool": {"assistant", "user"},
}

for conv_id, group in self.dataframe.groupby("conversation_id"):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 LOW · performance · [Claude]

_validate_conversation_structure, _validate_turn_numbering, and _build_metadata each call self.dataframe.groupby("conversation_id") independently (lines 142, 164, 201, 280). For a dataset with thousands of conversations, these are repeated O(N log N) operations on top of a to_dict(orient="records") walk in _validate_conversation_grouping. Cache the groupby once at the top of __init__ and pass the grouped object into each helper. This is dataset-load-time only (cold path), so it's a low-priority cleanup, but the redundancy is currently visible in the code.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

"""

conversation_id: str
turn_done: asyncio.Event = field(default_factory=asyncio.Event)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 LOW · concurrency · [Claude]

turn_done: asyncio.Event = field(default_factory=asyncio.Event) is constructed at ConversationState instantiation time, before MultiTurnStrategy.execute runs. In Python 3.10+ this no longer requires a running event loop at construction time, but the resulting Event is still bound to whichever loop first awaits/sets it. MultiTurnStrategy.__init__ similarly constructs asyncio.Semaphore(target_concurrency) (multi_turn_strategy.py line 84). Today _run_benchmark_async happens to construct both inside the eventual benchmark loop, but that's fragile — anyone who instantiates a MultiTurnStrategy outside the loop (tests, CLI helpers, future REPL) will silently get cross-loop primitives that hang on wait(). Defer creation to execute() (where the running loop is guaranteed to exist) or assert asyncio.get_running_loop() in __init__.

)

# Maps query_id -> conversation_id for routing completions.
self._inflight: dict[str, str] = {}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 LOW · design · [Claude]

self._inflight: dict[str, str] = {} is mutated from two execution contexts (the per-conversation pipeline tasks at line 190, and on_sample_complete invoked from the recv task at line 218) and is never bounded. While Python's GIL makes the dict ops atomic and single-threaded asyncio prevents true concurrent access, every entry that doesn't get a response (timeouts, dropped responses on shutdown, etc.) leaks for the lifetime of the strategy. For a long-running benchmark with many timeouts, this is a silent memory/diagnosability issue. Consider clearing _inflight entries explicitly in the timeout branch (line 165) and on session-stop, and add a debug-time assertion that _inflight is empty when execute returns.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@arekay-nv
Copy link
Copy Markdown
Collaborator

Review Council — Multi-AI Code Review

Reviewed by: Claude | Depth: thorough | Commit: 0a7ad37

14 inline comments posted in review #4184417968. 1 finding (openai_adapter.py:131) is summary-only because the line falls outside the diff hunk.

Found 15 issues (0 critical, 2 high, 5 medium, 8 low). Codex review failed with a CLI config error (invalid features requirement browser_use from cloud requirements) — Claude-only review.

🟠 Must Fix (high)

Issues that will cause incorrect behavior users will hit in normal usage.

# Location Category Summary
1 src/inference_endpoint/openai/openai_msgspec_adapter.py:222 bug from_endpoint_response violates the QueryResult.metadata type contract by passing metadata=None when no…
2 src/inference_endpoint/load_generator/multi_turn_strategy.py:137 error-handling await asyncio.gather(*tasks, return_exceptions=True) silently swallows every exception raised by a _conv_pipeline coroutine. The result…

🟡 Should Fix (medium)

Real issues that trigger under specific conditions, or design flaws that will compound.

# Location Category Summary
3 src/inference_endpoint/load_generator/multi_turn_strategy.py:164 bug When a turn times out waiting for the previous turn's response, the pipeline does state.failed_turns += 1 but never increments…
4 src/inference_endpoint/load_generator/session.py:206 data-integrity PromptData.text = data.get("prompt") and token_ids = data.get("input_tokens") or data.get("token_ids") — neither is set on multi-turn…
5 src/inference_endpoint/dataset_manager/multi_turn_dataset.py:212 bug system_prompts_by_conv is keyed by str(conv_id) (line 212), but pre_built_messages_by_key and current_turn_messages_by_key are…
6 src/inference_endpoint/load_generator/multi_turn_strategy.py:180 bug In live-history mode (use_dataset_history=False), the per-turn tool message reuses the dataset's hardcoded tool_call_id (e.g.…
7 src/inference_endpoint/load_generator/conversation_manager.py:142 error-handling mark_turn_complete and mark_turn_failed raise KeyError if conversation_id is missing. These are invoked from…

🔵 Consider (low)

Valid improvements that could be follow-ups.

# Location Category Summary
8 src/inference_endpoint/config/runtime_settings.py:200 design self.load_pattern.type.value == "multi_turn" compares the enum value as a string literal instead of the enum:…
9 src/inference_endpoint/config/schema.py:253 design MultiTurnConfig uses model_config = {"extra": "forbid"} (raw dict) and is missing frozen=True, while every other config model in this…
10 src/inference_endpoint/dataset_manager/multi_turn_dataset.py:222 bug When iterating prior_rows to build pre-built messages, a tool row whose tool_results field is an empty list (e.g. tool_results: [])…
11 tests/integration/test_multi_turn.py:559 testing The "tool use" integration tests verify that tools, tool_calls, and tool_results are forwarded to the endpoint and that all client…
12 src/inference_endpoint/dataset_manager/multi_turn_dataset.py:142 performance _validate_conversation_structure, _validate_turn_numbering, and _build_metadata each call self.dataframe.groupby("conversation_id")
13 src/inference_endpoint/load_generator/conversation_manager.py:45 concurrency turn_done: asyncio.Event = field(default_factory=asyncio.Event) is constructed at ConversationState instantiation time, before…
14 src/inference_endpoint/load_generator/multi_turn_strategy.py:95 design self._inflight: dict[str, str] = {} is mutated from two execution contexts (the per-conversation pipeline tasks at line 190, and…
15 src/inference_endpoint/openai/openai_adapter.py:131 (summary-only) api-contract OpenAIAdapter.from_endpoint_response (the non-msgspec adapter, used when callers explicitly select it) does not extract tool_calls,…

tianmu-li and others added 2 commits April 28, 2026 10:37
…tation

Fix 15 review issues across severity levels:
- HIGH: metadata=None crash in msgspec adapter, silent exception swallowing in gather
- MEDIUM: timeout state consistency, conv_id canonicalization, PromptData fallback, conv_id guard
- LOW: enum comparison, frozen config, empty tool_results warning, adapter metadata extraction,
  groupby deduplication, live-history tool warning, asyncio.Event docs, test TODO

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use newline separators (instead of spaces) when flattening messages to
text for ISL estimation, and add a 12-conversation concurrent stress test.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@tianmu-li
Copy link
Copy Markdown
Collaborator Author

Hi @arekay-nv, I've addressed the comments. Appreciate it if you could take another look.

@tianmu-li tianmu-li marked this pull request as ready for review May 1, 2026 19:16
@tianmu-li tianmu-li requested review from a team and Copilot May 1, 2026 19:16
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the benchmarking system to support multi-turn conversational workloads (including tool-calling sequences), adds a dedicated multi-turn dataset format + conversion/validation tooling, and wires a new multi-turn load strategy into the benchmarking session and OpenAI adapters.

Changes:

  • Add MultiTurnDataset (flat-row JSONL format) with validation, metadata precomputation, and adapter-default handling for per-turn parameters/tools.
  • Add MultiTurnStrategy + ConversationManager to enforce per-conversation turn sequencing with optional global concurrency limiting, and integrate it into BenchmarkSession.
  • Extend OpenAI request/response handling for messages, tools, tool-call metadata, and streaming tool-call accumulation; add extensive unit/integration tests and multi-turn docs/examples/scripts.

Reviewed changes

Copilot reviewed 43 out of 44 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
tests/unit/openai/test_openai_adapter.py New unit tests for OpenAIAdapter tool serialization and tools forwarding.
tests/unit/openai/test_msgspec_adapter.py New unit tests for msgspec OpenAI adapter tool-call fields and message dict conversion.
tests/unit/load_generator/test_multi_turn_strategy.py New unit tests for turn sequencing, concurrency semaphore behavior, and metadata propagation.
tests/unit/load_generator/test_multi_turn_conversation_manager.py New unit tests for conversation state bookkeeping and event gating.
tests/unit/dataset_manager/test_transforms.py Add coverage for the new AddDefaultColumns transform.
tests/unit/dataset_manager/test_multi_turn_dataset.py Comprehensive tests for MultiTurnDataset, including tool sequences and metadata correctness.
tests/unit/core/test_types.py Add tests for QueryResult.with_metadata() and Query.metadata round-tripping.
tests/unit/config/test_schema.py Add tests for multi-turn config validation and multi-turn sample counting logic.
tests/integration/test_multi_turn.py End-to-end integration tests exercising dataset-history/live-history modes and tool-use conversations.
src/inference_endpoint/openai/types.py Extend msgspec OpenAI types to include tool_calls, tool_call_id, and tools.
src/inference_endpoint/openai/openai_msgspec_adapter.py Support messages input, tool-call fields, tools forwarding, and richer response metadata.
src/inference_endpoint/openai/openai_adapter.py Support messages input, tools forwarding, and return richer response metadata.
src/inference_endpoint/openai/accumulator.py Accumulate streamed tool_calls + finish_reason into final QueryResult.metadata.
src/inference_endpoint/load_generator/strategy.py Extend PhaseIssuerProtocol.issue() to accept an optional data_override.
src/inference_endpoint/load_generator/session.py Allow injecting a per-phase strategy; support data overrides in sample issuance.
src/inference_endpoint/load_generator/multi_turn_strategy.py New multi-turn strategy implementing per-conversation sequencing + global concurrency limiting.
src/inference_endpoint/load_generator/conversation_manager.py New synchronous conversation state manager used by multi-turn strategy.
src/inference_endpoint/endpoint_client/worker.py Propagate Query.metadata through requests and merge into results.
src/inference_endpoint/endpoint_client/http.py Add query_metadata field to InFlightRequest.
src/inference_endpoint/endpoint_client/adapter_protocol.py Generalize SSE decoding/parse APIs to return adapter-specific chunk objects.
src/inference_endpoint/dataset_manager/transforms.py Add AddDefaultColumns (fill-missing-only) transform.
src/inference_endpoint/dataset_manager/multi_turn_dataset.py New multi-turn dataset implementation with tool-sequence handling and metadata building.
src/inference_endpoint/dataset_manager/factory.py Select MultiTurnDataset when dataset config includes multi_turn; skip prompt-based transforms for it.
src/inference_endpoint/dataset_manager/init.py Export MultiTurnDataset and AddDefaultColumns.
src/inference_endpoint/core/types.py Add Query.metadata and QueryResult.with_metadata().
src/inference_endpoint/config/templates/online_template_full.yaml Expose multi_turn dataset block and multi_turn load pattern option in template.
src/inference_endpoint/config/templates/online_template.yaml Expose multi_turn load pattern option in template.
src/inference_endpoint/config/templates/offline_template_full.yaml Expose multi_turn dataset block and load pattern option in template.
src/inference_endpoint/config/templates/concurrency_template_full.yaml Expose multi_turn dataset block and load pattern option in template.
src/inference_endpoint/config/templates/concurrency_template.yaml Expose multi_turn load pattern option in template.
src/inference_endpoint/config/schema.py Add multi-turn schema objects and cross-validate dataset.multi_turn ↔ load_pattern.type.
src/inference_endpoint/config/runtime_settings.py Make multi-turn sample count issue all dataset client turns (min-sample-count aware).
src/inference_endpoint/commands/benchmark/execute.py Instantiate and wire MultiTurnStrategy automatically when using MultiTurnDataset.
scripts/validate_jsonl_schema.py New CLI script to validate multi-turn JSONL rows against schema.
scripts/multi_turn_dataset_schema.json New JSON Schema for multi-turn flat-row JSONL datasets.
scripts/convert_agentic_snapshot.py New conversion+verification script from snapshot-style agentic datasets to flat-row JSONL.
examples/09_MultiTurn/multi_turn_with_concurrency.yaml Example config: multi-turn with global concurrency limiting.
examples/09_MultiTurn/multi_turn_benchmark.yaml Example config: basic multi-turn benchmark.
examples/09_MultiTurn/datasets/.gitkeep Placeholder for converted example datasets.
examples/09_MultiTurn/customer_support_conversations.jsonl Example multi-turn dataset.
examples/09_MultiTurn/agentic_workflow_benchmark.yaml Example config for converted agentic workflow dataset.
examples/09_MultiTurn/agentic_coding_benchmark.yaml Example config for converted agentic coding dataset.
examples/09_MultiTurn/README.md Multi-turn feature documentation and agentic conversion guidance.
docs/MULTI_TURN_QUICKSTART.md Quickstart guide for running multi-turn benchmarks.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +275 to +281
samples.append(
{
"index": idx,
"conversation_id": str_conv_id,
"turn": t_n,
}
)
Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_build_metadata() currently appends sample entries with "index": idx where idx is the DataFrame row index from iterrows(). That index is not guaranteed to be a dense 0..N-1 mapping to Dataset.load_sample() indices (especially after filtering to client turns). To avoid incorrect sample issuance, store a dense sample_index that matches the position in self.data (client_turn_samples) and have the scheduler use that for PhaseIssuer.issue().

Copilot uses AI. Check for mistakes.
Comment on lines +108 to +112
conv_samples: dict[str, list[tuple[int, int]]] = defaultdict(list)
for sample_index, sample_meta in enumerate(self._dataset_metadata["samples"]):
conv_id = sample_meta["conversation_id"]
conv_samples[conv_id].append((sample_index, sample_meta["turn"]))

Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execute() uses enumerate(dataset_metadata["samples"]) to derive sample_index for phase_issuer.issue(). This assumes the metadata list order exactly matches the dataset's load_sample() indexing. With MultiTurnDataset, that coupling is fragile (and can break if metadata ordering differs from self.data). Prefer reading an explicit sample_index field from metadata (or another stable mapping) that is guaranteed to refer to the dataset sample index.

Copilot uses AI. Check for mistakes.
Comment thread src/inference_endpoint/openai/openai_adapter.py
Comment on lines +54 to +63
return [
{
"role": "tool",
"tool_call_id": result.get("tool_call_id"),
"content": result.get("content"),
}
for result in tool_results
]


Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_expand_tool_results() will emit tool messages even if a tool_results entry is missing tool_call_id or content (they become None). That produces invalid OpenAI wire-format messages and will likely fail downstream. Since tool rows are required to have non-empty tool_results with required fields (per schema/docs), consider validating each entry and raising InputValidationError when required keys are absent.

Suggested change
return [
{
"role": "tool",
"tool_call_id": result.get("tool_call_id"),
"content": result.get("content"),
}
for result in tool_results
]
expanded_results: list[dict] = []
for index, result in enumerate(tool_results):
if not isinstance(result, dict):
raise InputValidationError(
"Each tool_results entry must be an object "
f"(conversation_id={row.get('conversation_id')}, turn={row.get('turn')}, "
f"tool_result_index={index})."
)
if "tool_call_id" not in result or result["tool_call_id"] is None:
raise InputValidationError(
"Each tool_results entry must include a non-null 'tool_call_id' "
f"(conversation_id={row.get('conversation_id')}, turn={row.get('turn')}, "
f"tool_result_index={index})."
)
if "content" not in result or result["content"] is None:
raise InputValidationError(
"Each tool_results entry must include non-null 'content' "
f"(conversation_id={row.get('conversation_id')}, turn={row.get('turn')}, "
f"tool_result_index={index})."
)
expanded_results.append(
{
"role": "tool",
"tool_call_id": result["tool_call_id"],
"content": result["content"],
}
)
return expanded_results

Copilot uses AI. Check for mistakes.
Comment on lines +106 to +112
super().__init__(dataframe, **kwargs)
assert self.dataframe is not None, "Dataframe must be initialized"
self._conv_groups = dict(list(self.dataframe.groupby("conversation_id")))
self._validate_conversation_grouping()
self._validate_conversation_structure()
self._validate_turn_numbering()
self.conversation_metadata = self._build_metadata()
Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MultiTurnDataset builds self._conv_groups via dataframe.groupby("conversation_id"), which defaults to sorting keys. That can reorder conversations compared to file order, while load() later builds self.data in raw row order. If conversation_ids are not already lexicographically sorted, conversation_metadata["samples"] ordering can diverge from load_sample() indices, causing MultiTurnStrategy to issue the wrong samples/turns. Consider using groupby(..., sort=False) and/or explicitly constructing a dense sample_index in file order that is guaranteed to match self.data indices.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

@arekay-nv arekay-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some confusion around the issue policy, lets sync today.

Comment thread docs/MULTI_TURN_QUICKSTART.md Outdated
Comment on lines +155 to +159
**Sizing guide**:

- Small (< 50 convs): `target_concurrency: 32`
- Medium (50-500 convs): `target_concurrency: 64`
- Large (500+ convs): `target_concurrency: 96` or higher
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the metrics for these suggestions? For Small (< 50 convs) does a concurrency of 32 give better performance? I would assume the model size/server capability also plays a role in the concurrency settings.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this part. Only keeping concurrency definition.

Comment thread docs/MULTI_TURN_QUICKSTART.md Outdated
workers: 16 # More workers for parallel conversations
```

### Long Conversations
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clarify with "conversations with large number of turns" - a long conversation can also mean small number of long turns as end to end time could also be large there.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. Removed this exact phrasing.

Comment on lines +216 to +218
**Problem**: Your dataset doesn't follow a valid role sequence.

**Fix**: Check your JSONL. Valid sequences:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add a utility that parses the dataset to make sure it is compliant so devs can use it instead of running the benchmark for testing.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added documentation to use scripts/validate_jsonl_schema.py for validation

)

# Maps query_id -> conversation_id for routing completions.
self._inflight: dict[str, str] = {}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Comment on lines +129 to +135
tasks = [
asyncio.create_task(
self._conv_pipeline(conv_id, turns, phase_issuer),
name=f"mt-pipeline-{conv_id}",
)
for conv_id, turns in conv_samples.items()
]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So one task for each conversation?

Comment on lines +110 to +127
def test_encode_request_produces_valid_json_bytes():
"""encode_request returns bytes that msgspec can decode back."""
messages = [{"role": "user", "content": "Hello"}]
query = Query(
id="q2",
data={
"model": "m",
"messages": messages,
"max_completion_tokens": 64,
"stream": False,
},
)
request = OpenAIAdapter.to_endpoint_request(query)
encoded = OpenAIAdapter.encode_request(request)

assert isinstance(encoded, bytes)
decoded = msgspec.json.decode(encoded)
assert decoded["messages"][0]["role"] == "user"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this related to the multi-turn testing?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, more specifically for tool call handling

Comment on lines +180 to +182
# Acquire concurrency slot before issuing.
if self._sem is not None:
await self._sem.acquire()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something that needs fixing/clarification. As-is, this would mean that we have multiple conversations open, with N of them (N being the concurrency) sending requests. If i understand correctly, we only want to have N conversations open, with a turn from each currently being processed.
Launching tasks for each conversation will have this drawback - we cannot schedule tasks from a subset of conversations as all conversations will be available to run.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to advance turn on on_sample_complete. Only N conversations are kept open, and a new conversation only starts when an existing one finishes.

Comment on lines +229 to +230
if self._sem is not None:
self._sem.release()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are going to issue turns from a conversation before moving onto a new conversation, this needs to move to the sample_complete request.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored. Same as above



@dataclass(frozen=True, slots=True)
@dataclass(frozen=True)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

tianmu-li and others added 4 commits May 4, 2026 11:09
…Strategy

target_concurrency now limits active conversations (not in-flight requests).
N worker tasks pull from asyncio.Queue, each processing one full conversation
before taking the next. Also adds slots=True back to PhaseConfig and sort=False
to groupby for file-order preservation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n implementation

- openai_adapter: normalize null content to "" instead of literal "None"
  to avoid polluting conversation history in tool-calling responses
- multi_turn_dataset: validate tool_results entries have required
  tool_call_id and content fields; raise InputValidationError at load time
- multi_turn_dataset: remove unused "index" field from samples metadata
- multi_turn_strategy: wrap mark_turn_complete/mark_turn_failed in
  try/except KeyError in on_sample_complete
- multi_turn_strategy: clear _inflight at end of execute() with warning
  if entries remain (transport failure or session abort)
- docs: remove prescriptive concurrency sizing guide; replace with
  definition of what target_concurrency controls
- docs: rename "Long Conversations" to "Conversations with Many Turns"
- docs: add dataset validation utility reference in Troubleshooting

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fix refusal field set to literal string "None" instead of "" in
  openai_adapter.py — made downstream refusal checks incorrectly truthy
- Add test_pipeline_error_propagated to verify execute() re-raises
  worker exceptions instead of swallowing them via gather(return_exceptions=True)
- Clarify MultiTurnStrategy docstring and MULTI_TURN_QUICKSTART.md:
  target_concurrency = simultaneous conversations (not requests);
  each active conversation has exactly 1 in-flight turn at a time
- Remove unjustified "Common Configurations" section from quickstart
- Correct misleading "workers = concurrent conversations" tip; clarify
  client.workers and target_concurrency are independent layers

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ategy

Rewrites MultiTurnStrategy to issue subsequent turns synchronously inside
on_sample_complete() (zero event-loop delay), removing pre-spawned worker
tasks and per-conversation asyncio.Event waiting. ConversationState no
longer holds an asyncio.Event; sequencing is driven entirely by the
strategy. Addresses PR mlcommons#285 reviewer request to move turn issuance into
the sample-complete handler.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 4, 2026 22:17
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 43 out of 44 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (2)

src/inference_endpoint/endpoint_client/adapter_protocol.py:134

  • parse_sse_chunk now appends whatever decode_sse_message returns, including None (e.g., when an SSE message has no choices). This contradicts the docstring (“Silently ignores non-content SSE messages”) and forces downstream accumulators to defensively handle None. Consider filtering out None return values here (and/or handling exceptions per JSON doc) so call sites only see meaningful chunk objects.
    def parse_sse_chunk(cls, buffer: bytes, end_pos: int) -> list[Any]:
        """
        Parse SSE chunk and extract all chunk objects.

        Extracts JSON documents from SSE stream and decodes them to chunk objects.
        Silently ignores non-content SSE messages (role, finish_reason, etc).

        Args:
            buffer: Byte buffer containing SSE data
            end_pos: End position in buffer to parse up to

        Returns:
            List of chunk objects extracted from the SSE chunk
        """
        json_docs = cls.SSE_DATA_PATTERN.findall(buffer[:end_pos])
        parsed_contents = []

        try:
            for json_doc in json_docs:
                content = cls.decode_sse_message(json_doc)
                parsed_contents.append(content)
        except Exception:
            # Normal for non-content SSE messages (role, finish_reason, etc)
            pass

        return parsed_contents

src/inference_endpoint/core/types.py:242

  • The Query docstring’s gc=False note only mentions data/headers, but Query now also has a mutable metadata dict. To avoid future misuse (and to match the more explicit QueryResult guidance), consider updating this note to include metadata as well and/or adding an AT-RISK (gc=False) warning that data/metadata/headers must not be mutated to introduce cycles.
    Attributes:
        id: Unique identifier for this query (auto-generated UUID).
        data: Request payload as a dictionary (typically contains prompt, model, etc.).
        metadata: Internal metadata that round-trips through transport (e.g., conversation_id).
        headers: HTTP headers to include in the request (e.g., authorization).
        created_at: Timestamp when query was created (seconds since epoch).

    Example:
        >>> query = Query(
        ...     data={"prompt": "Hello", "model": "Qwen/Qwen3-8B", "max_tokens": 100},
        ...     headers={"Authorization": "Bearer token123"},
        ... )

    Note:
        gc=False: Safe because data/headers are simple key-value pairs without cycles.
        Do NOT store self-referential or cyclic structures in data/headers fields.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 44 to 50
class SSEDelta(msgspec.Struct, frozen=True, kw_only=True, omit_defaults=True, gc=False): # type: ignore[call-arg]
"""SSE delta object containing content."""

content: str = ""
reasoning: str = ""
tool_calls: list[dict[str, Any]] | None = None

raise ValueError(
f"Conversation {conv_id} has invalid role sequence at turn "
f"{row['turn']}: got '{role}' after state '{state}'"
)
@tianmu-li
Copy link
Copy Markdown
Collaborator Author

Hi @arekay-nv, I've addressed the comments. Main change is refactoring to an event-based model and limiting number of active conversations to concurrency. Appreciate it if you could take another look.

Copy link
Copy Markdown
Collaborator

@arekay-nv arekay-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost there - the only issue is the stickiness of the client-turns.

Comment on lines +548 to +552
def _on_sample_complete(result: QueryResult) -> None:
if multi_turn_strategy is not None:
multi_turn_strategy.on_sample_complete(result)
collector.on_complete_hook(result)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create two versions of this and pass it to benchmark session conditionally. This way, we are not checking the strategy on each sample-complete event.

Comment on lines +68 to +73
class ConversationMode(str, Enum):
"""Multi-turn conversation scheduling modes."""

INDEPENDENT = "independent" # Per-conv pipelines; no cross-conv turn barrier


Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? Can we remove it until we have two different modes.

Comment on lines +130 to +153
class AddDefaultColumns(Transform):
"""Add columns only where values are missing (NaN or absent).

Unlike AddStaticColumns which unconditionally overwrites, this preserves
existing non-null values — dataset per-row overrides take precedence over
the supplied defaults.
"""

def __init__(self, data: dict[str, Any]):
"""Initialize the AddDefaultColumns transform."""
self.data = data

def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Fill missing columns with defaults without overwriting existing values."""
for key, value in self.data.items():
if value is None:
continue
if key in df.columns:
df[key] = df[key].where(pd.notna(df[key]), value)
else:
df[key] = value
return df


Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be implemented by using AddStaticColumn and a boolean indicating whether to overwrite or not?

else:
return "<EMPTY>"

def with_metadata(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed for anything other than conversation id? Can we use the query-id for this instead of having to wire the metadata through the whole rount-trip.

@pytest.mark.unit
def test_multi_turn_valid_config(self):
config = BenchmarkConfig(**self._make_online_multi_turn(concurrency=16))
from inference_endpoint.config.schema import LoadPatternType
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move import to top


@pytest.mark.unit
def test_multi_turn_uses_dataset_size_ignoring_duration(self):
from inference_endpoint.config.runtime_settings import RuntimeSettings
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move import to top.

@arekay-nv
Copy link
Copy Markdown
Collaborator

@viraatc can you check the perf implications - there might be non-negligible overhead for non-agentic/multi-turn workloads.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants