Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
DEFAULT_AUTOSCALING_POLICY_NAME,
DEFAULT_REQUEST_ROUTER_PATH,
RAY_SERVE_ENABLE_TASK_EVENTS,
RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S,
SERVE_LOGGER_NAME,
)
from ray.serve._private.deploy_utils import (
Expand Down Expand Up @@ -1192,6 +1193,8 @@ def __init__(
tag_keys=("application",),
)

self._app_status_gauge_cache: Dict[str, Tuple[int, float]] = {}

self._recover_from_checkpoint()

def _recover_from_checkpoint(self):
Expand Down Expand Up @@ -1460,16 +1463,35 @@ def update(self):
logger.debug(f"Application '{name}' deleted successfully.")

# Record application status metrics
now = time.time()
for name, app in self._application_states.items():
self._application_status_gauge.set(
app.status.to_numeric(),
tags={"application": name},
cached = self._app_status_gauge_cache.get(name)
value = app.status.to_numeric()

# Throttle gauge reporting to avoid redundant FFI calls on every control-loop iteration.
# Two independent conditions trigger a write:
# - value_changed: reports status transitions (e.g. DEPLOYING -> RUNNING)
# immediately, without waiting for the interval to expire.
# - interval_elapsed: refreshes the gauge periodically even when status is
# unchanged, preventing stale/empty time series in Grafana/Prometheus.
# We skip ONLY when both say it is safe — value unchanged AND reported recently.
value_changed = cached is None or cached[0] != value
interval_elapsed = (
cached is None
or (now - cached[1]) >= RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S
)

if not value_changed and not interval_elapsed:
continue

self._application_status_gauge.set(value, tags={"application": name})
self._app_status_gauge_cache[name] = (value, now)

if len(apps_to_be_deleted) > 0:
for app_name in apps_to_be_deleted:
self._autoscaling_state_manager.deregister_application(app_name)
del self._application_states[app_name]
self._app_status_gauge_cache.pop(app_name, None)
ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

if any_target_state_changed:
Expand Down
11 changes: 6 additions & 5 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,12 @@
"RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS", "0"
)

# How often (in seconds) the controller re-records an unchanged health-check
# gauge value for each replica. Setting this to 0 disables caching (every loop
# iteration records the gauge, matching pre-optimization behavior).
RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S = get_env_float_non_negative(
"RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S", 10.0
# How often (in seconds) the controller re-records an unchanged status gauge
# value for replicas and applications. Setting this to 0 disables caching
# (every control loop iteration records the gauge, matching pre-optimization
# behavior).
RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S = get_env_float_non_negative(
"RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S", 10.0
)

# Initial deadline for queue length responses in the router.
Expand Down
4 changes: 2 additions & 2 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
RAY_SERVE_ENABLE_TASK_EVENTS,
RAY_SERVE_FAIL_ON_RANK_ERROR,
RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS,
RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S,
RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S,
RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY,
REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD,
REPLICA_STARTUP_SHUTDOWN_LATENCY_BUCKETS_MS,
Expand Down Expand Up @@ -3572,7 +3572,7 @@ def _set_health_gauge(self, replica_unique_id: str, value: int) -> None:
if (
cached is not None
and cached[0] == value
and (now - cached[1]) < RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S
and (now - cached[1]) < RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S
):
return
self.health_check_gauge.set(value, tags={"replica": replica_unique_id})
Expand Down
4 changes: 2 additions & 2 deletions python/ray/serve/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ py_test_module_list(
"RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S": "5",
"RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S": "1",
"RAY_SERVE_REPLICA_UTILIZATION_NUM_BUCKETS": "10",
"RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S": "0.1",
"RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S": "0.1",
},
files = [
"test_deploy_app.py",
Expand Down Expand Up @@ -696,7 +696,7 @@ py_test_module_list(
env = {
"RAY_SERVE_ENABLE_HA_PROXY": "1",
"RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S": "0.01",
"RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S": "0.1",
"RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S": "0.1",
},
files = [
"test_haproxy.py",
Expand Down
4 changes: 2 additions & 2 deletions python/ray/serve/tests/unit/test_deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
DEFAULT_HEALTH_CHECK_TIMEOUT_S,
DEFAULT_MAX_ONGOING_REQUESTS,
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S,
RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S,
)
from ray.serve._private.deployment_info import DeploymentInfo
from ray.serve._private.deployment_state import (
Expand Down Expand Up @@ -1707,7 +1707,7 @@ def counting_set(*args, **kwargs):

# After the TTL expires, the gauge should be re-reported even though
# the value hasn't changed.
timer.advance(RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S + 1)
timer.advance(RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S + 1)
dsm.update()
assert call_count == len(replica_ids), (
f"Gauge.set was called {call_count} times after TTL expired; "
Expand Down
Loading