[Feat][Router] Add conditional disaggregated routing with threshold-based request classification#890
Conversation
…ased request classification Add routing_threshold parameter to DisaggregatedPrefillRouter that routes requests based on estimated input token count. Requests with input tokens above the threshold go through the DPD path (remote prefill on prefiller, then stream from decoder). Requests at or below the threshold bypass DPD and are sent directly to the decoder for local chunked prefill, which is faster for short inputs. Key changes: - Token estimation via char/4 heuristic (string prompts, chat messages) or direct length (token ID lists) - Configurable --routing-threshold CLI arg (default 0 = always disaggregate, preserving backward compatibility) - Round-robin load balancing across multiple prefill and decode endpoints for xPyD topology support - Direct-to-decoder path for short-input requests - kv_transfer_params forwarding from prefiller to decoder response for NixlConnector/LMCache DPD protocol support Validated with 410+ workload configurations on P5 (8xH100) cross-node DPD. Zero regression vs NixlConnector on TTFT, ITL, E2E latency, and throughput. Recommended threshold: 4096 for 70B models on P5. 18 unit tests covering token estimation, conditional routing, round-robin selection, and backward compatibility. All 28 existing tests continue to pass. Signed-off-by: Xuan Lu <xuanlubio@gmail.com>
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the DisaggregatedPrefillRouter by introducing conditional routing, optimizing request handling based on input length. Short requests can now bypass the remote prefill step, leading to reduced latency, while longer requests continue to leverage disaggregated prefill. The update also incorporates round-robin load balancing for improved endpoint utilization and includes necessary adjustments for advanced DPD protocols like NixlConnector, making the routing system more efficient and scalable. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request enhances the DisaggregatedPrefillRouter by introducing conditional disaggregation and round-robin load balancing. A new routing_threshold parameter allows requests with estimated input tokens below the threshold to bypass the Disaggregated Prefill/Decode (DPD) path and be routed directly to a decoder for local prefill. The router now includes logic to estimate input tokens from various request formats and selects prefill/decode endpoints using a round-robin strategy. Additionally, support for NixlConnector-based DPD is added by forwarding kv_transfer_params between prefill and decode stages. The review comments suggest improving the conciseness of total_chars calculation in _estimate_input_tokens and refactoring duplicated streaming and error handling logic in _route_direct_to_decoder to reduce code duplication, along with removing an unused parameter.
| elif isinstance(content, list): | ||
| for part in content: | ||
| if isinstance(part, dict) and part.get("type") == "text": | ||
| total_chars += len(part.get("text", "")) |
There was a problem hiding this comment.
The logic for calculating total_chars from a list of message parts can be made more concise and Pythonic by using a generator expression with sum(). This improves readability and maintainability.
| elif isinstance(content, list): | |
| for part in content: | |
| if isinstance(part, dict) and part.get("type") == "text": | |
| total_chars += len(part.get("text", "")) | |
| elif isinstance(content, list): | |
| total_chars += sum( | |
| len(part.get("text", "")) | |
| for part in content | |
| if isinstance(part, dict) and part.get("type") == "text" | |
| ) |
| async def _route_direct_to_decoder( | ||
| request: Request, | ||
| endpoint: str, | ||
| request_json: dict, | ||
| request_id: str, | ||
| background_tasks: BackgroundTasks, | ||
| ): | ||
| """Route request directly to decoder, bypassing DPD. | ||
|
|
||
| Used for short-input requests where local chunked prefill on the decoder | ||
| is faster than remote prefill + KV transfer. | ||
| """ | ||
| st = time.time() | ||
|
|
||
| async def generate_stream(): | ||
| try: | ||
| async for chunk in send_request_to_decode( | ||
| request.app.state.decode_client, endpoint, request_json, request_id | ||
| ): | ||
| yield chunk | ||
| except aiohttp.ClientResponseError as e: | ||
| logger.error(f"HTTP error in direct-to-decoder: {e}", exc_info=True) | ||
| error_response = { | ||
| "error": { | ||
| "message": f"Decoder error: {e.message}", | ||
| "type": "decoder_error", | ||
| "code": e.status, | ||
| } | ||
| } | ||
| yield json.dumps(error_response).encode("utf-8") | ||
| except Exception as e: | ||
| logger.error(f"Unexpected error in direct-to-decoder: {e}", exc_info=True) | ||
| error_response = { | ||
| "error": { | ||
| "message": f"Decoder error: {str(e)}", | ||
| "type": "decoder_error", | ||
| "code": 500, | ||
| } | ||
| } | ||
| yield json.dumps(error_response).encode("utf-8") | ||
|
|
||
| curr_time = time.time() | ||
| logger.info( | ||
| f"Direct-to-decoder request {request_id} to " | ||
| f"{request.app.state.decode_client._base_url} at {curr_time}, " | ||
| f"process time = {curr_time - st:.4f}" | ||
| ) | ||
|
|
||
| return StreamingResponse( | ||
| generate_stream(), | ||
| media_type="application/json", | ||
| headers={"X-Request-Id": request_id}, | ||
| ) |
There was a problem hiding this comment.
The new function _route_direct_to_decoder contains a significant amount of logic that is duplicated from the generate_stream async generator within route_disaggregated_prefill_request (lines 855-897), particularly the error handling and streaming logic.
To improve maintainability and reduce code duplication, consider refactoring this shared logic into a separate helper function. This new function could handle streaming from the decoder and error reporting, and then be called by both _route_direct_to_decoder and route_disaggregated_prefill_request.
Additionally, the background_tasks parameter in _route_direct_to_decoder is unused and can be removed.
Add multi-endpoint load balancing (round-robin and least-loaded strategies) for disaggregated prefill/decode routing across xPyD topologies. Add LMCache DPD connector support with disagg_spec protocol, decoder registry for dynamic per-request endpoint selection, and ret_first_tok handling. Changes: - Add load_balancing_strategy param with least_loaded fallback to round-robin - Add _build_disagg_spec() for dynamic xPyD decoder registry lookup - Add base_url param to send_request_to_prefiller/decode for shared HTTP client - Add LMCache protocol: disagg_spec injection, first token extraction - Add CLI args: --load-balancing-strategy, --disagg-connector-type, --decoder-host/init-port/alloc-port, --decoder-registry - Fix >= boundary condition on routing threshold (was > causing regression) - Add least-loaded and xPyD unit tests Signed-off-by: Xuan Lu <xuanlubio@gmail.com>
9ac4317 to
6618cfb
Compare
Add conditional disaggregated prefill/decode (DPD) routing and multi-endpoint load balancing to DisaggregatedPrefillRouter.
Conditional Routing
Add
routing_thresholdparameter that routes requests based on estimated input token count. Requests at or above the threshold go through the DPD path (remote prefill, then stream from decoder). Requests below the threshold bypass DPD and go directly to the decoder for local chunked prefill, which is faster for short inputs.--routing-thresholdCLI arg (default 0 = always disaggregate, preserving backward compatibility)_route_direct_to_decoder()for short-input requestskv_transfer_paramsforwarding from prefiller to decoder for NixlConnector DPD support>=boundary: requests exactly at threshold go through DPDxPyD Load Balancing
Add multi-endpoint load balancing across prefill and decode pools for xPyD topologies (e.g., 2P:1D, 1P:2D).
--load-balancing-strategyCLI arg:round_robin(default) orleast_loadedengine_stats(running + queued requests), falls back to round-robin when stats unavailable--disagg-connector-typeCLI arg:nixl(default) orlmcachedisagg_specinjection,ret_first_tokhandling, first token extraction and prompt augmentation--decoder-registryJSON arg for xPyD mode with per-decoder NIXL metadata--decoder-host/init-port/alloc-portfor single-decoder legacy mode_build_disagg_spec()for dynamic per-request decoder spec constructionbase_urlparameter onsend_request_to_prefiller/send_request_to_decodefor shared HTTP clientTests
_build_disagg_spec(static/dynamic/fallback), round-robin distribution across 2P:4D topology, disagg_spec-decoder consistency-swhen doinggit commitchanges, such as
[Bugfix],[Feat], and[CI].Detailed Checklist (Click to Expand)
Thank you for your contribution to production-stack! Before submitting the pull request, please ensure the PR meets the following criteria. This helps us maintain the code quality and improve the efficiency of the review process.
PR Title and Classification
Please try to classify PRs for easy understanding of the type of changes. The PR title is prefixed appropriately to indicate the type of change. Please use one of the following:
[Bugfix]for bug fixes.[CI/Build]for build or continuous integration improvements.[Doc]for documentation fixes and improvements.[Feat]for new features in the cluster (e.g., autoscaling, disaggregated prefill, etc.).[Router]for changes to thevllm_router(e.g., routing algorithm, router observability, etc.).[Misc]for PRs that do not fit the above categories. Please use this sparingly.Note: If the PR spans more than one category, please include all relevant prefixes.
Code Quality
The PR need to meet the following code quality standards:
pre-committo format your code. SeeREADME.mdfor installation.DCO and Signed-off-by
When contributing changes to this project, you must agree to the DCO. Commits must include a
Signed-off-by:header which certifies agreement with the terms of the DCO.Using
-swithgit commitwill automatically add this header.What to Expect for the Reviews
We aim to address all PRs in a timely manner. If no one reviews your PR within 5 days, please @-mention one of YuhanLiu11
, Shaoting-Feng or ApostaC.