Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions utils/_remote_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
# Copyright 2021 Datadog, Inc.

import base64
import contextlib
import hashlib
import json
import re
import time
import uuid
from typing import Any, Literal
from collections.abc import Mapping
Expand Down Expand Up @@ -82,6 +82,12 @@ def send_state(

assert api_enabled, f"Remote config API is not enabled on {context.scenario}"

# Collect runtime_ids of all processes currently polling RC so we can wait for each one to acknowledge.
known_runtime_ids: set[str] = set()
for _d in library.get_data(path_filters="/v0.7/config"):
with contextlib.suppress(KeyError, TypeError):
known_runtime_ids.add(_d["request"]["content"]["client"]["client_tracer"]["runtime_id"])

if target == "backend":
assert backend_enabled, f"Remote config backend is not enabled on {context.scenario}"
# Build protobuf on test runner side, send bytes to proxy
Expand All @@ -107,41 +113,56 @@ def send_state(
"apply_error": "<No known response from the library>",
}

state = {}
acknowledged_runtime_ids: set[str] = set()

def _all_processes_acknowledged() -> bool:
return not known_runtime_ids or acknowledged_runtime_ids >= known_runtime_ids

def remote_config_applied(data: dict) -> bool:
nonlocal state
if data["path"] != "/v0.7/config":
return False

state = data.get("request", {}).get("content", {}).get("client", {}).get("state", {})
client_state = data.get("request", {}).get("content", {}).get("client", {}).get("state", {})

try:
rid = data["request"]["content"]["client"]["client_tracer"]["runtime_id"]
except (KeyError, TypeError):
rid = None

if len(client_configs) == 0:
found = state["targets_version"] == state_version and state.get("config_states", []) == []
found = client_state.get("targets_version") == state_version and client_state.get("config_states", []) == []
if found:
current_states.state = ApplyState.ACKNOWLEDGED
return found
if rid:
acknowledged_runtime_ids.add(rid)
if _all_processes_acknowledged():
current_states.state = ApplyState.ACKNOWLEDGED
return True
return False

if state["targets_version"] != version:
if client_state.get("targets_version") != version:
return False

config_states = state.get("config_states", [])
for state in config_states:
config_state = current_states.configs.get(state["id"])
if config_state and state["product"] == config_state["product"]:
logger.debug(f"Remote config state: {state}")
config_state.update(state)
for cs in client_state.get("config_states", []):
config_state = current_states.configs.get(cs["id"])
if config_state and cs["product"] == config_state["product"]:
logger.debug(f"Remote config state: {cs}")
config_state.update(cs)

if wait_for_acknowledged_status:
for state in current_states.configs.values():
if state["apply_state"] == ApplyState.UNKNOWN:
for config_state in current_states.configs.values():
if config_state["apply_state"] == ApplyState.UNKNOWN:
return False

current_states.state = ApplyState.ACKNOWLEDGED
return True
if rid:
acknowledged_runtime_ids.add(rid)

if _all_processes_acknowledged():
current_states.state = ApplyState.ACKNOWLEDGED
return True

return False

library.wait_for(remote_config_applied, timeout=30)
# ensure the library has enough time to apply the config to all subprocesses
time.sleep(2)

return current_states

Expand Down
Loading