Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 69 additions & 24 deletions tests/test_library_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.
import pytest
from typing import Any

from utils import weblog, interfaces, scenarios, features
from utils.dd_types import DataDogAgentSpan, AgentTraceFormat
from utils._context.header_tag_vars import (
Expand Down Expand Up @@ -430,44 +432,81 @@ def test_trace_header_tags(self):
TRACECONTEXT_FLAGS_SET = 1 << 31


def retrieve_span_links(span: DataDogAgentSpan) -> list[dict] | None:
"""Retrieves span links from a span.
Returns the format of the span links as it may differ from the trace format emitted by the agent
"""
if span.get("spanLinks") is not None:
return span["spanLinks"]

if span.trace.format == AgentTraceFormat.efficient_trace_payload_format and span.get("links") is not None:
return span["links"]
def _non_empty_agent_span_links(span: DataDogAgentSpan) -> list[dict[str, Any]] | None:
"""Agent payloads may use `spanLinks` (all formats) or `links` (v1/idx only), and may include empty lists."""
# spanLinks is present in both legacy and v1 payloads
raw = span.get("spanLinks")
if isinstance(raw, list) and raw:
return [link for link in raw if isinstance(link, dict)]
# `links` is v1/idx-only
if span.trace.format == AgentTraceFormat.efficient_trace_payload_format:
raw = span.get("links")
if isinstance(raw, list) and raw:
return [link for link in raw if isinstance(link, dict)]
return None


def _dd_span_links_meta_entries(raw: object | None) -> list[dict[str, Any]]:
"""Tracer `_dd.span_links` is usually a JSON string; v1 / convert-traces may store a decoded list on the span."""
if raw is None:
return []
if isinstance(raw, str):
parsed: object = json.loads(raw)
elif isinstance(raw, list):
parsed = raw
else:
raise TypeError(f"Unexpected _dd.span_links type: {type(raw).__name__}")
if not isinstance(parsed, list):
return []
return [entry for entry in parsed if isinstance(entry, dict)]

span_meta = span.meta

if span_meta.get("_dd.span_links") is None:
return None

# Convert span_links tags into msgpack v0.4 format
json_links = json.loads(span_meta["_dd.span_links"])
links = []
def _span_links_from_dd_meta(json_links: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Convert span_links tags into msgpack v0.4-style dicts used by this file's assertions."""
links: list[dict[str, Any]] = []
for json_link in json_links:
link = {}
link["traceID"] = int(json_link["trace_id"][-16:], base=16)
link["spanID"] = int(json_link["span_id"], base=16)
if len(json_link["trace_id"]) > 16:
link["traceIDHigh"] = int(json_link["trace_id"][:16], base=16)
link: dict[str, Any] = {}
trace_id_val = json_link["trace_id"]
if isinstance(trace_id_val, str):
link["traceID"] = int(trace_id_val[-16:], base=16)
if len(trace_id_val) > 16:
link["traceIDHigh"] = int(trace_id_val[:16], base=16)
else:
link["traceID"] = int(trace_id_val) & 0xFFFFFFFFFFFFFFFF
if "trace_id_high" in json_link:
link["traceIDHigh"] = int(json_link["trace_id_high"])
span_id_val = json_link["span_id"]
link["spanID"] = int(span_id_val, base=16) if isinstance(span_id_val, str) else int(span_id_val)
if "attributes" in json_link:
link["attributes"] = json_link.get("attributes")
if "tracestate" in json_link:
link["tracestate"] = json_link.get("tracestate")
elif "trace_state" in json_link:
link["tracestate"] = json_link.get("trace_state")
if "flags" in json_link:
link["flags"] = json_link.get("flags") | TRACECONTEXT_FLAGS_SET
link["flags"] = int(json_link["flags"]) | TRACECONTEXT_FLAGS_SET
else:
link["flags"] = 0
links.append(link)
return links


def retrieve_span_links(span: DataDogAgentSpan) -> list[dict] | None:
"""Retrieves span links from a span.
Returns the format of the span links as it may differ from the trace format emitted by the agent
"""
from_raw = _non_empty_agent_span_links(span)
if from_raw is not None:
return from_raw

span_meta = span.meta
meta_entries = _dd_span_links_meta_entries(span_meta.get("_dd.span_links"))
if not meta_entries:
return None

return _span_links_from_dd_meta(meta_entries)


@scenarios.default
@features.context_propagation_extract_behavior
class Test_ExtractBehavior_Default:
Expand Down Expand Up @@ -721,8 +760,14 @@ def test_multiple_tracecontexts_with_overrides(self):
def _get_span_link_trace_id(link: dict, span_format: AgentTraceFormat) -> tuple[int, int]:
"""Returns the trace ID of a span link according to its format split into high and low 64 bits"""
if span_format == AgentTraceFormat.efficient_trace_payload_format:
trace_id_low = int(link["traceID"], 16) & 0xFFFFFFFFFFFFFFFF
trace_id_high = (int(link["traceID"], 16) >> 64) & 0xFFFFFFFFFFFFFFFF
tid = link["traceID"]
if isinstance(tid, str):
full = int(tid, 16)
trace_id_low = full & 0xFFFFFFFFFFFFFFFF
trace_id_high = (full >> 64) & 0xFFFFFFFFFFFFFFFF
else:
trace_id_low = int(tid) & 0xFFFFFFFFFFFFFFFF
trace_id_high = int(link.get("traceIDHigh", 0))
else:
trace_id_low = int(link["traceID"])
trace_id_high = int(link["traceIDHigh"])
Expand Down
58 changes: 58 additions & 0 deletions tests/test_the_test/test_deserializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,64 @@ def test_uncompress_agent_v1_trace_with_span_links():
assert "tracestateRef" not in span_link


@scenarios.test_the_test
def test_uncompress_agent_v1_trace_span_links_snake_case_gets_camel_aliases():
"""Protobuf JSON may use trace_id / span_id / trace_id_high; proxy mirrors camelCase for readers."""
trace_id_bytes = bytes(
[0x12, 0x34, 0x56, 0x78, 0x90, 0x12, 0x34, 0x56, 0x78, 0x90, 0x12, 0x34, 0x56, 0x78, 0x90, 0x12]
)
trace_id_base64 = base64.b64encode(trace_id_bytes).decode("utf-8")
chunk_trace_id_bytes = bytes(
[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x21, 0xE3]
)
chunk_trace_id_base64 = base64.b64encode(chunk_trace_id_bytes).decode("utf-8")

data = {
"idxTracerPayloads": [
{
"strings": ["", "my-service", "span-name", "web", "link-key", "link-value", "tracestate-value"],
"attributes": {},
"chunks": [
{
"traceID": chunk_trace_id_base64,
"spans": [
{
"service": "my-service",
"name_value": "span-name",
"typeRef": "web",
"attributes": {},
"links": [
{
"trace_id": trace_id_base64,
"span_id": "424242",
"trace_id_high": 99,
"attributes": {
"4": {"stringValueRef": 5},
},
"tracestateRef": 6,
"flags": 1,
}
],
}
],
"attributes": {},
}
],
}
]
}

result = _uncompress_agent_v1_trace(data, "agent")
span_link = result["idxTracerPayloads"][0]["chunks"][0]["spans"][0]["links"][0]

assert span_link["trace_id"] == "0x12345678901234567890123456789012"
assert span_link["traceID"] == "0x12345678901234567890123456789012"
assert span_link["span_id"] == "424242"
assert span_link["spanID"] == "424242"
assert span_link["trace_id_high"] == 99
assert span_link["traceIDHigh"] == 99


@scenarios.test_the_test
def test_uncompress_array_direct():
"""Test _uncompress_array with (type, value) pairs: string ref, double, bool."""
Expand Down
30 changes: 27 additions & 3 deletions utils/proxy/traces/trace_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,21 @@ def _uncompress_span_links_list(span_links: list | None, strings: list[str]) ->
return uncompressed_links


def _mirror_span_link_snake_case_to_camel_case(link: dict[str, Any]) -> None:
"""Copy protobuf / enum snake_case link fields to camelCase keys consumers expect.

``_uncompress_span_link`` may leave ``trace_id`` / ``span_id`` / ``trace_id_high`` in place
while other payloads use ``traceID`` / ``spanID`` / ``traceIDHigh``. Populate the camelCase
names when missing so agent idx traces have a single readable contract.
"""
if "traceID" not in link and "trace_id" in link:
link["traceID"] = link["trace_id"]
if "spanID" not in link and "span_id" in link:
link["spanID"] = link["span_id"]
if "traceIDHigh" not in link and "trace_id_high" in link:
link["traceIDHigh"] = link["trace_id_high"]


def _uncompress_span_events_list(span_events: list | None, strings: list[str]) -> list | None:
"""Uncompress a list of span events by converting integer keys to string keys."""
if span_events is None or not isinstance(span_events, list):
Expand Down Expand Up @@ -569,6 +584,8 @@ def _uncompress_span_link(link: dict, strings: list[str]) -> None:
if isinstance(trace_state, int) and trace_state < len(strings):
link["tracestate"] = strings[trace_state]

_mirror_span_link_snake_case_to_camel_case(link)


def _uncompress_span_event(event: dict, strings: list[str]) -> None:
"""Uncompress a span event by deserializing time, name, and attributes.
Expand Down Expand Up @@ -632,9 +649,16 @@ def _uncompress_agent_v1_trace(data: dict, interface: str):
chunk["origin"] = strings[origin_ref]
for span in chunk.get("spans", []):
span["attributes"] = _uncompress_attributes(span.get("attributes", {}), strings)
# Uncompress span links
for link in span.get("links", []):
_uncompress_span_link(link, strings)
# Uncompress span links (protobuf JSON may expose `spanLinks` or `links`)
span_links_raw: list[Any] | None = None
for key in ("links", "spanLinks"):
candidate = span.get(key)
if isinstance(candidate, list) and candidate:
span_links_raw = candidate
break
if span_links_raw is not None:
for link in span_links_raw:
_uncompress_span_link(link, strings)
# Uncompress span events (handle both camelCase and snake_case field names)
span_events = span.get("spanEvents") or span.get("span_events")
if span_events:
Expand Down
Loading