diff --git a/utils/_remote_config.py b/utils/_remote_config.py index c8dd28f7976..18d5efc040f 100644 --- a/utils/_remote_config.py +++ b/utils/_remote_config.py @@ -3,10 +3,10 @@ # Copyright 2021 Datadog, Inc. import base64 +import contextlib import hashlib import json import re -import time import uuid from typing import Any, Literal from collections.abc import Mapping @@ -82,6 +82,12 @@ def send_state( assert api_enabled, f"Remote config API is not enabled on {context.scenario}" + # Collect runtime_ids of all processes currently polling RC so we can wait for each one to acknowledge. + known_runtime_ids: set[str] = set() + for _d in library.get_data(path_filters="/v0.7/config"): + with contextlib.suppress(KeyError, TypeError): + known_runtime_ids.add(_d["request"]["content"]["client"]["client_tracer"]["runtime_id"]) + if target == "backend": assert backend_enabled, f"Remote config backend is not enabled on {context.scenario}" # Build protobuf on test runner side, send bytes to proxy @@ -107,41 +113,56 @@ def send_state( "apply_error": "", } - state = {} + acknowledged_runtime_ids: set[str] = set() + + def _all_processes_acknowledged() -> bool: + return not known_runtime_ids or acknowledged_runtime_ids >= known_runtime_ids def remote_config_applied(data: dict) -> bool: - nonlocal state if data["path"] != "/v0.7/config": return False - state = data.get("request", {}).get("content", {}).get("client", {}).get("state", {}) + client_state = data.get("request", {}).get("content", {}).get("client", {}).get("state", {}) + + try: + rid = data["request"]["content"]["client"]["client_tracer"]["runtime_id"] + except (KeyError, TypeError): + rid = None + if len(client_configs) == 0: - found = state["targets_version"] == state_version and state.get("config_states", []) == [] + found = client_state.get("targets_version") == state_version and client_state.get("config_states", []) == [] if found: - current_states.state = ApplyState.ACKNOWLEDGED - return found + if rid: + acknowledged_runtime_ids.add(rid) + if _all_processes_acknowledged(): + current_states.state = ApplyState.ACKNOWLEDGED + return True + return False - if state["targets_version"] != version: + if client_state.get("targets_version") != version: return False - config_states = state.get("config_states", []) - for state in config_states: - config_state = current_states.configs.get(state["id"]) - if config_state and state["product"] == config_state["product"]: - logger.debug(f"Remote config state: {state}") - config_state.update(state) + for cs in client_state.get("config_states", []): + config_state = current_states.configs.get(cs["id"]) + if config_state and cs["product"] == config_state["product"]: + logger.debug(f"Remote config state: {cs}") + config_state.update(cs) if wait_for_acknowledged_status: - for state in current_states.configs.values(): - if state["apply_state"] == ApplyState.UNKNOWN: + for config_state in current_states.configs.values(): + if config_state["apply_state"] == ApplyState.UNKNOWN: return False - current_states.state = ApplyState.ACKNOWLEDGED - return True + if rid: + acknowledged_runtime_ids.add(rid) + + if _all_processes_acknowledged(): + current_states.state = ApplyState.ACKNOWLEDGED + return True + + return False library.wait_for(remote_config_applied, timeout=30) - # ensure the library has enough time to apply the config to all subprocesses - time.sleep(2) return current_states