From e61f0df9d0293636c59c30165a793e687d663946 Mon Sep 17 00:00:00 2001 From: Limark Dcunha <83493294+limarkdcunha@users.noreply.github.com> Date: Sun, 15 Mar 2026 14:29:55 -0400 Subject: [PATCH] throttle application status gauge reporting in control loop (#61603) ## Description Application status metrics were being reported on every control loop iteration, which is redundant when the status hasn't changed. Introduced a cache to skip the call when the application status value is unchanged and was reported recently, following the same pattern used for replica health gauges (_set_health_gauge / _health_gauge_cache). Closes #61565. --------- Signed-off-by: Limark Dcunha --- .../ray/serve/_private/application_state.py | 28 +++++++++++++++++-- python/ray/serve/_private/constants.py | 11 ++++---- python/ray/serve/_private/deployment_state.py | 4 +-- python/ray/serve/tests/BUILD.bazel | 4 +-- .../serve/tests/unit/test_deployment_state.py | 4 +-- 5 files changed, 37 insertions(+), 14 deletions(-) diff --git a/python/ray/serve/_private/application_state.py b/python/ray/serve/_private/application_state.py index 234c56cf342c..3ecc7213c10e 100644 --- a/python/ray/serve/_private/application_state.py +++ b/python/ray/serve/_private/application_state.py @@ -28,6 +28,7 @@ DEFAULT_AUTOSCALING_POLICY_NAME, DEFAULT_REQUEST_ROUTER_PATH, RAY_SERVE_ENABLE_TASK_EVENTS, + RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S, SERVE_LOGGER_NAME, ) from ray.serve._private.deploy_utils import ( @@ -1192,6 +1193,8 @@ def __init__( tag_keys=("application",), ) + self._app_status_gauge_cache: Dict[str, Tuple[int, float]] = {} + self._recover_from_checkpoint() def _recover_from_checkpoint(self): @@ -1460,16 +1463,35 @@ def update(self): logger.debug(f"Application '{name}' deleted successfully.") # Record application status metrics + now = time.time() for name, app in self._application_states.items(): - self._application_status_gauge.set( - app.status.to_numeric(), - tags={"application": name}, + cached = 
self._app_status_gauge_cache.get(name) + value = app.status.to_numeric() + + # Throttle gauge reporting to avoid redundant FFI calls each control loop. + # Two independent conditions trigger a write: + # - value_changed: reports status transitions (e.g. DEPLOYING -> RUNNING) + # immediately, without waiting for the interval to expire. + # - interval_elapsed: refreshes the gauge periodically even when status is + # unchanged, preventing stale/empty time series in Grafana/Prometheus. + # We skip ONLY when both say it is safe — value unchanged AND reported recently. + value_changed = cached is None or cached[0] != value + interval_elapsed = ( + cached is None + or (now - cached[1]) >= RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S ) + if not value_changed and not interval_elapsed: + continue + + self._application_status_gauge.set(value, tags={"application": name}) + self._app_status_gauge_cache[name] = (value, now) + if len(apps_to_be_deleted) > 0: for app_name in apps_to_be_deleted: self._autoscaling_state_manager.deregister_application(app_name) del self._application_states[app_name] + self._app_status_gauge_cache.pop(app_name, None) ServeUsageTag.NUM_APPS.record(str(len(self._application_states))) if any_target_state_changed: diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index 0dea590fe608..e2d6798341aa 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -419,11 +419,12 @@ "RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS", "0" ) -# How often (in seconds) the controller re-records an unchanged health-check -# gauge value for each replica. Setting this to 0 disables caching (every loop -# iteration records the gauge, matching pre-optimization behavior). 
-RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S = get_env_float_non_negative( - "RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S", 10.0 +# How often (in seconds) the controller re-records an unchanged status gauge +# value for replicas and applications. Setting this to 0 disables caching +# (every control loop iteration records the gauge, matching pre-optimization +# behavior). +RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S = get_env_float_non_negative( + "RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S", 10.0 ) # Initial deadline for queue length responses in the router. diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 0303806fa940..162f5e291415 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -46,7 +46,7 @@ RAY_SERVE_ENABLE_TASK_EVENTS, RAY_SERVE_FAIL_ON_RANK_ERROR, RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS, - RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S, + RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S, RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY, REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD, REPLICA_STARTUP_SHUTDOWN_LATENCY_BUCKETS_MS, @@ -3572,7 +3572,7 @@ def _set_health_gauge(self, replica_unique_id: str, value: int) -> None: if ( cached is not None and cached[0] == value - and (now - cached[1]) < RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S + and (now - cached[1]) < RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S ): return self.health_check_gauge.set(value, tags={"replica": replica_unique_id}) diff --git a/python/ray/serve/tests/BUILD.bazel b/python/ray/serve/tests/BUILD.bazel index f4301fe398eb..2ce49d636e5d 100644 --- a/python/ray/serve/tests/BUILD.bazel +++ b/python/ray/serve/tests/BUILD.bazel @@ -277,7 +277,7 @@ py_test_module_list( "RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S": "5", "RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S": "1", "RAY_SERVE_REPLICA_UTILIZATION_NUM_BUCKETS": "10", - "RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S": "0.1", + 
"RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S": "0.1", }, files = [ "test_deploy_app.py", @@ -696,7 +696,7 @@ py_test_module_list( env = { "RAY_SERVE_ENABLE_HA_PROXY": "1", "RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S": "0.01", - "RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S": "0.1", + "RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S": "0.1", }, files = [ "test_haproxy.py", diff --git a/python/ray/serve/tests/unit/test_deployment_state.py b/python/ray/serve/tests/unit/test_deployment_state.py index 5cd49f7cf0e3..5b5d360700b4 100644 --- a/python/ray/serve/tests/unit/test_deployment_state.py +++ b/python/ray/serve/tests/unit/test_deployment_state.py @@ -30,7 +30,7 @@ DEFAULT_HEALTH_CHECK_TIMEOUT_S, DEFAULT_MAX_ONGOING_REQUESTS, RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE, - RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S, + RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S, ) from ray.serve._private.deployment_info import DeploymentInfo from ray.serve._private.deployment_state import ( @@ -1707,7 +1707,7 @@ def counting_set(*args, **kwargs): # After the TTL expires, the gauge should be re-reported even though # the value hasn't changed. - timer.advance(RAY_SERVE_REPLICA_HEALTH_GAUGE_REPORT_INTERVAL_S + 1) + timer.advance(RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S + 1) dsm.update() assert call_count == len(replica_ids), ( f"Gauge.set was called {call_count} times after TTL expired; "