Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal sealed record AgentTraceFilterConfig(
List<string>? FilterTagsReject,
List<string>? FilterTagsRegexRequire,
List<string>? FilterTagsRegexReject,
List<string>? IgnoreResources)
List<string>? IgnoreResourcesRegex)
{
public static readonly AgentTraceFilterConfig Empty = new(null, null, null, null, null);

Expand All @@ -27,5 +27,5 @@ internal sealed record AgentTraceFilterConfig(
FilterTagsReject is { Count: > 0 } ||
FilterTagsRegexRequire is { Count: > 0 } ||
FilterTagsRegexReject is { Count: > 0 } ||
IgnoreResources is { Count: > 0 };
IgnoreResourcesRegex is { Count: > 0 };
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ private async Task ProcessDiscoveryResponse(IApiResponse response)
var clientDropP0 = jObject["client_drop_p0s"]?.Value<bool>() ?? false;
var spanMetaStructs = jObject["span_meta_structs"]?.Value<bool>() ?? false;
var spanEvents = jObject["span_events"]?.Value<bool>() ?? false;
var peerTags = (jObject["peer_tags"] as JArray)?.Values<string>().Where(x => !string.IsNullOrEmpty(x)).Distinct().OrderBy(x => x).ToList();
var peerTags = (jObject["peer_tags"] as JArray)?.Values<string>().ToList();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:tears-of-joy:

var obfuscationVersion = jObject["obfuscation_version"]?.Value<int>() ?? 0;

// Parse trace filter configuration
Expand Down
2 changes: 1 addition & 1 deletion tracer/src/Datadog.Trace/Agent/IStatsAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ internal interface IStatsAggregator

Task DisposeAsync();

StatsAggregationKey BuildKey(Span span, out List<byte[]> utf8PeerTags);
StatsAggregationKey BuildKey(Span span);
}
}
4 changes: 1 addition & 3 deletions tracer/src/Datadog.Trace/Agent/NullStatsAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ public Task DisposeAsync()
return Task.CompletedTask;
}

public StatsAggregationKey BuildKey(Span span, out List<byte[]> utf8PeerTags)
public StatsAggregationKey BuildKey(Span span)
{
utf8PeerTags = [];

var rawHttpStatusCode = span.GetTag(Tags.HttpStatusCode);
if (rawHttpStatusCode is null || !int.TryParse(rawHttpStatusCode, out var httpStatusCode))
{
Expand Down
194 changes: 147 additions & 47 deletions tracer/src/Datadog.Trace/Agent/StatsAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -28,6 +29,7 @@ internal sealed class StatsAggregator : IStatsAggregator
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<StatsAggregator>();
private static readonly List<byte[]> EmptyPeerTags = [];
private static readonly byte[] PeerTagSeparator = [0];
private static readonly byte[] BaseServiceUtf8Prefix = EncodingHelpers.Utf8NoBom.GetBytes(Tags.BaseService + ":");

private readonly StatsBuffer[] _buffers;

Expand All @@ -54,7 +56,7 @@ internal sealed class StatsAggregator : IStatsAggregator

private int _tracerObfuscationVersion;
private TraceFilter _traceFilter;
private List<string> _peerTagKeys = [];
private List<PeerTagKey> _peerTagKeys = [];

internal StatsAggregator(IApi api, TracerSettings settings, IDiscoveryService discoveryService, bool isOtlp)
{
Expand Down Expand Up @@ -248,10 +250,16 @@ internal SpanCollection ObfuscateTrace(in SpanCollection trace)
return trace;
}

public StatsAggregationKey BuildKey(Span span, out List<byte[]> utf8PeerTags)
=> BuildKey(span, Volatile.Read(ref _peerTagKeys), out utf8PeerTags);
public StatsAggregationKey BuildKey(Span span)
=> BuildKey(span, Volatile.Read(ref _peerTagKeys), out _);

internal StatsAggregationKey BuildKey(Span span, List<string> peerTagKeys, out List<byte[]> utf8PeerTags)
/// <summary>
/// Computes a <see cref="StatsAggregationKey"/> for the given span, including the peer tags hash.
/// The <paramref name="peerTagResults"/> carries context to <see cref="GetEncodedPeerTags"/>
/// so the cold path can skip re-deriving spanKind/baseService and pre-allocate the result list.
/// </summary>
[TestingAndPrivateOnly]
internal StatsAggregationKey BuildKey(Span span, List<PeerTagKey> peerTagKeys, out PeerTagResults peerTagResults)
{
var rawHttpStatusCode = span.GetTag(Tags.HttpStatusCode);

Expand Down Expand Up @@ -286,58 +294,48 @@ internal StatsAggregationKey BuildKey(Span span, List<string> peerTagKeys, out L
}

// Based on https://github.com/DataDog/datadog-agent/blob/ce22e11ee71e55be717b9d9a3f8f3d7721a9c6d7/pkg/trace/stats/span_concentrator.go#L53-L99
// Peer tags are extracted for client/producer/consumer spans
// Peer tags are extracted for client/server/producer/consumer spans.
// If the span kind is missing or internal, and we have a "base service" tag `_dd.base_service`
// then we only aggregate based on the `_dd.base_service`
// TODO: work out how to optimize the peer tags allocations, to avoid all the extra utf8 allocations
// - on .NET Core these are only necessary for the first instance of the key, but this makes for a tricky
// chicken and egg - we need to convert everything to utf-8, so that we can get the hash, so that we
// know whether we need the tags as byte[] or not..
// then we only aggregate based on the `_dd.base_service`.
// This computes only the hash; see GetEncodedPeerTags() for the cold-path encoding.
ulong peerTagsHash;
if ((string.IsNullOrEmpty(spanKind) || spanKind is SpanKinds.Internal) && span.GetTag(Tags.BaseService) is { Length: >0 } baseService)
if ((string.IsNullOrEmpty(spanKind) || spanKind is SpanKinds.Internal) && span.GetTag(Tags.BaseService) is { Length: > 0 } baseService)
{
utf8PeerTags = [EncodingHelpers.Utf8NoBom.GetBytes($"{Tags.BaseService}:{baseService}")];
peerTagsHash = FnvHash64.GenerateHash(utf8PeerTags[0], FnvHash64.Version.V1A);
peerTagsHash = HashTag(BaseServiceUtf8Prefix, baseService, FnvHash64.Version.V1A);
peerTagResults = new PeerTagResults { BaseService = baseService };
}
else if (spanKind is SpanKinds.Client or SpanKinds.Server or SpanKinds.Producer or SpanKinds.Consumer)
{
// Hash should be generated as TAGNAME:TAGVALUE, and should be in sorted order (we sort ahead of time)
// peerTagKeys should already be in sorted order
// We serialize to the utf-8 bytes because we need to serialize them during sending anyway
// TODO: Verify we get the same results as the go code
utf8PeerTags = EmptyPeerTags;
peerTagsHash = 0;
foreach (var tagKey in peerTagKeys)
// Hash should be generated as TAGNAME:TAGVALUE, in sorted order (peerTagKeys is pre-sorted).
ulong? previousHash = null;
var peerTagCount = 0;
foreach (var peerTag in peerTagKeys)
{
var tagValue = span.GetTag(tagKey);
var tagValue = span.GetTag(peerTag.Name);
if (string.IsNullOrEmpty(tagValue))
{
continue;
}

tagValue = IpAddressObfuscationUtil.QuantizePeerIpAddresses(tagValue);

if (ReferenceEquals(utf8PeerTags, EmptyPeerTags))
if (previousHash.HasValue)
{
// We're not setting the capacity here, because there's
// a _lot_ of potential peer tags, and _most_ of them won't apply
utf8PeerTags = new();
}
else
{
// add the separator
peerTagsHash = FnvHash64.GenerateHash(PeerTagSeparator, FnvHash64.Version.V1A, peerTagsHash);
// add the separator between tags
previousHash = FnvHash64.GenerateHash(PeerTagSeparator, FnvHash64.Version.V1A, previousHash.Value);
}

var bytes = EncodingHelpers.Utf8NoBom.GetBytes($"{tagKey}:{tagValue}");
peerTagsHash = FnvHash64.GenerateHash(bytes, FnvHash64.Version.V1A, peerTagsHash);
utf8PeerTags.Add(bytes);
previousHash = HashTag(peerTag.Utf8Prefix, tagValue, FnvHash64.Version.V1A, previousHash);
peerTagCount++;
}

peerTagResults = new PeerTagResults { PeerTagCount = peerTagCount };
peerTagsHash = previousHash ?? 0;
}
else
{
peerTagsHash = 0;
utf8PeerTags = EmptyPeerTags;
peerTagResults = default;
}

// When submitting trace metrics over OTLP, we must create inidividual timeseries
Expand All @@ -362,6 +360,80 @@ internal StatsAggregationKey BuildKey(Span span, List<string> peerTagKeys, out L
peerTagsHash);
}

/// <summary>
/// Hashes "keyPrefix + tagValue" using FNV-64.
/// The <paramref name="keyPrefix"/> is a pre-encoded UTF-8 byte array (e.g. "tagKey:") and is
/// hashed directly. Only the <paramref name="tagValue"/> needs UTF-8 encoding at call time.
/// </summary>
#if NETCOREAPP
[System.Runtime.CompilerServices.SkipLocalsInit]
#endif
private static ulong HashTag(byte[] keyPrefix, string tagValue, FnvHash64.Version version, ulong? initialHash = null)
{
// Hash the pre-encoded key prefix (e.g. "peer.service:") directly — no encoding needed
var hash = initialHash is { } h
? FnvHash64.GenerateHash(keyPrefix, version, h)
: FnvHash64.GenerateHash(keyPrefix, version);

// Now encode and hash just the tag value
var maxByteCount = EncodingHelpers.Utf8NoBom.GetMaxByteCount(tagValue.Length);
#if NETCOREAPP
const int maxStackLimit = 256;

if (maxByteCount <= maxStackLimit)
{
Span<byte> buffer = stackalloc byte[maxStackLimit];
var written = EncodingHelpers.Utf8NoBom.GetBytes(tagValue, buffer);
return FnvHash64.GenerateHash(buffer.Slice(0, written), version, hash);
}
#endif

var rented = ArrayPool<byte>.Shared.Rent(maxByteCount);
try
{
var written = EncodingHelpers.Utf8NoBom.GetBytes(tagValue, charIndex: 0, charCount: tagValue.Length, rented, byteIndex: 0);
return FnvHash64.GenerateHash(rented, 0, written, version, hash);
}
finally
{
ArrayPool<byte>.Shared.Return(rented);
}
}

/// <summary>
/// Encodes the peer tags for a span into a <see cref="List{T}"/> of UTF-8 byte arrays.
/// Called only on the cold path (new bucket creation).
/// Uses <paramref name="results"/> from <see cref="BuildKey(Span, List{PeerTagKey}, out PeerTagResults)"/>
/// to skip re-deriving spanKind/baseService and to pre-allocate the result list.
/// </summary>
internal static List<byte[]> GetEncodedPeerTags(Span span, List<PeerTagKey> peerTagKeys, in PeerTagResults results)
{
if (results.BaseService is not null)
{
return [EncodingHelpers.Utf8NoBom.GetBytes($"{Tags.BaseService}:{results.BaseService}")];
}

if (results.PeerTagCount == 0)
{
return EmptyPeerTags;
}

var result = new List<byte[]>(results.PeerTagCount);
foreach (var peerTag in peerTagKeys)
{
var tagValue = span.GetTag(peerTag.Name);
if (string.IsNullOrEmpty(tagValue))
{
continue;
}

tagValue = IpAddressObfuscationUtil.QuantizePeerIpAddresses(tagValue);
result.Add(EncodingHelpers.Utf8NoBom.GetBytes($"{peerTag.Name}:{tagValue}"));
}

return result;
}

internal async Task Flush()
{
// Use a do/while loop to still flush once if _processExit is already completed (this makes testing easier)
Expand Down Expand Up @@ -440,13 +512,14 @@ private void AddToBuffer(Span span)
return;
}

var key = BuildKey(span, out var peerTags);

var buffer = CurrentBuffer;
var peerTagKeys = Volatile.Read(ref _peerTagKeys);
var key = BuildKey(span, peerTagKeys, out var peerTagResults);

if (!buffer.Buckets.TryGetValue(key, out var bucket))
{
bucket = new StatsBucket(key, peerTags);
// Cold path: encode the peer tags for storage in the new bucket
bucket = new StatsBucket(key, GetEncodedPeerTags(span, peerTagKeys, in peerTagResults));
buffer.Buckets.Add(key, bucket);
}

Expand Down Expand Up @@ -490,9 +563,32 @@ private void HandleConfigUpdate(AgentConfiguration config)
|| parsedVersion.Major >= 8
|| (parsedVersion.Major == 7 && parsedVersion.Minor >= 65));

if (config.PeerTags is not null)
if (CanComputeStats.Value)
{
Log.Debug("Stats computation enabled.");
}
else
{
Log.Warning("Stats computation disabled because the detected agent does not support this feature.");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're almost certainly going to need an override for this when using the Rust agents, but that's a separate issue...

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that whole question of how we identify the rust agent is a whole open question...

// early return, because there's no point doing all the extra work if stats isn't enabled anyway
return;
}

if (config.PeerTags is { Count: > 0 })
{
// Sort, deduplicate, and pre-compute the UTF-8 key prefixes so that
// BuildKey can hash without per-call string encoding.
var precomputed = new List<PeerTagKey>(config.PeerTags.Count);
foreach (var tag in config.PeerTags.Where(x => !string.IsNullOrEmpty(x)).Distinct().OrderBy(x => x))
{
precomputed.Add(new PeerTagKey(tag));
}

Interlocked.Exchange(ref _peerTagKeys, precomputed);
}
else
{
Interlocked.Exchange(ref _peerTagKeys, config.PeerTags);
Interlocked.Exchange(ref _peerTagKeys, []);
}

// Update trace filter from agent configuration
Expand All @@ -509,15 +605,19 @@ private void HandleConfigUpdate(AgentConfiguration config)
const int tracerObfuscationVersion = 1;
var agentVersion = config.ObfuscationVersion;
Volatile.Write(ref _tracerObfuscationVersion, agentVersion > 0 && agentVersion <= tracerObfuscationVersion ? tracerObfuscationVersion : 0);
}

if (CanComputeStats.Value)
{
Log.Debug("Stats computation has been enabled.");
}
else
{
Log.Warning("Stats computation disabled because the detected agent does not support this feature.");
}
internal readonly struct PeerTagKey(string name)
{
public readonly string Name = name;
public readonly byte[] Utf8Prefix = EncodingHelpers.Utf8NoBom.GetBytes(name + ":");
}

internal readonly struct PeerTagResults
{
public int PeerTagCount { get; init; }

public string BaseService { get; init; }
}
}
}
4 changes: 2 additions & 2 deletions tracer/src/Datadog.Trace/Agent/TraceSamplers/RareSampler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private bool SampleSpansAndUpdateSeenSpansIfKept(in SpanCollection trace)

private bool SampleSpan(Span span)
{
var key = _aggregator.BuildKey(span, out _);
var key = _aggregator.BuildKey(span);
var isNewKey = _keys.Add(key);

if (isNewKey)
Expand All @@ -105,7 +105,7 @@ private bool SampleSpan(Span span)

private void UpdateSpan(Span span)
{
var key = _aggregator.BuildKey(span, out _);
var key = _aggregator.BuildKey(span);
var isNewKey = _keys.Add(key);

if (isNewKey)
Expand Down
10 changes: 5 additions & 5 deletions tracer/src/Datadog.Trace/Agent/TraceSamplers/TraceFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal sealed class TraceFilter
private readonly List<KeyValuePair<string, string>> _filterTagKeyValuesReject;
private readonly List<RegexTagFilter> _filterTagsRegexRequire;
private readonly List<RegexTagFilter> _filterTagsRegexReject;
private readonly List<Regex> _ignoreResources;
private readonly List<Regex> _ignoreResourcesRegex;
private readonly bool _hasFilters;

public TraceFilter(AgentTraceFilterConfig config)
Expand All @@ -35,7 +35,7 @@ public TraceFilter(AgentTraceFilterConfig config)
BuildFilterTags(config.FilterTagsReject, out _filterTagKeysReject, out _filterTagKeyValuesReject);
_filterTagsRegexRequire = CompileTagFilters(config.FilterTagsRegexRequire);
_filterTagsRegexReject = CompileTagFilters(config.FilterTagsRegexReject);
_ignoreResources = CompilePatterns(config.IgnoreResources);
_ignoreResourcesRegex = CompilePatterns(config.IgnoreResourcesRegex);

// Short circuit because these are _relatively_ rare, so we can avoid all the work if needs be
_hasFilters = _filterTagKeysRequire.Count > 0
Expand All @@ -44,7 +44,7 @@ public TraceFilter(AgentTraceFilterConfig config)
|| _filterTagKeyValuesReject.Count > 0
|| _filterTagsRegexRequire.Count > 0
|| _filterTagsRegexReject.Count > 0
|| _ignoreResources.Count > 0;
|| _ignoreResourcesRegex.Count > 0;

static void BuildFilterTags(List<string>? filters, out List<string> keyFilters, out List<KeyValuePair<string, string>> keyValueFilters)
{
Expand Down Expand Up @@ -82,9 +82,9 @@ public bool ShouldKeepTrace(Span rootSpan)
}

// 1. Resource filtering: reject if resource matches any ignore_resources pattern
if (_ignoreResources.Count > 0 && !string.IsNullOrEmpty(rootSpan.ResourceName))
if (_ignoreResourcesRegex.Count > 0 && !string.IsNullOrEmpty(rootSpan.ResourceName))
{
foreach (var pattern in _ignoreResources)
foreach (var pattern in _ignoreResourcesRegex)
{
if (pattern.IsMatch(rootSpan.ResourceName))
{
Expand Down
Loading
Loading