Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions doc/source/llm/doc_code/serve/multi_gpu/dp_autoscaling_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
This file serves as a documentation example and CI test for autoscaling data parallel attention deployment.

Structure:
1. Monkeypatch setup: Ensures serve.run is non-blocking and removes accelerator requirements for CI testing.
2. Docs example (between __dp_autoscaling_example_start/end__): Embedded in Sphinx docs via literalinclude.
3. Test validation (deployment status polling + cleanup)
"""

import time
from ray import serve
from ray.serve.schema import ApplicationStatus
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
from ray.serve import llm

# Capture the real implementations before they are monkeypatched below, so the
# testing shims can delegate to them.
_original_serve_run = serve.run
_original_build_dp_openai_app = llm.build_dp_openai_app


def _non_blocking_serve_run(app, **kwargs):
    """Shim around the real ``serve.run`` that always forces ``blocking=False``.

    Used in CI so the script can continue to the status-polling section
    instead of blocking forever on the deployed app.
    """
    overridden = {**kwargs, "blocking": False}
    return _original_serve_run(app, **overridden)


def _testing_build_dp_openai_app(builder_config, **kwargs):
    """Shim around ``build_dp_openai_app`` that strips accelerator requirements.

    CI machines have no GPUs, so any ``accelerator_type`` set on the example's
    ``llm_config`` is cleared before the app is built.
    """
    llm_config = builder_config["llm_config"] if "llm_config" in builder_config else None
    if llm_config is not None and getattr(llm_config, "accelerator_type", None) is not None:
        llm_config.accelerator_type = None
    return _original_build_dp_openai_app(builder_config, **kwargs)


# Install the testing shims so the docs example below runs unmodified but
# non-blocking and without accelerator requirements.
serve.run = _non_blocking_serve_run
llm.build_dp_openai_app = _testing_build_dp_openai_app


# Re-imported here so the embedded docs example shows the imports a user needs.
from ray import serve
from ray.serve.llm import LLMConfig, build_dp_openai_app

# __dp_autoscaling_example_start__
config = LLMConfig(
    model_loading_config={
        "model_id": "microsoft/Phi-tiny-MoE-instruct"
    },
    deployment_config={
        "num_replicas": "auto",
        "autoscaling_config": {
            "min_replicas": 1,  # Min number of DP groups
            "max_replicas": 2,  # Max number of DP groups
            "initial_replicas": 1,  # Initial number of DP groups
            # Other Ray Serve autoscaling knobs still apply
            "upscale_delay_s": 0.1,
            "downscale_delay_s": 2,
        },
    },
    engine_kwargs={
        "data_parallel_size": 2,  # Number of DP replicas
        "tensor_parallel_size": 1,  # TP size per replica
        "max_model_len": 1024,
        "max_num_seqs": 32,
    },
)

app = build_dp_openai_app({
    "llm_config": config
})
# __dp_autoscaling_example_end__

# In CI this is the monkeypatched serve.run above, so it returns immediately
# and the status-polling section below performs the actual validation.
serve.run(app, blocking=True)

# Poll the default Serve application until it is RUNNING, fails, or times out.
status = ApplicationStatus.NOT_STARTED
timeout_seconds = 300
# Use a monotonic clock for the timeout so wall-clock adjustments (NTP, DST)
# cannot shorten or extend the polling window.
deadline = time.monotonic() + timeout_seconds

while time.monotonic() < deadline:
    status = serve.status().applications[SERVE_DEFAULT_APP_NAME].status

    # Success: stop immediately instead of sleeping one more interval.
    if status == ApplicationStatus.RUNNING:
        break

    # Fail fast on terminal error states instead of waiting out the timeout.
    if status in (ApplicationStatus.DEPLOY_FAILED, ApplicationStatus.UNHEALTHY):
        raise AssertionError(f"Deployment failed with status: {status}")

    time.sleep(1)

if status != ApplicationStatus.RUNNING:
    raise AssertionError(
        f"Deployment failed to reach RUNNING status within {timeout_seconds}s. Current status: {status}"
    )

serve.shutdown()

7 changes: 3 additions & 4 deletions doc/source/llm/doc_code/serve/multi_gpu/dp_basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@ def _testing_build_dp_openai_app(builder_config, **kwargs):
model_loading_config={
"model_id": "microsoft/Phi-tiny-MoE-instruct"
},
deployment_config={
"num_replicas": 2
},
engine_kwargs={
"data_parallel_size": 2, # Number of DP replicas
"tensor_parallel_size": 1, # TP size per replica
# Reduced for CI compatibility
"max_model_len": 1024,
"max_num_seqs": 32,
},
experimental_configs={
# This is a temporary required config. We will remove this in future versions.
"dp_size_per_node": 2, # DP replicas per node
},
)

app = build_dp_openai_app({
Expand Down
6 changes: 0 additions & 6 deletions doc/source/llm/doc_code/serve/multi_gpu/dp_pd_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ def _testing_build_dp_deployment(llm_config, **kwargs):
"max_model_len": 1024,
"max_num_seqs": 32,
},
experimental_configs={
"dp_size_per_node": 2,
},
)

# Configure decode with data parallel attention
Expand All @@ -91,9 +88,6 @@ def _testing_build_dp_deployment(llm_config, **kwargs):
"max_model_len": 1024,
"max_num_seqs": 32,
},
experimental_configs={
"dp_size_per_node": 2,
},
)

# Build prefill and decode deployments with DP
Expand Down
92 changes: 92 additions & 0 deletions doc/source/serve/advanced-guides/performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,98 @@ You can also configure each option individually. The following table details the

You may want to enable throughput-optimized serving while customizing the options above. You can do this by setting `RAY_SERVE_THROUGHPUT_OPTIMIZED=1` and overriding the specific options. For example, to enable throughput-optimized serving and continue logging to stderr, you should set `RAY_SERVE_THROUGHPUT_OPTIMIZED=1` and override with `RAY_SERVE_LOG_TO_STDERR=1`.

(serve-haproxy)=
### Use HAProxy load balancing

By default, Ray Serve uses a Python-based HTTP/gRPC proxy to route requests to replicas. You can replace this with [HAProxy](https://www.haproxy.org/), a high-performance C-based load balancer, for improved throughput and lower latency at high request rates.

When HAProxy mode is enabled:
- An `HAProxyManager` actor runs on each node (by default) and translates Serve's routing table into HAProxy configuration reloads.
- Each ingress replica opens a port, and HAProxy routes traffic directly to replicas, replacing the Python proxy entirely.
- Live traffic flows through the HAProxy subprocess, not through any Python actor.

#### Prerequisites

HAProxy must be installed and available on `$PATH` as `haproxy` on every node that runs a Serve proxy. The [official Ray Docker images](https://hub.docker.com/r/rayproject/ray) (2.55+) include HAProxy pre-built. No additional installation is needed when using `rayproject/ray` images.

#### Enabling HAProxy

Set the `RAY_SERVE_ENABLE_HA_PROXY` environment variable to `1` on all nodes before starting Ray:

```bash
export RAY_SERVE_ENABLE_HA_PROXY=1
```

This environment variable must be set on every node in the Ray cluster.

::::{tab-set}

:::{tab-item} KubeRay
```yaml
# In the Ray container spec for head and worker groups:
env:
- name: RAY_SERVE_ENABLE_HA_PROXY
value: "1"
```
:::

:::{tab-item} VM cluster
```bash
# On every node (head and workers)
export RAY_SERVE_ENABLE_HA_PROXY=1
ray start --head # or ray start --address=<head-ip>:6379 on workers
```
:::

::::

#### Installing HAProxy manually (example)

If you are not using the official Ray Docker images, install HAProxy 2.8+ from source on every node. These steps are provided as an example only. In the future, HAProxy will be bundled with the `ray` Python package.

The following steps are for Ubuntu/Debian:
```bash
# Install build dependencies
apt-get update -y && apt-get install -y --no-install-recommends \
build-essential ca-certificates curl libc6-dev \
liblua5.3-dev libpcre3-dev libssl-dev zlib1g-dev

# Build HAProxy from source
export HAPROXY_VERSION="2.8.12"
curl -sSfL -o /tmp/haproxy.tar.gz \
"https://www.haproxy.org/download/2.8/src/haproxy-${HAPROXY_VERSION}.tar.gz"
mkdir -p /tmp/haproxy-build && tar -xzf /tmp/haproxy.tar.gz -C /tmp/haproxy-build --strip-components=1
make -C /tmp/haproxy-build TARGET=linux-glibc \
USE_OPENSSL=1 USE_ZLIB=1 USE_PCRE=1 USE_LUA=1 USE_PROMEX=1 -j$(nproc)
make -C /tmp/haproxy-build install SBINDIR=/usr/local/bin
rm -rf /tmp/haproxy-build /tmp/haproxy.tar.gz

# Install runtime dependencies
apt-get install -y --no-install-recommends socat liblua5.3-0

# Create required directories
mkdir -p /etc/haproxy /run/haproxy /var/log/haproxy
```

The required build flags are `USE_OPENSSL=1 USE_ZLIB=1 USE_PCRE=1 USE_LUA=1 USE_PROMEX=1`. The runtime dependencies are `socat` (for the admin socket) and `liblua5.3-0` (Lua runtime library).

(serve-interdeployment-grpc)=
### Use gRPC for interdeployment communication

By default, when one deployment calls another via a `DeploymentHandle`, requests are sent through Ray's actor RPC system. You can switch this internal transport to gRPC by setting `RAY_SERVE_USE_GRPC_BY_DEFAULT=1` on all nodes before starting Ray. This makes all `DeploymentHandle` calls use gRPC transport, which serializes requests and sends them directly to the target replica's gRPC server. gRPC transport is most beneficial for high-throughput workloads with small payloads (under ~1 MB), where bypassing Ray's object store reduces per-request overhead.

#### When not to use gRPC

1. Because gRPC must serialize every payload, avoid it for large payloads (greater than ~1 MB). When gRPC is enabled by default, an individual handle's transport can be switched back to actor RPC with `handle.options(_by_reference=True)`, which passes large objects by reference instead of serializing them.

2. When passing a `DeploymentResponse` from one deployment into another (i.e., without `await`-ing it first), gRPC resolves the value at the caller and serializes it over the wire. With actor RPC, the underlying `ObjectRef` is forwarded without materializing the data. If your pipeline chains `DeploymentResponse` objects through multiple deployments with payload sizes >100KB, avoid using gRPC for those handles.

```{literalinclude} ../doc_code/interdeployment_grpc.py
:start-after: __start_grpc_override__
:end-before: __end_grpc_override__
:language: python
```

## Debugging performance issues in controller

The Serve Controller runs on the Ray head node and is responsible for a variety of tasks,
Expand Down
32 changes: 32 additions & 0 deletions doc/source/serve/doc_code/interdeployment_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# flake8: noqa

from ray import serve
from ray.serve.handle import DeploymentHandle

# __start_grpc_override__
@serve.deployment
class Caller:
    """Deployment that forwards payloads to a downstream handle via actor RPC."""

    def __init__(self, target: DeploymentHandle):
        # Override this specific handle to use actor RPC instead of gRPC.
        # This is useful for large payloads (over ~1 MB) where passing
        # objects by reference through Ray's object store is more efficient.
        self._target = target.options(_by_reference=True)

    async def __call__(self, data: bytes) -> str:
        # Forward the payload to the downstream deployment and await its reply.
        return await self._target.remote(data)


@serve.deployment
class LargePayloadProcessor:
    """Deployment that reports the size of the bytes payload it receives."""

    def __call__(self, data: bytes) -> str:
        return f"processed {len(data)} bytes"


# Compose the app: Caller wraps the LargePayloadProcessor deployment.
processor = LargePayloadProcessor.bind()
app = Caller.bind(processor)

handle: DeploymentHandle = serve.run(app)
# Smoke-check the round trip with a 1 KiB payload.
assert handle.remote(b"x" * 1024).result() == "processed 1024 bytes"
# __end_grpc_override__

serve.shutdown()
Loading
Loading