From 3850cdb84958c404d02774496bccbf4b622a244f Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Mon, 13 Apr 2026 15:18:03 -0500 Subject: [PATCH 1/6] DSM overhead optimizations --- .../AutoInstrumentation/Kafka/KafkaHelper.cs | 24 ++-- .../Kafka/ProduceEdgeTagCacheKey.cs | 43 +++++++ .../DataStreamsContextPropagator.cs | 24 ++++ .../DataStreamsManager.cs | 110 +++++++++++++++++- .../DataStreamsMonitoring/EdgeTagCache.cs | 20 ++++ .../PathwayContextEncoder.cs | 25 ++++ .../Utils/BinaryPrimitivesHelper.cs | 5 + .../DataStreamsManagerTests.cs | 61 ++++++++++ .../PathwayContextEncoderTests.cs | 17 +++ 9 files changed, 321 insertions(+), 8 deletions(-) create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ProduceEdgeTagCacheKey.cs create mode 100644 tracer/src/Datadog.Trace/DataStreamsMonitoring/EdgeTagCache.cs diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs index 0ba7bcb8bf54..5b61f290ec57 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs @@ -142,6 +142,19 @@ private static long GetMessageSize(T message) return size; } + // NOTE: tags must be sorted alphabetically — called only on edge-tag cache miss + private static string[] BuildProduceEdgeTags(string clusterId, string topic) + { + if (!StringUtil.IsNullOrEmpty(clusterId)) + { + return StringUtil.IsNullOrEmpty(topic) + ? ["direction:out", $"kafka_cluster_id:{clusterId}", "type:kafka"] + : ["direction:out", $"kafka_cluster_id:{clusterId}", $"topic:{topic}", "type:kafka"]; + } + + return ["direction:out", $"topic:{topic}", "type:kafka"]; + } + internal static Scope? CreateConsumerScope( Tracer tracer, DataStreamsManager dataStreamsManager, @@ -398,17 +411,14 @@ internal static void TryInjectHeaders( ProducerCache.TryGetProducer(producer, out _, out var producerClusterId); string[] edgeTags; - if (!StringUtil.IsNullOrEmpty(producerClusterId)) + if (StringUtil.IsNullOrEmpty(topic) && StringUtil.IsNullOrEmpty(producerClusterId)) { - edgeTags = StringUtil.IsNullOrEmpty(topic) - ? ["direction:out", $"kafka_cluster_id:{producerClusterId}", "type:kafka"] - : ["direction:out", $"kafka_cluster_id:{producerClusterId}", $"topic:{topic}", "type:kafka"]; + edgeTags = DefaultProduceEdgeTags; } else { - edgeTags = StringUtil.IsNullOrEmpty(topic) - ? DefaultProduceEdgeTags - : ["direction:out", $"topic:{topic}", "type:kafka"]; + var cacheKey = new ProduceEdgeTagCacheKey(producerClusterId ?? string.Empty, topic ?? string.Empty); + edgeTags = dataStreamsManager.GetOrCreateEdgeTags(cacheKey, static k => BuildProduceEdgeTags(k.ClusterId, k.Topic)); } var msgSize = dataStreamsManager.IsInDefaultState ? 0 : GetMessageSize(message); diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ProduceEdgeTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ProduceEdgeTagCacheKey.cs new file mode 100644 index 000000000000..8efdefaa23fc --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ProduceEdgeTagCacheKey.cs @@ -0,0 +1,43 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka; + +/// +/// Value-type cache key for produce edge tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// +internal readonly struct ProduceEdgeTagCacheKey : IEquatable +{ + public readonly string ClusterId; + public readonly string Topic; + + public ProduceEdgeTagCacheKey(string clusterId, string topic) + { + ClusterId = clusterId; + Topic = topic; + } + + public bool Equals(ProduceEdgeTagCacheKey other) + => ClusterId == other.ClusterId && Topic == other.Topic; + + public override bool Equals(object? obj) + => obj is ProduceEdgeTagCacheKey other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + int hash = 17; + hash = (hash * 31) + (ClusterId?.GetHashCode() ?? 0); + hash = (hash * 31) + (Topic?.GetHashCode() ?? 0); + return hash; + } + } +} diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs index 9f838c2f25c5..827339e9df3d 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs @@ -34,6 +34,29 @@ internal void Inject(PathwayContext context, TCarrier headers, bool is { if (headers is null) { ThrowHelper.ThrowArgumentNullException(nameof(headers)); } +#if NETCOREAPP3_1_OR_GREATER + // Encode directly into stack buffers to avoid heap allocations for the intermediate byte arrays. + // The only unavoidable allocation is the final ToArray() for headers.Add, since Kafka takes ownership. + Span encodedBytes = stackalloc byte[PathwayContextEncoder.MaxEncodedSize]; + var encodedLen = PathwayContextEncoder.EncodeInto(context, encodedBytes); + var encodedSlice = encodedBytes.Slice(0, encodedLen); + + Span base64Bytes = stackalloc byte[PathwayContextEncoder.MaxBase64EncodedSize]; + var status = Base64.EncodeToUtf8(encodedSlice, base64Bytes, out _, out int bytesWritten); + + if (status != OperationStatus.Done) + { + Log.Error("Failed to encode Data Streams context to Base64. OperationStatus: {Status}", status); + return; + } + + headers.Add(DataStreamsPropagationHeaders.PropagationKeyBase64, base64Bytes.Slice(0, bytesWritten).ToArray()); + + if (isDataStreamsLegacyHeadersEnabled) + { + headers.Add(DataStreamsPropagationHeaders.PropagationKey, encodedSlice.ToArray()); + } +#else var encodedBytes = PathwayContextEncoder.Encode(context); // Calculate the maximum length of the base64 encoded data @@ -62,6 +85,7 @@ internal void Inject(PathwayContext context, TCarrier headers, bool is { headers.Add(DataStreamsPropagationHeaders.PropagationKey, encodedBytes); } +#endif } /// diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs index de18f186aca4..f08eb5ecb0ae 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Datadog.Trace.Agent.DiscoveryService; @@ -27,6 +28,13 @@ namespace Datadog.Trace.DataStreamsMonitoring; /// internal sealed class DataStreamsManager { + /// + /// Maximum number of distinct keys stored in a single per-type edge-tag cache. + /// When the limit is reached, new keys are computed on the fly without caching to + /// prevent unbounded memory growth caused by high-cardinality identifiers. + /// + internal const int MaxEdgeTagCacheSize = 1000; + private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(); private static readonly AsyncLocal LastConsumePathway = new(); // saves the context on consume checkpointing only private readonly object _nodeHashUpdateLock = new(); @@ -36,6 +44,7 @@ internal sealed class DataStreamsManager private readonly IDisposable _updateSubscription; private readonly bool _isLegacyDsmHeadersEnabled; private readonly bool _isInDefaultState; + private readonly ConditionalWeakTable _nodeHashCache = new(); private long _nodeHashBase; // note that this actually represents a `ulong` that we have done an unsafe cast for private MutableSettings _previousMutableSettings; private string? _previousContainerTagsHash; @@ -303,7 +312,24 @@ public void InjectPathwayContextAsBase64String(PathwayContext? context // Don't blame me, blame the fact we can't do Volatile.Read with a ulong in .NET FX... var nodeHashBase = new NodeHashBase(unchecked((ulong)Volatile.Read(ref _nodeHashBase))); - var nodeHash = HashHelper.CalculateNodeHash(nodeHashBase, edgeTags); + var cacheEntry = _nodeHashCache.GetOrCreateValue(edgeTags); + NodeHash nodeHash; + + // Fast lock-free path: snapshot is an immutable object published via a volatile field. + // If the base still matches we avoid taking any lock on the hot path. + if (!cacheEntry.TryGetNodeHash(nodeHashBase, out nodeHash)) + { + lock (cacheEntry) + { + // Double-check under lock in case another thread raced to update + if (!cacheEntry.TryGetNodeHash(nodeHashBase, out nodeHash)) + { + nodeHash = HashHelper.CalculateNodeHash(nodeHashBase, edgeTags); + cacheEntry.Store(nodeHashBase, nodeHash); + } + } + } + var parentHash = previousContext?.Hash ?? default; var pathwayHash = HashHelper.CalculatePathwayHash(nodeHash, parentHash); @@ -351,6 +377,34 @@ public void InjectPathwayContextAsBase64String(PathwayContext? context } } + /// + /// Returns a cached edge-tag array for the given key, creating and caching it on first use. + /// On cache hits, zero heap allocations occur. The factory is only invoked on the first call + /// per unique key, making this safe to use on high-throughput hot paths. + /// Once the cache reaches entries the result is computed + /// fresh each time (no caching) to bound memory usage for high-cardinality key spaces. + /// + /// A value type (struct) used as the cache key — no boxing. + /// The cache key derived from the caller's natural identifiers. + /// A static factory that builds the edge-tag array from the key on cache miss. + public string[] GetOrCreateEdgeTags(TKey key, Func factory) + where TKey : notnull, IEquatable + { + var cache = EdgeTagCache.Cache; + if (cache.TryGetValue(key, out var existing)) + { + return existing; + } + + if (cache.Count >= MaxEdgeTagCacheSize) + { + // High-cardinality key space — bypass cache to prevent unbounded memory growth + return factory(key); + } + + return cache.GetOrAdd(key, factory); + } + /// /// Make sure we only extract the schema (a costly operation) on select occasions /// @@ -371,4 +425,58 @@ public bool ShouldExtractSchema(Span span, string operation, out int weight) weight = 0; return false; } + + /// + /// Memoized NodeHash associated with a specific edge-tag array instance and nodeHashBase value. + /// The volatile field enables a lock-free fast path: callers read the + /// snapshot without a lock, and only acquire the lock when the base has changed or is missing. + /// + private sealed class NodeHashCacheEntry + { + // Immutable snapshot published via volatile write; null until first computation. + private volatile NodeHashSnapshot? _snapshot; + + /// + /// Tries to return the cached for + /// without acquiring any lock (lock-free read via volatile field). + /// + public bool TryGetNodeHash(NodeHashBase nodeHashBase, out NodeHash nodeHash) + { + var snap = _snapshot; // volatile read — acts as a load-acquire barrier + if (snap is not null && snap.Base == nodeHashBase.Value) + { + nodeHash = snap.Hash; + return true; + } + + nodeHash = default; + return false; + } + + /// + /// Stores a newly-computed . Must be called under a lock held by the caller. + /// The volatile write ensures the snapshot is visible to all threads before the lock is released. + /// + public void Store(NodeHashBase nodeHashBase, NodeHash nodeHash) + { + _snapshot = new NodeHashSnapshot(nodeHashBase.Value, nodeHash); // volatile write + } + + /// Immutable payload published atomically via the volatile field. + private sealed class NodeHashSnapshot + { + private readonly ulong _base; + private readonly NodeHash _hash; + + internal NodeHashSnapshot(ulong @base, NodeHash hash) + { + _base = @base; + _hash = hash; + } + + internal ulong Base => _base; + + internal NodeHash Hash => _hash; + } + } } diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/EdgeTagCache.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/EdgeTagCache.cs new file mode 100644 index 000000000000..8c06cfc2154d --- /dev/null +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/EdgeTagCache.cs @@ -0,0 +1,20 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System; +using System.Collections.Concurrent; + +namespace Datadog.Trace.DataStreamsMonitoring; + +/// +/// Process-wide cache of edge tag arrays, keyed by a caller-supplied value type. +/// One dictionary instance exists per distinct TKey type (static generic class pattern). +/// This avoids boxing and lets each integration use its own natural key shape. +/// +internal static class EdgeTagCache + where TKey : notnull, IEquatable +{ + internal static readonly ConcurrentDictionary Cache = new(); +} diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/PathwayContextEncoder.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/PathwayContextEncoder.cs index a38747dd7be7..9bdada6c501d 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/PathwayContextEncoder.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/PathwayContextEncoder.cs @@ -15,6 +15,12 @@ internal static class PathwayContextEncoder { private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(PathwayContextEncoder)); + /// Maximum byte length produced by EncodeInto: 8 (hash) + 9 (pathway ms) + 9 (edge ms). + internal const int MaxEncodedSize = 26; + + /// Maximum byte length of the Base64 encoding of bytes: ceil(26/3)*4 = 36. + internal const int MaxBase64EncodedSize = 36; + /// /// Encodes a as a series of bytes /// NOTE: the encoding is lossy, in that we convert @@ -38,6 +44,25 @@ public static byte[] Encode(PathwayContext pathway) return bytes; } +#if NETCOREAPP3_1_OR_GREATER + /// + /// Zero-allocation alternative to : writes the encoded pathway directly into + /// a caller-supplied (must be at least bytes). + /// + /// Number of bytes written into . + public static int EncodeInto(PathwayContext pathway, Span buffer) + { + var pathwayStartMs = ToMilliseconds(pathway.PathwayStart); + var edgeStartMs = ToMilliseconds(pathway.EdgeStart); + + BinaryPrimitivesHelper.WriteUInt64LittleEndian(buffer, pathway.Hash.Value); + var pathwayBytes = VarEncodingHelper.WriteVarLongZigZag(buffer.Slice(8), pathwayStartMs); + var edgeBytes = VarEncodingHelper.WriteVarLongZigZag(buffer.Slice(8 + pathwayBytes), edgeStartMs); + + return 8 + pathwayBytes + edgeBytes; + } +#endif + /// /// Tries to decode a from a byte[]. /// NOTE: the encoding process is lossy, so the decoded diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/Utils/BinaryPrimitivesHelper.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/Utils/BinaryPrimitivesHelper.cs index 4f574a9703cf..de066df55aa0 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/Utils/BinaryPrimitivesHelper.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/Utils/BinaryPrimitivesHelper.cs @@ -13,6 +13,11 @@ namespace Datadog.Trace.DataStreamsMonitoring.Utils; internal static class BinaryPrimitivesHelper { +#if NETCOREAPP3_1_OR_GREATER + public static void WriteUInt64LittleEndian(Span bytes, ulong value) + => System.Buffers.Binary.BinaryPrimitives.WriteUInt64LittleEndian(bytes, value); +#endif + public static void WriteUInt64LittleEndian(byte[] bytes, ulong value) { #if NETCOREAPP3_1_OR_GREATER diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs index b0916cacc7be..9a6bae8ef1c4 100644 --- a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using Datadog.Trace.Agent.DiscoveryService; +using Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka; using Datadog.Trace.Configuration; using Datadog.Trace.DataStreamsMonitoring; using Datadog.Trace.DataStreamsMonitoring.Aggregation; @@ -405,6 +406,49 @@ public async Task WhenDisposedTwice_DisposesWriterOnce() writer.DisposeCount.Should().Be(1); } + [Fact] + public void GetOrCreateEdgeTags_ReturnsSameArrayReference_WhenCalledTwiceWithSameKey() + { + var dsm = GetDataStreamManager(true, out _); + var key = new ProduceEdgeTagCacheKey("cluster1", "topic1"); + + var first = dsm.GetOrCreateEdgeTags(key, static k => [$"kafka_cluster_id:{k.ClusterId}", $"topic:{k.Topic}", "type:kafka"]); + var second = dsm.GetOrCreateEdgeTags(key, static k => [$"kafka_cluster_id:{k.ClusterId}", $"topic:{k.Topic}", "type:kafka"]); + + second.Should().BeSameAs(first); + } + + [Fact] + public void GetOrCreateEdgeTags_ReturnsDifferentArrayReferences_ForDifferentKeys() + { + var dsm = GetDataStreamManager(true, out _); + + var forTopic1 = dsm.GetOrCreateEdgeTags(new ProduceEdgeTagCacheKey(string.Empty, "topic1"), static k => [$"topic:{k.Topic}", "type:kafka"]); + var forTopic2 = dsm.GetOrCreateEdgeTags(new ProduceEdgeTagCacheKey(string.Empty, "topic2"), static k => [$"topic:{k.Topic}", "type:kafka"]); + + forTopic2.Should().NotBeSameAs(forTopic1); + } + + [Fact] + public void GetOrCreateEdgeTags_BypassesCache_WhenAtMaxCapacity() + { + var dsm = GetDataStreamManager(true, out _); + + // Fill the per-type cache to the cap using a key type private to this test + // (OverflowTestKey has its own static dictionary, isolated from ProduceEdgeTagCacheKey) + for (var i = 0; i < DataStreamsManager.MaxEdgeTagCacheSize; i++) + { + dsm.GetOrCreateEdgeTags(new OverflowTestKey(i), static k => [$"tag:{k.Value}"]); + } + + // The cache is now full; a new key should bypass caching and return a fresh array each call + var overflowKey = new OverflowTestKey(DataStreamsManager.MaxEdgeTagCacheSize); + var first = dsm.GetOrCreateEdgeTags(overflowKey, static k => [$"tag:{k.Value}"]); + var second = dsm.GetOrCreateEdgeTags(overflowKey, static k => [$"tag:{k.Value}"]); + + second.Should().NotBeSameAs(first); + } + private static DataStreamsManager GetDataStreamManager(bool enabled, out DataStreamsWriterMock writer) { writer = enabled ? new DataStreamsWriterMock() : null; @@ -422,6 +466,23 @@ private static DataStreamsManager GetDataStreamManager(bool enabled, out DataStr return new DataStreamsManager(settings, writer, Mock.Of()); } + /// + /// Private key type used exclusively by . + /// Having a unique type gives an isolated EdgeTagCache<OverflowTestKey> dictionary. + /// + private readonly struct OverflowTestKey : IEquatable + { + public readonly int Value; + + public OverflowTestKey(int value) => Value = value; + + public bool Equals(OverflowTestKey other) => Value == other.Value; + + public override bool Equals(object obj) => obj is OverflowTestKey other && Equals(other); + + public override int GetHashCode() => Value; + } + internal class DataStreamsWriterMock : IDataStreamsWriter { private int _disposeCount; diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/PathwayContextEncoderTests.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/PathwayContextEncoderTests.cs index 14f91d3f975a..62f0a23a0cfc 100644 --- a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/PathwayContextEncoderTests.cs +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/PathwayContextEncoderTests.cs @@ -105,6 +105,23 @@ public void DecoderFailure_OutOfRangePathwayBytes() decoded.Should().BeNull(); } +#if NETCOREAPP3_1_OR_GREATER + [Theory] + [InlineData(0, 0, 0)] + [InlineData(12345, 1_000_000_000L, 2_000_000_000L)] + public void EncodeInto_ProducesSameBytesAsEncode(ulong hash, long pathwayStartNs, long edgeStartNs) + { + var pathway = new PathwayContext(new PathwayHash(hash), pathwayStartNs, edgeStartNs); + + var expected = PathwayContextEncoder.Encode(pathway); + + Span buffer = stackalloc byte[PathwayContextEncoder.MaxEncodedSize]; + var bytesWritten = PathwayContextEncoder.EncodeInto(pathway, buffer); + + buffer.Slice(0, bytesWritten).ToArray().Should().Equal(expected); + } +#endif + private static void EncodeTest(ulong hash, long pathwayStartNs, long edgeStartNs) { var pathway = new PathwayContext( From adeec8fff58bb389a1be5854ffdc4a2bb8ede06e Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Mon, 13 Apr 2026 16:12:58 -0500 Subject: [PATCH 2/6] Implement optimizations for all integrations --- .../AWS/Kinesis/ContextPropagation.cs | 4 +- .../AWS/Kinesis/GetRecordsAsyncIntegration.cs | 4 +- .../AWS/Kinesis/GetRecordsIntegration.cs | 4 +- .../AWS/Kinesis/KinesisEdgeTagCacheKey.cs | 45 +++++++ .../AWS/SNS/AwsSnsHandlerCommon.cs | 7 +- .../AWS/SNS/SnsEdgeTagCacheKey.cs | 33 +++++ .../AWS/SQS/AwsSqsHandlerCommon.cs | 16 ++- .../AWS/SQS/SqsEdgeTagCacheKey.cs | 45 +++++++ .../ServiceBus/ProcessMessageIntegration.cs | 10 +- .../ServiceBus/ServiceBusEdgeTagCacheKey.cs | 33 +++++ .../IbmMq/GetIntegration.cs | 4 +- .../IbmMq/IbmMqEdgeTagCacheKey.cs | 45 +++++++ .../IbmMq/PutIntegration.cs | 4 +- .../Kafka/ConsumeEdgeTagCacheKey.cs | 46 +++++++ .../AutoInstrumentation/Kafka/KafkaHelper.cs | 34 ++--- .../RabbitMQConsumeEdgeTagCacheKey.cs | 33 +++++ .../RabbitMQ/RabbitMQIntegration.cs | 21 +++- .../RabbitMQProduceEdgeTagCacheKey.cs | 48 +++++++ .../DataStreamsMonitoring/EdgeTagCache.cs | 2 + .../DataStreamsManagerTests.cs | 117 ++++++++++++++++++ 20 files changed, 521 insertions(+), 34 deletions(-) create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/KinesisEdgeTagCacheKey.cs create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SNS/SnsEdgeTagCacheKey.cs create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/SqsEdgeTagCacheKey.cs create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ServiceBusEdgeTagCacheKey.cs create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/IbmMqEdgeTagCacheKey.cs create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ConsumeEdgeTagCacheKey.cs create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQConsumeEdgeTagCacheKey.cs create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQProduceEdgeTagCacheKey.cs diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/ContextPropagation.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/ContextPropagation.cs index 438d47cf0f3a..36640a8f45be 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/ContextPropagation.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/ContextPropagation.cs @@ -64,7 +64,9 @@ public static void InjectTraceIntoData(Tracer tracer, TRecordReq if (dataStreamsManager != null && dataStreamsManager.IsEnabled) { var payloadSize = jsonData?.Count > 0 && record.Data != null ? record.Data.Length : 0; - var edgeTags = new[] { "direction:out", $"topic:{streamName}", "type:kinesis" }; + var edgeTags = dataStreamsManager.GetOrCreateEdgeTags( + new KinesisEdgeTagCacheKey(streamName!, isConsume: false), + static k => ["direction:out", $"topic:{k.StreamName}", "type:kinesis"]); scope.Span.SetDataStreamsCheckpoint( dataStreamsManager, CheckpointKind.Produce, diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/GetRecordsAsyncIntegration.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/GetRecordsAsyncIntegration.cs index 54e6e38fea65..f20bb85eb00b 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/GetRecordsAsyncIntegration.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/GetRecordsAsyncIntegration.cs @@ -61,7 +61,9 @@ internal static TResponse OnAsyncMethodEnd(TTarget instance, var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager; if (dataStreamsManager is { IsEnabled: true }) { - var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:kinesis" }; + var edgeTags = dataStreamsManager.GetOrCreateEdgeTags( + new KinesisEdgeTagCacheKey((string)state.State, isConsume: true), + static k => ["direction:in", $"topic:{k.StreamName}", "type:kinesis"]); foreach (var o in response.Records) { var record = o.DuckCast(); diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/GetRecordsIntegration.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/GetRecordsIntegration.cs index 35787e73617a..79d8d54af521 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/GetRecordsIntegration.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/GetRecordsIntegration.cs @@ -78,7 +78,9 @@ internal static CallTargetReturn OnMethodEnd(TTar var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager; if (dataStreamsManager is { IsEnabled: true }) { - var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:kinesis" }; + var edgeTags = dataStreamsManager.GetOrCreateEdgeTags( + new KinesisEdgeTagCacheKey((string)state.State, isConsume: true), + static k => ["direction:in", $"topic:{k.StreamName}", "type:kinesis"]); foreach (var o in response.Records) { var record = o.DuckCast(); diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/KinesisEdgeTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/KinesisEdgeTagCacheKey.cs new file mode 100644 index 000000000000..6c75dd4f8eca --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Kinesis/KinesisEdgeTagCacheKey.cs @@ -0,0 +1,45 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Kinesis; + +/// +/// Value-type cache key for Kinesis edge tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// distinguishes produce (direction:out) from consume (direction:in) +/// so that both directions share a single cache type without key collision. +/// +internal readonly struct KinesisEdgeTagCacheKey : IEquatable +{ + public readonly string StreamName; + public readonly bool IsConsume; + + public KinesisEdgeTagCacheKey(string streamName, bool isConsume) + { + StreamName = streamName; + IsConsume = isConsume; + } + + public bool Equals(KinesisEdgeTagCacheKey other) + => StreamName == other.StreamName && IsConsume == other.IsConsume; + + public override bool Equals(object? obj) + => obj is KinesisEdgeTagCacheKey other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + int hash = 17; + hash = (hash * 31) + (StreamName?.GetHashCode() ?? 0); + hash = (hash * 31) + IsConsume.GetHashCode(); + return hash; + } + } +} diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SNS/AwsSnsHandlerCommon.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SNS/AwsSnsHandlerCommon.cs index f26019203da7..96158fe578da 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SNS/AwsSnsHandlerCommon.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SNS/AwsSnsHandlerCommon.cs @@ -37,8 +37,11 @@ public static CallTargetState BeforePublish(TPublishRequest req if (scope?.Span.Context is { } context && !string.IsNullOrEmpty(topicName)) { var dataStreamsManager = tracer.TracerManager.DataStreamsManager; - // avoid allocation if edgeTags are not going to be used - var edgeTags = dataStreamsManager is { IsEnabled: true } ? ["direction:out", $"topic:{topicName}", "type:sns"] : Array.Empty(); + var edgeTags = dataStreamsManager is { IsEnabled: true } + ? dataStreamsManager.GetOrCreateEdgeTags( + new SnsEdgeTagCacheKey(topicName!), + static k => ["direction:out", $"topic:{k.TopicName}", "type:sns"]) + : Array.Empty(); if (sendType == SendType.SingleMessage) { diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SNS/SnsEdgeTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SNS/SnsEdgeTagCacheKey.cs new file mode 100644 index 000000000000..498fc70b8aa4 --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SNS/SnsEdgeTagCacheKey.cs @@ -0,0 +1,33 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SNS; + +/// +/// Value-type cache key for SNS produce edge tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// +internal readonly struct SnsEdgeTagCacheKey : IEquatable +{ + public readonly string TopicName; + + public SnsEdgeTagCacheKey(string topicName) + { + TopicName = topicName; + } + + public bool Equals(SnsEdgeTagCacheKey other) + => TopicName == other.TopicName; + + public override bool Equals(object? obj) + => obj is SnsEdgeTagCacheKey other && Equals(other); + + public override int GetHashCode() + => TopicName?.GetHashCode() ?? 0; +} diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/AwsSqsHandlerCommon.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/AwsSqsHandlerCommon.cs index 508e9b0c808f..748596c0ba94 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/AwsSqsHandlerCommon.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/AwsSqsHandlerCommon.cs @@ -66,7 +66,9 @@ private static void InjectForSingleMessage(Tracer tracer, T var dataStreamsManager = tracer.TracerManager.DataStreamsManager; if (dataStreamsManager != null && dataStreamsManager.IsEnabled) { - var edgeTags = new[] { "direction:out", $"topic:{queueName}", "type:sqs" }; + var edgeTags = dataStreamsManager.GetOrCreateEdgeTags( + new SqsEdgeTagCacheKey(queueName, isConsume: false), + static k => ["direction:out", $"topic:{k.QueueName}", "type:sqs"]); scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, payloadSizeBytes: 0, timeInQueueMs: 0); } @@ -81,14 +83,18 @@ private static void InjectForBatch(Tracer tracer, TSen return; } - var edgeTags = new[] { "direction:out", $"topic:{queueName}", "type:sqs" }; + var dataStreamsManager = tracer.TracerManager.DataStreamsManager; + var edgeTags = dataStreamsManager is { IsEnabled: true } + ? dataStreamsManager.GetOrCreateEdgeTags( + new SqsEdgeTagCacheKey(queueName, isConsume: false), + static k => ["direction:out", $"topic:{k.QueueName}", "type:sqs"]) + : new[] { "direction:out", $"topic:{queueName}", "type:sqs" }; foreach (var e in requestProxy.Entries) { var entry = e.DuckCast(); if (entry != null) { // this has no effect if DSM is disabled - var dataStreamsManager = tracer.TracerManager.DataStreamsManager; scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, payloadSizeBytes: 0, timeInQueueMs: 0); // this needs to be done for context propagation even when DSM is disabled // (when DSM is enabled, it injects the pathway context on top of the trace context) @@ -148,7 +154,9 @@ internal static TResponse AfterReceive(TResponse response, Exception? var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager; if (dataStreamsManager is { IsEnabled: true }) { - var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:sqs" }; + var edgeTags = dataStreamsManager.GetOrCreateEdgeTags( + new SqsEdgeTagCacheKey((string)state.State, isConsume: true), + static k => ["direction:in", $"topic:{k.QueueName}", "type:sqs"]); foreach (var o in response.Messages) { var message = o.DuckCast(); diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/SqsEdgeTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/SqsEdgeTagCacheKey.cs new file mode 100644 index 000000000000..dfbb1062d5a4 --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/SqsEdgeTagCacheKey.cs @@ -0,0 +1,45 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS; + +/// +/// Value-type cache key for SQS edge tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// distinguishes produce (direction:out) from consume (direction:in) +/// so that both directions share a single cache type without key collision. +/// +internal readonly struct SqsEdgeTagCacheKey : IEquatable +{ + public readonly string QueueName; + public readonly bool IsConsume; + + public SqsEdgeTagCacheKey(string queueName, bool isConsume) + { + QueueName = queueName; + IsConsume = isConsume; + } + + public bool Equals(SqsEdgeTagCacheKey other) + => QueueName == other.QueueName && IsConsume == other.IsConsume; + + public override bool Equals(object? obj) + => obj is SqsEdgeTagCacheKey other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + int hash = 17; + hash = (hash * 31) + (QueueName?.GetHashCode() ?? 0); + hash = (hash * 31) + IsConsume.GetHashCode(); + return hash; + } + } +} diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ProcessMessageIntegration.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ProcessMessageIntegration.cs index 712b447bb1e4..3898a0a4ce7e 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ProcessMessageIntegration.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ProcessMessageIntegration.cs @@ -34,6 +34,9 @@ public sealed class ProcessMessageIntegration { private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(ProcessMessageIntegration)); + // Used when the entity path is unknown — direction:in and type:servicebus but no topic tag + private static readonly string[] DefaultConsumeEdgeTags = ["direction:in", "type:servicebus"]; + /// /// OnMethodBegin callback /// @@ -81,11 +84,12 @@ internal static CallTargetState OnMethodBegin(TTarget instanc var namespaceString = instance.Processor.EntityPath; - // TODO: we could pool these arrays to reduce allocations // NOTE: the tags must be sorted in alphabetical order var edgeTags = string.IsNullOrEmpty(namespaceString) - ? new[] { "direction:in", "type:servicebus" } - : new[] { "direction:in", $"topic:{namespaceString}", "type:servicebus" }; + ? DefaultConsumeEdgeTags + : dataStreamsManager.GetOrCreateEdgeTags( + new ServiceBusEdgeTagCacheKey(namespaceString), + static k => ["direction:in", $"topic:{k.EntityPath}", "type:servicebus"]); var msgSize = dataStreamsManager.IsInDefaultState ? 0 : AzureServiceBusCommon.GetMessageSize(message); span.SetDataStreamsCheckpoint( dataStreamsManager, diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ServiceBusEdgeTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ServiceBusEdgeTagCacheKey.cs new file mode 100644 index 000000000000..eaef6eba5703 --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ServiceBusEdgeTagCacheKey.cs @@ -0,0 +1,33 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Azure.ServiceBus; + +/// +/// Value-type cache key for Azure Service Bus consume edge tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// +internal readonly struct ServiceBusEdgeTagCacheKey : IEquatable +{ + public readonly string EntityPath; + + public ServiceBusEdgeTagCacheKey(string entityPath) + { + EntityPath = entityPath; + } + + public bool Equals(ServiceBusEdgeTagCacheKey other) + => EntityPath == other.EntityPath; + + public override bool Equals(object? obj) + => obj is ServiceBusEdgeTagCacheKey other && Equals(other); + + public override int GetHashCode() + => EntityPath?.GetHashCode() ?? 0; +} diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/GetIntegration.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/GetIntegration.cs index daea8b28f9a3..856a1812d7a7 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/GetIntegration.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/GetIntegration.cs @@ -58,7 +58,9 @@ internal static CallTargetReturn OnMethodEnd(TTarget instance, Exceptio } var queueName = IbmMqHelper.SanitizeQueueName(instance.Name); - var edgeTags = new[] { "direction:in", $"topic:{queueName}", $"type:{IbmMqConstants.QueueType}" }; + var edgeTags = dataStreams.GetOrCreateEdgeTags( + new IbmMqEdgeTagCacheKey(queueName, isConsume: true), + static k => ["direction:in", $"topic:{k.QueueName}", "type:ibmmq"]); scope.Span.SetDataStreamsCheckpoint( dataStreams, diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/IbmMqEdgeTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/IbmMqEdgeTagCacheKey.cs new file mode 100644 index 000000000000..d4c86d09adb0 --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/IbmMqEdgeTagCacheKey.cs @@ -0,0 +1,45 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.IbmMq; + +/// +/// Value-type cache key for IBM MQ edge tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// distinguishes produce (direction:out) from consume (direction:in) +/// so that both directions share a single cache type without key collision. +/// +internal readonly struct IbmMqEdgeTagCacheKey : IEquatable +{ + public readonly string QueueName; + public readonly bool IsConsume; + + public IbmMqEdgeTagCacheKey(string queueName, bool isConsume) + { + QueueName = queueName; + IsConsume = isConsume; + } + + public bool Equals(IbmMqEdgeTagCacheKey other) + => QueueName == other.QueueName && IsConsume == other.IsConsume; + + public override bool Equals(object? obj) + => obj is IbmMqEdgeTagCacheKey other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + int hash = 17; + hash = (hash * 31) + (QueueName?.GetHashCode() ?? 0); + hash = (hash * 31) + IsConsume.GetHashCode(); + return hash; + } + } +} diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/PutIntegration.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/PutIntegration.cs index 1140890d1d64..9817a80c8dde 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/PutIntegration.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/IbmMq/PutIntegration.cs @@ -45,7 +45,9 @@ internal static CallTargetState OnMethodBegin(TTarg if (dataStreams.IsEnabled && (instance).Instance != null && (msg).Instance != null) { var queueName = IbmMqHelper.SanitizeQueueName(instance.Name); - var edgeTags = new[] { "direction:out", $"topic:{queueName}", $"type:{IbmMqConstants.QueueType}" }; + var edgeTags = dataStreams.GetOrCreateEdgeTags( + new IbmMqEdgeTagCacheKey(queueName, isConsume: false), + static k => ["direction:out", $"topic:{k.QueueName}", "type:ibmmq"]); scope.Span.SetDataStreamsCheckpoint(dataStreams, CheckpointKind.Produce, edgeTags, msg.MessageLength, 0); dataStreams.InjectPathwayContextAsBase64String(scope.Span.Context.PathwayContext, IbmMqHelper.GetHeadersAdapter(msg)); } diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ConsumeEdgeTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ConsumeEdgeTagCacheKey.cs new file mode 100644 index 000000000000..d7677514efb2 --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ConsumeEdgeTagCacheKey.cs @@ -0,0 +1,46 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka; + +/// +/// Value-type cache key for consume edge tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// +internal readonly struct ConsumeEdgeTagCacheKey : IEquatable +{ + public readonly string GroupId; + public readonly string Topic; + public readonly string ClusterId; + + public ConsumeEdgeTagCacheKey(string groupId, string topic, string clusterId) + { + GroupId = groupId; + Topic = topic; + ClusterId = clusterId; + } + + public bool Equals(ConsumeEdgeTagCacheKey other) + => GroupId == other.GroupId && Topic == other.Topic && ClusterId == other.ClusterId; + + public override bool Equals(object? obj) + => obj is ConsumeEdgeTagCacheKey other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + int hash = 17; + hash = (hash * 31) + (GroupId?.GetHashCode() ?? 0); + hash = (hash * 31) + (Topic?.GetHashCode() ?? 0); + hash = (hash * 31) + (ClusterId?.GetHashCode() ?? 0); + return hash; + } + } +} diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs index 5b61f290ec57..a7275156b2d9 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs @@ -142,6 +142,21 @@ private static long GetMessageSize(T message) return size; } + // NOTE: tags must be sorted alphabetically — called only on edge-tag cache miss + private static string[] BuildConsumeEdgeTags(string groupId, string topic, string clusterId) + { + if (!StringUtil.IsNullOrEmpty(clusterId)) + { + return StringUtil.IsNullOrEmpty(topic) + ? ["direction:in", $"group:{groupId}", $"kafka_cluster_id:{clusterId}", "type:kafka"] + : ["direction:in", $"group:{groupId}", $"kafka_cluster_id:{clusterId}", $"topic:{topic}", "type:kafka"]; + } + + return StringUtil.IsNullOrEmpty(topic) + ? ["direction:in", $"group:{groupId}", "type:kafka"] + : ["direction:in", $"group:{groupId}", $"topic:{topic}", "type:kafka"]; + } + // NOTE: tags must be sorted alphabetically — called only on edge-tag cache miss private static string[] BuildProduceEdgeTags(string clusterId, string topic) { @@ -269,21 +284,12 @@ private static string[] BuildProduceEdgeTags(string clusterId, string topic) if (dataStreamsManager.IsEnabled) { - // TODO: we could pool these arrays to reduce allocations // NOTE: the tags must be sorted in alphabetical order - string[] edgeTags; - if (!StringUtil.IsNullOrEmpty(consumerClusterId)) - { - edgeTags = StringUtil.IsNullOrEmpty(topic) - ? new[] { "direction:in", $"group:{groupId}", $"kafka_cluster_id:{consumerClusterId}", "type:kafka" } - : new[] { "direction:in", $"group:{groupId}", $"kafka_cluster_id:{consumerClusterId}", $"topic:{topic}", "type:kafka" }; - } - else - { - edgeTags = StringUtil.IsNullOrEmpty(topic) - ? new[] { "direction:in", $"group:{groupId}", "type:kafka" } - : new[] { "direction:in", $"group:{groupId}", $"topic:{topic}", "type:kafka" }; - } + var cacheKey = new ConsumeEdgeTagCacheKey( + groupId ?? string.Empty, + topic ?? string.Empty, + consumerClusterId ?? string.Empty); + var edgeTags = dataStreamsManager.GetOrCreateEdgeTags(cacheKey, static k => BuildConsumeEdgeTags(k.GroupId, k.Topic, k.ClusterId)); span.SetDataStreamsCheckpoint( dataStreamsManager, diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQConsumeEdgeTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQConsumeEdgeTagCacheKey.cs new file mode 100644 index 000000000000..5e20858d0c50 --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQConsumeEdgeTagCacheKey.cs @@ -0,0 +1,33 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.RabbitMQ; + +/// +/// Value-type cache key for RabbitMQ consume edge tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// +internal readonly struct RabbitMQConsumeEdgeTagCacheKey : IEquatable +{ + public readonly string TopicOrRoutingKey; + + public RabbitMQConsumeEdgeTagCacheKey(string topicOrRoutingKey) + { + TopicOrRoutingKey = topicOrRoutingKey; + } + + public bool Equals(RabbitMQConsumeEdgeTagCacheKey other) + => TopicOrRoutingKey == other.TopicOrRoutingKey; + + public override bool Equals(object? obj) + => obj is RabbitMQConsumeEdgeTagCacheKey other && Equals(other); + + public override int GetHashCode() + => TopicOrRoutingKey?.GetHashCode() ?? 0; +} diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQIntegration.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQIntegration.cs index e1d6280fc3a1..1797fedb0b07 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQIntegration.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQIntegration.cs @@ -139,11 +139,18 @@ internal static void SetDataStreamsCheckpointOnProduce(Tracer tracer, Span span, try { var headersAdapter = new RabbitMQHeadersCollectionAdapter(headers); - var edgeTags = string.IsNullOrEmpty(tags.Exchange) - ? - // exchange can be empty for "direct" - new[] { "direction:out", $"topic:{tags.Queue ?? tags.RoutingKey}", "type:rabbitmq" } - : new[] { "direction:out", $"exchange:{tags.Exchange}", string.IsNullOrEmpty(tags.RoutingKey) ? "has_routing_key:false" : "has_routing_key:true", "type:rabbitmq" }; + // exchange can be empty for "direct"; key encodes both cases without collision + var produceKey = new RabbitMQProduceEdgeTagCacheKey( + tags.Exchange ?? string.Empty, + string.IsNullOrEmpty(tags.Exchange) ? tags.Queue ?? tags.RoutingKey ?? string.Empty : string.Empty, + !string.IsNullOrEmpty(tags.RoutingKey)); + var edgeTags = dataStreamsManager.GetOrCreateEdgeTags( + produceKey, + static k => string.IsNullOrEmpty(k.Exchange) + ? ["direction:out", $"topic:{k.TopicOrRoutingKey}", "type:rabbitmq"] + : k.HasRoutingKey + ? ["direction:out", $"exchange:{k.Exchange}", "has_routing_key:true", "type:rabbitmq"] + : ["direction:out", $"exchange:{k.Exchange}", "has_routing_key:false", "type:rabbitmq"]); var size = dataStreamsManager.IsInDefaultState ? messageSize : GetHeadersSize(headers) + messageSize; span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, size, 0); // DSM context will not be injected in default state if its size exceeds 128kb @@ -171,7 +178,9 @@ internal static void SetDataStreamsCheckpointOnConsume(Tracer tracer, Span span, try { var headersAdapter = new RabbitMQHeadersCollectionAdapter(headers); - var edgeTags = new[] { "direction:in", $"topic:{tags.Queue ?? tags.RoutingKey}", "type:rabbitmq" }; + var edgeTags = dataStreamsManager.GetOrCreateEdgeTags( + new RabbitMQConsumeEdgeTagCacheKey(tags.Queue ?? tags.RoutingKey ?? string.Empty), + static k => ["direction:in", $"topic:{k.TopicOrRoutingKey}", "type:rabbitmq"]); var pathwayContext = dataStreamsManager.ExtractPathwayContext(headersAdapter); span.SetDataStreamsCheckpoint( dataStreamsManager, diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQProduceEdgeTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQProduceEdgeTagCacheKey.cs new file mode 100644 index 000000000000..2fb19d159f08 --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/RabbitMQProduceEdgeTagCacheKey.cs @@ -0,0 +1,48 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.RabbitMQ; + +/// +/// Value-type cache key for RabbitMQ produce edge tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// When is empty the array uses as the topic; +/// otherwise it uses and . +/// +internal readonly struct RabbitMQProduceEdgeTagCacheKey : IEquatable +{ + public readonly string Exchange; + public readonly string TopicOrRoutingKey; + public readonly bool HasRoutingKey; + + public RabbitMQProduceEdgeTagCacheKey(string exchange, string topicOrRoutingKey, bool hasRoutingKey) + { + Exchange = exchange; + TopicOrRoutingKey = topicOrRoutingKey; + HasRoutingKey = hasRoutingKey; + } + + public bool Equals(RabbitMQProduceEdgeTagCacheKey other) + => Exchange == other.Exchange && TopicOrRoutingKey == other.TopicOrRoutingKey && HasRoutingKey == other.HasRoutingKey; + + public override bool Equals(object? obj) + => obj is RabbitMQProduceEdgeTagCacheKey other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + int hash = 17; + hash = (hash * 31) + (Exchange?.GetHashCode() ?? 0); + hash = (hash * 31) + (TopicOrRoutingKey?.GetHashCode() ?? 0); + hash = (hash * 31) + HasRoutingKey.GetHashCode(); + return hash; + } + } +} diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/EdgeTagCache.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/EdgeTagCache.cs index 8c06cfc2154d..8742ec8a52c2 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/EdgeTagCache.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/EdgeTagCache.cs @@ -3,6 +3,8 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. // +#nullable enable + using System; using System.Collections.Concurrent; diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs index 9a6bae8ef1c4..5b39bc0e697c 100644 --- a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs @@ -9,7 +9,13 @@ using System.Threading; using System.Threading.Tasks; using Datadog.Trace.Agent.DiscoveryService; +using Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Kinesis; +using Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SNS; +using Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS; +using Datadog.Trace.ClrProfiler.AutoInstrumentation.Azure.ServiceBus; +using Datadog.Trace.ClrProfiler.AutoInstrumentation.IbmMq; using Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka; +using Datadog.Trace.ClrProfiler.AutoInstrumentation.RabbitMQ; using Datadog.Trace.Configuration; using Datadog.Trace.DataStreamsMonitoring; using Datadog.Trace.DataStreamsMonitoring.Aggregation; @@ -429,6 +435,117 @@ public void GetOrCreateEdgeTags_ReturnsDifferentArrayReferences_ForDifferentKeys forTopic2.Should().NotBeSameAs(forTopic1); } + [Fact] + public void GetOrCreateEdgeTags_KafkaConsume_ReturnsSameArrayReference_WhenCalledTwiceWithSameKey() + { + var dsm = GetDataStreamManager(true, out _); + var key = new ConsumeEdgeTagCacheKey("group1", "topic1", "cluster1"); + + var first = dsm.GetOrCreateEdgeTags(key, static k => [$"group:{k.GroupId}", $"topic:{k.Topic}", $"kafka_cluster_id:{k.ClusterId}"]); + var second = dsm.GetOrCreateEdgeTags(key, static k => [$"group:{k.GroupId}", $"topic:{k.Topic}", $"kafka_cluster_id:{k.ClusterId}"]); + + second.Should().BeSameAs(first); + } + + [Fact] + public void GetOrCreateEdgeTags_RabbitMQProduce_ReturnsSameArrayReference_WhenCalledTwiceWithSameKey() + { + var dsm = GetDataStreamManager(true, out _); + var key = new RabbitMQProduceEdgeTagCacheKey("exchange1", string.Empty, hasRoutingKey: true); + + var first = dsm.GetOrCreateEdgeTags(key, static k => [$"exchange:{k.Exchange}", "has_routing_key:true", "type:rabbitmq"]); + var second = dsm.GetOrCreateEdgeTags(key, static k => [$"exchange:{k.Exchange}", "has_routing_key:true", "type:rabbitmq"]); + + second.Should().BeSameAs(first); + } + + [Fact] + public void GetOrCreateEdgeTags_RabbitMQConsume_ReturnsSameArrayReference_WhenCalledTwiceWithSameKey() + { + var dsm = GetDataStreamManager(true, out _); + var key = new RabbitMQConsumeEdgeTagCacheKey("queue1"); + + var first = dsm.GetOrCreateEdgeTags(key, static k => [$"topic:{k.TopicOrRoutingKey}", "type:rabbitmq"]); + var second = dsm.GetOrCreateEdgeTags(key, static k => [$"topic:{k.TopicOrRoutingKey}", "type:rabbitmq"]); + + second.Should().BeSameAs(first); + } + + [Fact] + public void GetOrCreateEdgeTags_IbmMq_ReturnsSameArrayReference_WhenCalledTwiceWithSameKey() + { + var dsm = GetDataStreamManager(true, out _); + var produceKey = new IbmMqEdgeTagCacheKey("queue1", isConsume: false); + var consumeKey = new IbmMqEdgeTagCacheKey("queue1", isConsume: true); + + var produce1 = dsm.GetOrCreateEdgeTags(produceKey, static k => ["direction:out", $"topic:{k.QueueName}", "type:ibmmq"]); + var produce2 = dsm.GetOrCreateEdgeTags(produceKey, static k => ["direction:out", $"topic:{k.QueueName}", "type:ibmmq"]); + var consume1 = dsm.GetOrCreateEdgeTags(consumeKey, static k => ["direction:in", $"topic:{k.QueueName}", "type:ibmmq"]); + var consume2 = dsm.GetOrCreateEdgeTags(consumeKey, static k => ["direction:in", $"topic:{k.QueueName}", "type:ibmmq"]); + + produce2.Should().BeSameAs(produce1); + consume2.Should().BeSameAs(consume1); + consume1.Should().NotBeSameAs(produce1); // same queue but different direction → different entry + } + + [Fact] + public void GetOrCreateEdgeTags_Kinesis_ReturnsSameArrayReference_WhenCalledTwiceWithSameKey() + { + var dsm = GetDataStreamManager(true, out _); + var produceKey = new KinesisEdgeTagCacheKey("stream1", isConsume: false); + var consumeKey = new KinesisEdgeTagCacheKey("stream1", isConsume: true); + + var produce1 = dsm.GetOrCreateEdgeTags(produceKey, static k => ["direction:out", $"topic:{k.StreamName}", "type:kinesis"]); + var produce2 = dsm.GetOrCreateEdgeTags(produceKey, static k => ["direction:out", $"topic:{k.StreamName}", "type:kinesis"]); + var consume1 = dsm.GetOrCreateEdgeTags(consumeKey, static k => ["direction:in", $"topic:{k.StreamName}", "type:kinesis"]); + var consume2 = dsm.GetOrCreateEdgeTags(consumeKey, static k => ["direction:in", $"topic:{k.StreamName}", "type:kinesis"]); + + produce2.Should().BeSameAs(produce1); + consume2.Should().BeSameAs(consume1); + consume1.Should().NotBeSameAs(produce1); + } + + [Fact] + public void GetOrCreateEdgeTags_Sns_ReturnsSameArrayReference_WhenCalledTwiceWithSameKey() + { + var dsm = GetDataStreamManager(true, out _); + var key = new SnsEdgeTagCacheKey("arn:topic1"); + + var first = dsm.GetOrCreateEdgeTags(key, static k => ["direction:out", $"topic:{k.TopicName}", "type:sns"]); + var second = dsm.GetOrCreateEdgeTags(key, static k => ["direction:out", $"topic:{k.TopicName}", "type:sns"]); + + second.Should().BeSameAs(first); + } + + [Fact] + public void GetOrCreateEdgeTags_Sqs_ReturnsSameArrayReference_WhenCalledTwiceWithSameKey() + { + var dsm = GetDataStreamManager(true, out _); + var produceKey = new SqsEdgeTagCacheKey("queue1", isConsume: false); + var consumeKey = new SqsEdgeTagCacheKey("queue1", isConsume: true); + + var produce1 = dsm.GetOrCreateEdgeTags(produceKey, static k => ["direction:out", $"topic:{k.QueueName}", "type:sqs"]); + var produce2 = dsm.GetOrCreateEdgeTags(produceKey, static k => ["direction:out", $"topic:{k.QueueName}", "type:sqs"]); + var consume1 = dsm.GetOrCreateEdgeTags(consumeKey, static k => ["direction:in", $"topic:{k.QueueName}", "type:sqs"]); + var consume2 = dsm.GetOrCreateEdgeTags(consumeKey, static k => ["direction:in", $"topic:{k.QueueName}", "type:sqs"]); + + produce2.Should().BeSameAs(produce1); + consume2.Should().BeSameAs(consume1); + consume1.Should().NotBeSameAs(produce1); + } + + [Fact] + public void GetOrCreateEdgeTags_ServiceBus_ReturnsSameArrayReference_WhenCalledTwiceWithSameKey() + { + var dsm = GetDataStreamManager(true, out _); + var key = new ServiceBusEdgeTagCacheKey("my-entity"); + + var first = dsm.GetOrCreateEdgeTags(key, static k => ["direction:in", $"topic:{k.EntityPath}", "type:servicebus"]); + var second = dsm.GetOrCreateEdgeTags(key, static k => ["direction:in", $"topic:{k.EntityPath}", "type:servicebus"]); + + second.Should().BeSameAs(first); + } + [Fact] public void GetOrCreateEdgeTags_BypassesCache_WhenAtMaxCapacity() { From 13b47c97bec6f0bce811d6eed25b3fcb0d37100f Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Tue, 14 Apr 2026 12:56:53 -0500 Subject: [PATCH 3/6] [DSM] Reduce per-message overhead in Kafka produce and consume hot paths --- .../AutoInstrumentation/Kafka/KafkaHelper.cs | 10 ++++- .../DataStreamsContextPropagator.cs | 14 ++++++ .../DataStreamsManager.cs | 22 +++++++++- .../PathwayContextEncoder.cs | 44 +++++++++++++++++++ 4 files changed, 87 insertions(+), 3 deletions(-) diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs index a7275156b2d9..e23f9d87ebba 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs @@ -299,7 +299,15 @@ private static string[] BuildProduceEdgeTags(string clusterId, string topic) tags.MessageQueueTimeMs == null ? 0 : (long)tags.MessageQueueTimeMs, pathwayContext); - message?.Headers?.Remove(DataStreamsPropagationHeaders.TemporaryBase64PathwayContext); // remove eventual junk + // TemporaryBase64PathwayContext is only written by our consumer code when + // KafkaCreateConsumerScopeEnabled=false. When it is true (the default), the + // header is never present so the unconditional Remove performs a wasted O(n) + // linear header scan on every message. Skip it when we know it can't be there. + if (!tracer.CurrentTraceSettings.Settings.KafkaCreateConsumerScopeEnabled) + { + message?.Headers?.Remove(DataStreamsPropagationHeaders.TemporaryBase64PathwayContext); + } + if (!tracer.CurrentTraceSettings.Settings.KafkaCreateConsumerScopeEnabled && message?.Headers is not null && span.Context.PathwayContext != null) { // write the _new_ pathway (the "consume" checkpoint that we just set above) to the headers as a way to pass its value to an eventual diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs index 827339e9df3d..7228cdfb67d1 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs @@ -110,6 +110,19 @@ internal void Inject(PathwayContext context, TCarrier headers, bool is { try { +#if NETCOREAPP3_1_OR_GREATER + // Decode directly into a stack buffer to avoid a heap allocation per consume. + Span decodedBytes = stackalloc byte[PathwayContextEncoder.MaxEncodedSize]; + var status = Base64.DecodeFromUtf8(base64Bytes, decodedBytes, out _, out int bytesWritten); + + if (status != OperationStatus.Done) + { + Log.Error("Failed to decode Base64 data streams context. OperationStatus: {Status}", status); + return null; + } + + return PathwayContextEncoder.Decode(decodedBytes.Slice(0, bytesWritten)); +#else // Calculate the maximum decoded length // Base64 encoding encodes 3 bytes of data into 4 bytes of encoded data // So the maximum decoded length is (base64Bytes.Length * 3) / 4 @@ -134,6 +147,7 @@ internal void Inject(PathwayContext context, TCarrier headers, bool is return PathwayContextEncoder.Decode(decodedBytes.AsSpan(0, bytesWritten).ToArray()); } } +#endif } catch (Exception ex) { diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs index f08eb5ecb0ae..7c210864da43 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs @@ -44,7 +44,11 @@ internal sealed class DataStreamsManager private readonly IDisposable _updateSubscription; private readonly bool _isLegacyDsmHeadersEnabled; private readonly bool _isInDefaultState; - private readonly ConditionalWeakTable _nodeHashCache = new(); + // Keyed by string[] identity (reference equality) — safe because EdgeTagCache holds strong + // references to the cached arrays (bounded by MaxEdgeTagCacheSize). + private readonly ConcurrentDictionary _nodeHashCache = + new(NodeHashCacheKeyComparer.Instance); + private long _nodeHashBase; // note that this actually represents a `ulong` that we have done an unsafe cast for private MutableSettings _previousMutableSettings; private string? _previousContainerTagsHash; @@ -312,7 +316,7 @@ public void InjectPathwayContextAsBase64String(PathwayContext? context // Don't blame me, blame the fact we can't do Volatile.Read with a ulong in .NET FX... var nodeHashBase = new NodeHashBase(unchecked((ulong)Volatile.Read(ref _nodeHashBase))); - var cacheEntry = _nodeHashCache.GetOrCreateValue(edgeTags); + var cacheEntry = _nodeHashCache.GetOrAdd(edgeTags, static _ => new NodeHashCacheEntry()); NodeHash nodeHash; // Fast lock-free path: snapshot is an immutable object published via a volatile field. @@ -426,6 +430,20 @@ public bool ShouldExtractSchema(Span span, string operation, out int weight) return false; } + /// + /// Reference-equality comparer for string[] keys in . + /// Two string[] objects are considered equal only when they are the same instance, + /// which is always true for the cached arrays held by . + /// + private sealed class NodeHashCacheKeyComparer : IEqualityComparer + { + internal static readonly NodeHashCacheKeyComparer Instance = new(); + + public bool Equals(string[]? x, string[]? y) => ReferenceEquals(x, y); + + public int GetHashCode(string[] obj) => System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode(obj); + } + /// /// Memoized NodeHash associated with a specific edge-tag array instance and nodeHashBase value. /// The volatile field enables a lock-free fast path: callers read the diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/PathwayContextEncoder.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/PathwayContextEncoder.cs index 9bdada6c501d..ab2689fbc314 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/PathwayContextEncoder.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/PathwayContextEncoder.cs @@ -112,6 +112,50 @@ public static int EncodeInto(PathwayContext pathway, Span buffer) return new PathwayContext(new PathwayHash(hash), pathwayStartNs, edgeStartNs); } +#if NETCOREAPP3_1_OR_GREATER + /// + /// Zero-allocation alternative to : decodes directly from a + /// caller-supplied span (e.g. a stackalloc buffer). + /// + public static PathwayContext? Decode(Span bytes) + { + if (bytes.Length < 10) + { + Log.Warning("Error decoding Data Stream PathwayContext from bytes: insufficient bytes ({ByteCount})", bytes.Length); + return null; + } + + var hash = System.Buffers.Binary.BinaryPrimitives.ReadUInt64LittleEndian(bytes); + + var pathwayStartMs = VarEncodingHelper.ReadVarLongZigZag(bytes.Slice(8), out var bytesRead); + if (pathwayStartMs is null) + { + Log.Warning("Error decoding Data Stream PathwayContext from bytes: invalid pathway start"); + return null; + } + + var edgeStartMs = VarEncodingHelper.ReadVarLongZigZag(bytes.Slice(8 + bytesRead), out _); + if (edgeStartMs is null) + { + Log.Warning("Error decoding Data Stream PathwayContext from bytes: invalid edge start"); + return null; + } + + var pathwayStartNs = ToNanoseconds(pathwayStartMs.Value); + var edgeStartNs = ToNanoseconds(edgeStartMs.Value); + if (pathwayStartMs > pathwayStartNs || edgeStartMs.Value > edgeStartNs) + { + Log.Warning( + "Overflow detected in Data Stream PathwayContext from bytes: invalid pathway {PathwayMs}ms or edge {EdgeMs}ms", + pathwayStartMs, + edgeStartMs); + return null; + } + + return new PathwayContext(new PathwayHash(hash), pathwayStartNs, edgeStartNs); + } +#endif + private static long ToNanoseconds(long milliseconds) => milliseconds * 1_000_000; From ec5f36b0500eab679f9ab070eb25aabfa01dabdb Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Tue, 14 Apr 2026 15:46:54 -0500 Subject: [PATCH 4/6] Avoid context switching --- .../DataStreamsWriter.cs | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs index 9256e9f3f8e3..c4b1f9b50477 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs @@ -24,14 +24,20 @@ internal sealed class DataStreamsWriter : IDataStreamsWriter { private const TaskCreationOptions TaskOptions = TaskCreationOptions.RunContinuationsAsynchronously; + // Drain the queues when this many items accumulate OR after DrainTimeoutMs — whichever comes first. + // At the queue limit of 10_000, this leaves 50% headroom even at 2× current peak throughput. + private const int DrainThreshold = 5_000; + private const int DrainTimeoutMs = 1_000; + private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(); private readonly object _initLock = new(); private readonly long _bucketDurationMs; + private readonly BoundedConcurrentQueue _buffer = new(queueLimit: 10_000); private readonly BoundedConcurrentQueue _backlogBuffer = new(queueLimit: 10_000); private readonly BoundedConcurrentQueue _transactionBuffer = new(queueLimit: 10_000); - private readonly TimeSpan _waitTimeSpan = TimeSpan.FromMilliseconds(10); + private readonly ManualResetEventSlim _drainSignal = new(false); private readonly TimeSpan _flushSemaphoreWaitTime = TimeSpan.FromSeconds(1); private readonly DataStreamsAggregator _aggregator; private readonly IDiscoveryService _discoveryService; @@ -125,6 +131,11 @@ public void Add(in StatsPoint point) if (_buffer.TryEnqueue(point)) { + if (!_drainSignal.IsSet && _buffer.Count >= DrainThreshold) + { + _drainSignal.Set(); + } + return; } } @@ -143,6 +154,11 @@ public void AddTransaction(in DataStreamsTransactionInfo transaction) { if (_transactionBuffer.TryEnqueue(transaction)) { + if (!_drainSignal.IsSet && _transactionBuffer.Count >= DrainThreshold) + { + _drainSignal.Set(); + } + return; } } @@ -181,6 +197,7 @@ public async Task DisposeAsync() #endif await FlushAndCloseAsync().ConfigureAwait(false); _flushSemaphore.Dispose(); + _drainSignal.Dispose(); } private async Task FlushAndCloseAsync() @@ -190,6 +207,9 @@ private async Task FlushAndCloseAsync() return; } + // Wake ProcessQueueLoop immediately so it can observe _processExit and return. + _drainSignal.Set(); + // nothing else to do, since the writer was not fully initialized if (!Volatile.Read(ref _isInitialized) || _processTask == null || _flushTask == null) { @@ -347,7 +367,8 @@ private void ProcessQueueLoop() { while (true) { - Thread.Sleep(_waitTimeSpan); + _drainSignal.Wait(DrainTimeoutMs); + _drainSignal.Reset(); if (!_flushSemaphore.Wait(_flushSemaphoreWaitTime)) { From 110c9781d60530afd2c29a9035c43d39f1ad1dbb Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Wed, 15 Apr 2026 14:55:29 -0500 Subject: [PATCH 5/6] Adjusted DrainThreshold and DrainTimeoutMs --- .../Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs index c4b1f9b50477..9caa32f34509 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs @@ -25,9 +25,8 @@ internal sealed class DataStreamsWriter : IDataStreamsWriter private const TaskCreationOptions TaskOptions = TaskCreationOptions.RunContinuationsAsynchronously; // Drain the queues when this many items accumulate OR after DrainTimeoutMs — whichever comes first. - // At the queue limit of 10_000, this leaves 50% headroom even at 2× current peak throughput. - private const int DrainThreshold = 5_000; - private const int DrainTimeoutMs = 1_000; + private const int DrainThreshold = 1_000; + private const int DrainTimeoutMs = 500; private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(); From bb42028aece7be09cce37ceb75a31869c37343f3 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Thu, 16 Apr 2026 13:57:16 -0500 Subject: [PATCH 6/6] Cache backlog tags to avoid allocations, simplified dictionary lookups --- .../Kafka/CommitBacklogTagCacheKey.cs | 49 +++++++++++++++++++ .../KafkaConsumerCommitAllIntegration.cs | 9 ++-- .../Kafka/KafkaConsumerCommitIntegration.cs | 9 ++-- .../Kafka/KafkaProduceAsyncIntegration.cs | 9 ++-- ...kaProduceSyncDeliveryHandlerIntegration.cs | 9 ++-- .../Kafka/OffsetsCommittedCallbacks.cs | 9 ++-- .../Kafka/ProduceBacklogTagCacheKey.cs | 46 +++++++++++++++++ .../Aggregation/DataStreamsAggregator.cs | 17 ++++++- .../Aggregation/SerializableBacklogBucket.cs | 2 +- .../DataStreamsMonitoring/BacklogTagCache.cs | 22 +++++++++ .../DataStreamsManager.cs | 30 +++++++++++- 11 files changed, 188 insertions(+), 23 deletions(-) create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/CommitBacklogTagCacheKey.cs create mode 100644 tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ProduceBacklogTagCacheKey.cs create mode 100644 tracer/src/Datadog.Trace/DataStreamsMonitoring/BacklogTagCache.cs diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/CommitBacklogTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/CommitBacklogTagCacheKey.cs new file mode 100644 index 000000000000..4da7c331cdb8 --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/CommitBacklogTagCacheKey.cs @@ -0,0 +1,49 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka; + +/// +/// Value-type cache key for consumer-commit backlog tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// +internal readonly struct CommitBacklogTagCacheKey : IEquatable +{ + public readonly string GroupId; + public readonly string ClusterId; + public readonly int Partition; + public readonly string Topic; + + public CommitBacklogTagCacheKey(string groupId, string clusterId, int partition, string topic) + { + GroupId = groupId; + ClusterId = clusterId; + Partition = partition; + Topic = topic; + } + + public bool Equals(CommitBacklogTagCacheKey other) + => GroupId == other.GroupId && ClusterId == other.ClusterId && Partition == other.Partition && Topic == other.Topic; + + public override bool Equals(object? obj) + => obj is CommitBacklogTagCacheKey other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + int hash = 17; + hash = (hash * 31) + GroupId.GetHashCode(); + hash = (hash * 31) + ClusterId.GetHashCode(); + hash = (hash * 31) + Partition; + hash = (hash * 31) + Topic.GetHashCode(); + return hash; + } + } +} diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaConsumerCommitAllIntegration.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaConsumerCommitAllIntegration.cs index 4dc5e08dc692..2da10f6bb3cd 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaConsumerCommitAllIntegration.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaConsumerCommitAllIntegration.cs @@ -37,10 +37,11 @@ internal static CallTargetReturn OnMethodEnd(TTar for (var i = 0; i < response.Count; i++) { var item = response[i]; - var backlogTags = StringUtil.IsNullOrEmpty(clusterId) - ? $"consumer_group:{groupId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit" - : $"consumer_group:{groupId},kafka_cluster_id:{clusterId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit"; - + var cacheKey = new CommitBacklogTagCacheKey(groupId ?? string.Empty, clusterId ?? string.Empty, item.Partition.Value, item.Topic ?? string.Empty); + var backlogTags = dataStreams.GetOrCreateBacklogTags(cacheKey, static k => + k.ClusterId.Length == 0 + ? $"consumer_group:{k.GroupId},partition:{k.Partition},topic:{k.Topic},type:kafka_commit" + : $"consumer_group:{k.GroupId},kafka_cluster_id:{k.ClusterId},partition:{k.Partition},topic:{k.Topic},type:kafka_commit"); dataStreams.TrackBacklog(backlogTags, item.Offset.Value); } } diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaConsumerCommitIntegration.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaConsumerCommitIntegration.cs index 1e79324da2f2..f361e047cb4d 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaConsumerCommitIntegration.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaConsumerCommitIntegration.cs @@ -43,10 +43,11 @@ internal static CallTargetReturn OnMethodEnd(TTarget instance, Exceptio { if (offset.TryDuckCast(out var item)) { - var backlogTags = StringUtil.IsNullOrEmpty(clusterId) - ? $"consumer_group:{groupId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit" - : $"consumer_group:{groupId},kafka_cluster_id:{clusterId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit"; - + var cacheKey = new CommitBacklogTagCacheKey(groupId ?? string.Empty, clusterId ?? string.Empty, item.Partition.Value, item.Topic ?? string.Empty); + var backlogTags = dataStreams.GetOrCreateBacklogTags(cacheKey, static k => + k.ClusterId.Length == 0 + ? $"consumer_group:{k.GroupId},partition:{k.Partition},topic:{k.Topic},type:kafka_commit" + : $"consumer_group:{k.GroupId},kafka_cluster_id:{k.ClusterId},partition:{k.Partition},topic:{k.Topic},type:kafka_commit"); dataStreams.TrackBacklog(backlogTags, item.Offset.Value); } } diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaProduceAsyncIntegration.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaProduceAsyncIntegration.cs index ee44dd563d8a..9d3d0ee6bb97 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaProduceAsyncIntegration.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaProduceAsyncIntegration.cs @@ -101,10 +101,11 @@ internal static TResponse OnAsyncMethodEnd(TTarget instance, var dataStreams = Tracer.Instance.TracerManager.DataStreamsManager; if (dataStreams.IsEnabled) { - var backlogTags = tags.ClusterId is null - ? $"partition:{deliveryResult.Partition.Value},topic:{deliveryResult.Topic},type:kafka_produce" - : $"kafka_cluster_id:{tags.ClusterId},partition:{deliveryResult.Partition.Value},topic:{deliveryResult.Topic},type:kafka_produce"; - + var cacheKey = new ProduceBacklogTagCacheKey(tags.ClusterId ?? string.Empty, deliveryResult.Partition.Value, deliveryResult.Topic ?? string.Empty); + var backlogTags = dataStreams.GetOrCreateBacklogTags(cacheKey, static k => + k.ClusterId.Length == 0 + ? $"partition:{k.Partition},topic:{k.Topic},type:kafka_produce" + : $"kafka_cluster_id:{k.ClusterId},partition:{k.Partition},topic:{k.Topic},type:kafka_produce"); dataStreams.TrackBacklog(backlogTags, deliveryResult.Offset.Value); } } diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaProduceSyncDeliveryHandlerIntegration.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaProduceSyncDeliveryHandlerIntegration.cs index 8e44cd7c8c6e..38794a7c2b2f 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaProduceSyncDeliveryHandlerIntegration.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaProduceSyncDeliveryHandlerIntegration.cs @@ -147,10 +147,11 @@ internal static Action WrapAction(Action + k.ClusterId.Length == 0 + ? $"partition:{k.Partition},topic:{k.Topic},type:kafka_produce" + : $"kafka_cluster_id:{k.ClusterId},partition:{k.Partition},topic:{k.Topic},type:kafka_produce"); dataStreams.TrackBacklog(backlogTags, report.Offset.Value); } } diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/OffsetsCommittedCallbacks.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/OffsetsCommittedCallbacks.cs index 5d32a92d211e..9d2131691d63 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/OffsetsCommittedCallbacks.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/OffsetsCommittedCallbacks.cs @@ -42,10 +42,11 @@ public void OnDelegateEnd(object? sender, Exception? exception, object? state) for (var i = 0; i < committedOffsets?.Offsets.Count; i++) { var item = committedOffsets.Offsets[i]; - var backlogTags = StringUtil.IsNullOrEmpty(clusterId) - ? $"consumer_group:{GroupId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit" - : $"consumer_group:{GroupId},kafka_cluster_id:{clusterId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit"; - + var cacheKey = new CommitBacklogTagCacheKey(GroupId ?? string.Empty, clusterId ?? string.Empty, item.Partition.Value, item.Topic ?? string.Empty); + var backlogTags = dataStreams.GetOrCreateBacklogTags(cacheKey, static k => + k.ClusterId.Length == 0 + ? $"consumer_group:{k.GroupId},partition:{k.Partition},topic:{k.Topic},type:kafka_commit" + : $"consumer_group:{k.GroupId},kafka_cluster_id:{k.ClusterId},partition:{k.Partition},topic:{k.Topic},type:kafka_commit"); dataStreams.TrackBacklog(backlogTags, item.Offset.Value); } } diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ProduceBacklogTagCacheKey.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ProduceBacklogTagCacheKey.cs new file mode 100644 index 000000000000..f4205bd51827 --- /dev/null +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/ProduceBacklogTagCacheKey.cs @@ -0,0 +1,46 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; + +namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka; + +/// +/// Value-type cache key for produce backlog tags. Using a named struct avoids boxing and +/// is compatible with all supported target frameworks. +/// +internal readonly struct ProduceBacklogTagCacheKey : IEquatable +{ + public readonly string ClusterId; + public readonly int Partition; + public readonly string Topic; + + public ProduceBacklogTagCacheKey(string clusterId, int partition, string topic) + { + ClusterId = clusterId; + Partition = partition; + Topic = topic; + } + + public bool Equals(ProduceBacklogTagCacheKey other) + => ClusterId == other.ClusterId && Partition == other.Partition && Topic == other.Topic; + + public override bool Equals(object? obj) + => obj is ProduceBacklogTagCacheKey other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + int hash = 17; + hash = (hash * 31) + ClusterId.GetHashCode(); + hash = (hash * 31) + Partition; + hash = (hash * 31) + Topic.GetHashCode(); + return hash; + } + } +} diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsAggregator.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsAggregator.cs index c1a289c93bc3..132d03d2a1bf 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsAggregator.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsAggregator.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Runtime.CompilerServices; using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.DataStreamsMonitoring.Utils; using Datadog.Trace.SourceGenerators; @@ -59,7 +60,7 @@ public void AddBacklog(in BacklogPoint point) var currentBucketStartTime = BucketStartTimeForTimestamp(point.TimestampNs); if (!_backlogBuckets.TryGetValue(currentBucketStartTime, out var bucket)) { - bucket = new Dictionary(); + bucket = new Dictionary(StringReferenceEqualityComparer.Instance); _backlogBuckets[currentBucketStartTime] = bucket; } @@ -220,4 +221,18 @@ private long BucketStartTimeForTimestamp(long timestampNs) { return timestampNs - (timestampNs % _bucketDurationInNs); } + + /// + /// Reference-equality comparer for string keys in backlog buckets. + /// Safe to use because backlog tag strings are reference-interned via + /// . + /// + private sealed class StringReferenceEqualityComparer : IEqualityComparer + { + internal static readonly StringReferenceEqualityComparer Instance = new(); + + public bool Equals(string? x, string? y) => ReferenceEquals(x, y); + + public int GetHashCode(string obj) => RuntimeHelpers.GetHashCode(obj); + } } diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/SerializableBacklogBucket.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/SerializableBacklogBucket.cs index 306b25026d57..4f31bdd3f45d 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/SerializableBacklogBucket.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/SerializableBacklogBucket.cs @@ -16,7 +16,7 @@ internal readonly struct SerializableBacklogBucket public readonly long BucketStartTimeNs; /// - /// The stats bucket, keyed on + /// The stats bucket, keyed on by reference identity. /// public readonly Dictionary Bucket; diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/BacklogTagCache.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/BacklogTagCache.cs new file mode 100644 index 000000000000..a8590abf627a --- /dev/null +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/BacklogTagCache.cs @@ -0,0 +1,22 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using System; +using System.Collections.Concurrent; + +namespace Datadog.Trace.DataStreamsMonitoring; + +/// +/// Process-wide cache of backlog tag strings, keyed by a caller-supplied value type. +/// One dictionary instance exists per distinct TKey type (static generic class pattern). +/// This avoids boxing and lets each integration use its own natural key shape. +/// +internal static class BacklogTagCache + where TKey : notnull, IEquatable +{ + internal static readonly ConcurrentDictionary Cache = new(); +} diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs index 7c210864da43..1189e785a844 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs @@ -299,7 +299,7 @@ public void InjectPathwayContextAsBase64String(PathwayContext? context try { var previousContext = parentPathway; - if (previousContext == null && LastConsumePathway.Value != null && checkpointKind == CheckpointKind.Produce) + if (previousContext == null && checkpointKind == CheckpointKind.Produce) { // We only enter here on produce: when we consume, the only thing that matters is the parent we'd have read from the inbound message, not what happened before. // We want to use the context from the previous consume (but we'll give priority to the parent passed in param if set). @@ -409,6 +409,34 @@ public string[] GetOrCreateEdgeTags(TKey key, Func factory return cache.GetOrAdd(key, factory); } + /// + /// Returns a cached backlog tag string for the given key, creating and caching it on first use. + /// On cache hits, zero heap allocations occur. The factory is only invoked on the first call + /// per unique key, making this safe to use on high-throughput hot paths. + /// Once the cache reaches entries the result is computed + /// fresh each time (no caching) to bound memory usage for high-cardinality key spaces. + /// + /// A value type (struct) used as the cache key — no boxing. + /// The cache key derived from the caller's natural identifiers. + /// A static factory that builds the backlog tag string from the key on cache miss. + public string GetOrCreateBacklogTags(TKey key, Func factory) + where TKey : notnull, IEquatable + { + var cache = BacklogTagCache.Cache; + if (cache.TryGetValue(key, out var existing)) + { + return existing; + } + + if (cache.Count >= MaxEdgeTagCacheSize) + { + // High-cardinality key space — bypass cache to prevent unbounded memory growth + return factory(key); + } + + return cache.GetOrAdd(key, factory); + } + /// /// Make sure we only extract the schema (a costly operation) on select occasions ///