-
Notifications
You must be signed in to change notification settings - Fork 159
Improve peer-tag calculation fast-path performance #8445
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: andrew/client-side-stats/fnvhash
Are you sure you want to change the base?
Changes from all commits
b14195e
2c0e776
8e94927
d99fa34
0a12140
5dceaf4
cfefb70
7817c48
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Runtime.CompilerServices; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
@@ -28,6 +29,7 @@ internal sealed class StatsAggregator : IStatsAggregator | |
| private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<StatsAggregator>(); | ||
| private static readonly List<byte[]> EmptyPeerTags = []; | ||
| private static readonly byte[] PeerTagSeparator = [0]; | ||
| private static readonly byte[] BaseServiceUtf8Prefix = EncodingHelpers.Utf8NoBom.GetBytes(Tags.BaseService + ":"); | ||
|
|
||
| private readonly StatsBuffer[] _buffers; | ||
|
|
||
|
|
@@ -54,7 +56,7 @@ internal sealed class StatsAggregator : IStatsAggregator | |
|
|
||
| private int _tracerObfuscationVersion; | ||
| private TraceFilter _traceFilter; | ||
| private List<string> _peerTagKeys = []; | ||
| private List<PeerTagKey> _peerTagKeys = []; | ||
|
|
||
| internal StatsAggregator(IApi api, TracerSettings settings, IDiscoveryService discoveryService, bool isOtlp) | ||
| { | ||
|
|
@@ -248,10 +250,16 @@ internal SpanCollection ObfuscateTrace(in SpanCollection trace) | |
| return trace; | ||
| } | ||
|
|
||
| public StatsAggregationKey BuildKey(Span span, out List<byte[]> utf8PeerTags) | ||
| => BuildKey(span, Volatile.Read(ref _peerTagKeys), out utf8PeerTags); | ||
| public StatsAggregationKey BuildKey(Span span) | ||
| => BuildKey(span, Volatile.Read(ref _peerTagKeys), out _); | ||
|
|
||
| internal StatsAggregationKey BuildKey(Span span, List<string> peerTagKeys, out List<byte[]> utf8PeerTags) | ||
| /// <summary> | ||
| /// Computes a <see cref="StatsAggregationKey"/> for the given span, including the peer tags hash. | ||
| /// The <paramref name="peerTagResults"/> carries context to <see cref="GetEncodedPeerTags"/> | ||
| /// so the cold path can skip re-deriving spanKind/baseService and pre-allocate the result list. | ||
| /// </summary> | ||
| [TestingAndPrivateOnly] | ||
| internal StatsAggregationKey BuildKey(Span span, List<PeerTagKey> peerTagKeys, out PeerTagResults peerTagResults) | ||
| { | ||
| var rawHttpStatusCode = span.GetTag(Tags.HttpStatusCode); | ||
|
|
||
|
|
@@ -286,58 +294,48 @@ internal StatsAggregationKey BuildKey(Span span, List<string> peerTagKeys, out L | |
| } | ||
|
|
||
| // Based on https://github.com/DataDog/datadog-agent/blob/ce22e11ee71e55be717b9d9a3f8f3d7721a9c6d7/pkg/trace/stats/span_concentrator.go#L53-L99 | ||
| // Peer tags are extracted for client/producer/consumer spans | ||
| // Peer tags are extracted for client/server/producer/consumer spans. | ||
| // If the span kind is missing or internal, and we have a "base service" tag `_dd.base_service` | ||
| // then we only aggregate based on the `_dd.base_service` | ||
| // TODO: work out how to optimize the peer tags allocations, to avoid all the extra utf8 allocations | ||
| // - on .NET Core these are only necessary for the first instance of the key, but this makes for a tricky | ||
| // chicken and egg - we need to convert everything to utf-8, so that we can get the hash, so that we | ||
| // know whether we need the tags as byte[] or not.. | ||
| // then we only aggregate based on the `_dd.base_service`. | ||
| // This computes only the hash; see GetEncodedPeerTags() for the cold-path encoding. | ||
| ulong peerTagsHash; | ||
| if ((string.IsNullOrEmpty(spanKind) || spanKind is SpanKinds.Internal) && span.GetTag(Tags.BaseService) is { Length: >0 } baseService) | ||
| if ((string.IsNullOrEmpty(spanKind) || spanKind is SpanKinds.Internal) && span.GetTag(Tags.BaseService) is { Length: > 0 } baseService) | ||
| { | ||
| utf8PeerTags = [EncodingHelpers.Utf8NoBom.GetBytes($"{Tags.BaseService}:{baseService}")]; | ||
| peerTagsHash = FnvHash64.GenerateHash(utf8PeerTags[0], FnvHash64.Version.V1A); | ||
| peerTagsHash = HashTag(BaseServiceUtf8Prefix, baseService, FnvHash64.Version.V1A); | ||
| peerTagResults = new PeerTagResults { BaseService = baseService }; | ||
| } | ||
| else if (spanKind is SpanKinds.Client or SpanKinds.Server or SpanKinds.Producer or SpanKinds.Consumer) | ||
| { | ||
| // Hash should be generated as TAGNAME:TAGVALUE, and should be in sorted order (we sort ahead of time) | ||
| // peerTagKeys should already be in sorted order | ||
| // We serialize to the utf-8 bytes because we need to serialize them during sending anyway | ||
| // TODO: Verify we get the same results as the go code | ||
| utf8PeerTags = EmptyPeerTags; | ||
| peerTagsHash = 0; | ||
| foreach (var tagKey in peerTagKeys) | ||
| // Hash should be generated as TAGNAME:TAGVALUE, in sorted order (peerTagKeys is pre-sorted). | ||
| ulong? previousHash = null; | ||
| var peerTagCount = 0; | ||
| foreach (var peerTag in peerTagKeys) | ||
| { | ||
| var tagValue = span.GetTag(tagKey); | ||
| var tagValue = span.GetTag(peerTag.Name); | ||
| if (string.IsNullOrEmpty(tagValue)) | ||
| { | ||
| continue; | ||
| } | ||
|
|
||
| tagValue = IpAddressObfuscationUtil.QuantizePeerIpAddresses(tagValue); | ||
|
|
||
| if (ReferenceEquals(utf8PeerTags, EmptyPeerTags)) | ||
| if (previousHash.HasValue) | ||
| { | ||
| // We're not setting the capacity here, because there's | ||
| // a _lot_ of potential peer tags, and _most_ of them won't apply | ||
| utf8PeerTags = new(); | ||
| } | ||
| else | ||
| { | ||
| // add the separator | ||
| peerTagsHash = FnvHash64.GenerateHash(PeerTagSeparator, FnvHash64.Version.V1A, peerTagsHash); | ||
| // add the separator between tags | ||
| previousHash = FnvHash64.GenerateHash(PeerTagSeparator, FnvHash64.Version.V1A, previousHash.Value); | ||
| } | ||
|
|
||
| var bytes = EncodingHelpers.Utf8NoBom.GetBytes($"{tagKey}:{tagValue}"); | ||
| peerTagsHash = FnvHash64.GenerateHash(bytes, FnvHash64.Version.V1A, peerTagsHash); | ||
| utf8PeerTags.Add(bytes); | ||
| previousHash = HashTag(peerTag.Utf8Prefix, tagValue, FnvHash64.Version.V1A, previousHash); | ||
| peerTagCount++; | ||
| } | ||
|
|
||
| peerTagResults = new PeerTagResults { PeerTagCount = peerTagCount }; | ||
| peerTagsHash = previousHash ?? 0; | ||
| } | ||
| else | ||
| { | ||
| peerTagsHash = 0; | ||
| utf8PeerTags = EmptyPeerTags; | ||
| peerTagResults = default; | ||
| } | ||
|
|
||
| // When submitting trace metrics over OTLP, we must create inidividual timeseries | ||
|
|
@@ -362,6 +360,80 @@ internal StatsAggregationKey BuildKey(Span span, List<string> peerTagKeys, out L | |
| peerTagsHash); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Hashes "keyPrefix + tagValue" using FNV-64. | ||
| /// The <paramref name="keyPrefix"/> is a pre-encoded UTF-8 byte array (e.g. "tagKey:") and is | ||
| /// hashed directly. Only the <paramref name="tagValue"/> needs UTF-8 encoding at call time. | ||
| /// </summary> | ||
| #if NETCOREAPP | ||
| [System.Runtime.CompilerServices.SkipLocalsInit] | ||
| #endif | ||
| private static ulong HashTag(byte[] keyPrefix, string tagValue, FnvHash64.Version version, ulong? initialHash = null) | ||
| { | ||
| // Hash the pre-encoded key prefix (e.g. "peer.service:") directly — no encoding needed | ||
| var hash = initialHash is { } h | ||
| ? FnvHash64.GenerateHash(keyPrefix, version, h) | ||
| : FnvHash64.GenerateHash(keyPrefix, version); | ||
|
|
||
| // Now encode and hash just the tag value | ||
| var maxByteCount = EncodingHelpers.Utf8NoBom.GetMaxByteCount(tagValue.Length); | ||
| #if NETCOREAPP | ||
| const int maxStackLimit = 256; | ||
|
|
||
| if (maxByteCount <= maxStackLimit) | ||
| { | ||
| Span<byte> buffer = stackalloc byte[maxStackLimit]; | ||
| var written = EncodingHelpers.Utf8NoBom.GetBytes(tagValue, buffer); | ||
| return FnvHash64.GenerateHash(buffer.Slice(0, written), version, hash); | ||
| } | ||
| #endif | ||
|
|
||
| var rented = ArrayPool<byte>.Shared.Rent(maxByteCount); | ||
| try | ||
| { | ||
| var written = EncodingHelpers.Utf8NoBom.GetBytes(tagValue, charIndex: 0, charCount: tagValue.Length, rented, byteIndex: 0); | ||
| return FnvHash64.GenerateHash(rented, 0, written, version, hash); | ||
| } | ||
| finally | ||
| { | ||
| ArrayPool<byte>.Shared.Return(rented); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Encodes the peer tags for a span into a <see cref="List{T}"/> of UTF-8 byte arrays. | ||
| /// Called only on the cold path (new bucket creation). | ||
| /// Uses <paramref name="results"/> from <see cref="BuildKey(Span, List{PeerTagKey}, out PeerTagResults)"/> | ||
| /// to skip re-deriving spanKind/baseService and to pre-allocate the result list. | ||
| /// </summary> | ||
| internal static List<byte[]> GetEncodedPeerTags(Span span, List<PeerTagKey> peerTagKeys, in PeerTagResults results) | ||
| { | ||
| if (results.BaseService is not null) | ||
| { | ||
| return [EncodingHelpers.Utf8NoBom.GetBytes($"{Tags.BaseService}:{results.BaseService}")]; | ||
| } | ||
|
|
||
| if (results.PeerTagCount == 0) | ||
| { | ||
| return EmptyPeerTags; | ||
| } | ||
|
|
||
| var result = new List<byte[]>(results.PeerTagCount); | ||
| foreach (var peerTag in peerTagKeys) | ||
| { | ||
| var tagValue = span.GetTag(peerTag.Name); | ||
| if (string.IsNullOrEmpty(tagValue)) | ||
| { | ||
| continue; | ||
| } | ||
|
|
||
| tagValue = IpAddressObfuscationUtil.QuantizePeerIpAddresses(tagValue); | ||
| result.Add(EncodingHelpers.Utf8NoBom.GetBytes($"{peerTag.Name}:{tagValue}")); | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
|
|
||
| internal async Task Flush() | ||
| { | ||
| // Use a do/while loop to still flush once if _processExit is already completed (this makes testing easier) | ||
|
|
@@ -440,13 +512,14 @@ private void AddToBuffer(Span span) | |
| return; | ||
| } | ||
|
|
||
| var key = BuildKey(span, out var peerTags); | ||
|
|
||
| var buffer = CurrentBuffer; | ||
| var peerTagKeys = Volatile.Read(ref _peerTagKeys); | ||
| var key = BuildKey(span, peerTagKeys, out var peerTagResults); | ||
|
|
||
| if (!buffer.Buckets.TryGetValue(key, out var bucket)) | ||
| { | ||
| bucket = new StatsBucket(key, peerTags); | ||
| // Cold path: encode the peer tags for storage in the new bucket | ||
| bucket = new StatsBucket(key, GetEncodedPeerTags(span, peerTagKeys, in peerTagResults)); | ||
| buffer.Buckets.Add(key, bucket); | ||
| } | ||
|
|
||
|
|
@@ -490,9 +563,32 @@ private void HandleConfigUpdate(AgentConfiguration config) | |
| || parsedVersion.Major >= 8 | ||
| || (parsedVersion.Major == 7 && parsedVersion.Minor >= 65)); | ||
|
|
||
| if (config.PeerTags is not null) | ||
| if (CanComputeStats.Value) | ||
| { | ||
| Log.Debug("Stats computation enabled."); | ||
| } | ||
| else | ||
| { | ||
| Log.Warning("Stats computation disabled because the detected agent does not support this feature."); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're almost certainly going to need an override for this when using the Rust agents, but that's a separate issue...
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that whole question of how we identify the rust agent is a whole open question... |
||
| // early return, because there's no point doing all the extra work if stats isn't enabled anyway | ||
| return; | ||
| } | ||
|
|
||
| if (config.PeerTags is { Count: > 0 }) | ||
| { | ||
| // Sort, deduplicate, and pre-compute the UTF-8 key prefixes so that | ||
| // BuildKey can hash without per-call string encoding. | ||
| var precomputed = new List<PeerTagKey>(config.PeerTags.Count); | ||
| foreach (var tag in config.PeerTags.Where(x => !string.IsNullOrEmpty(x)).Distinct().OrderBy(x => x)) | ||
| { | ||
| precomputed.Add(new PeerTagKey(tag)); | ||
| } | ||
|
|
||
| Interlocked.Exchange(ref _peerTagKeys, precomputed); | ||
| } | ||
| else | ||
| { | ||
| Interlocked.Exchange(ref _peerTagKeys, config.PeerTags); | ||
| Interlocked.Exchange(ref _peerTagKeys, []); | ||
| } | ||
|
|
||
| // Update trace filter from agent configuration | ||
|
|
@@ -509,15 +605,19 @@ private void HandleConfigUpdate(AgentConfiguration config) | |
| const int tracerObfuscationVersion = 1; | ||
| var agentVersion = config.ObfuscationVersion; | ||
| Volatile.Write(ref _tracerObfuscationVersion, agentVersion > 0 && agentVersion <= tracerObfuscationVersion ? tracerObfuscationVersion : 0); | ||
| } | ||
|
|
||
| if (CanComputeStats.Value) | ||
| { | ||
| Log.Debug("Stats computation has been enabled."); | ||
| } | ||
| else | ||
| { | ||
| Log.Warning("Stats computation disabled because the detected agent does not support this feature."); | ||
| } | ||
| internal readonly struct PeerTagKey(string name) | ||
| { | ||
| public readonly string Name = name; | ||
| public readonly byte[] Utf8Prefix = EncodingHelpers.Utf8NoBom.GetBytes(name + ":"); | ||
| } | ||
|
|
||
| internal readonly struct PeerTagResults | ||
| { | ||
| public int PeerTagCount { get; init; } | ||
|
|
||
| public string BaseService { get; init; } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:tears-of-joy: