Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public static void InjectTraceIntoData<TRecordRequest>(Tracer tracer, TRecordReq
if (dataStreamsManager != null && dataStreamsManager.IsEnabled)
{
var payloadSize = jsonData?.Count > 0 && record.Data != null ? record.Data.Length : 0;
var edgeTags = new[] { "direction:out", $"topic:{streamName}", "type:kinesis" };
var edgeTags = dataStreamsManager.GetOrCreateEdgeTags(
new KinesisEdgeTagCacheKey(streamName!, isConsume: false),
static k => ["direction:out", $"topic:{k.StreamName}", "type:kinesis"]);
scope.Span.SetDataStreamsCheckpoint(
dataStreamsManager,
CheckpointKind.Produce,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ internal static TResponse OnAsyncMethodEnd<TTarget, TResponse>(TTarget instance,
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
if (dataStreamsManager is { IsEnabled: true })
{
var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:kinesis" };
var edgeTags = dataStreamsManager.GetOrCreateEdgeTags(
new KinesisEdgeTagCacheKey((string)state.State, isConsume: true),
static k => ["direction:in", $"topic:{k.StreamName}", "type:kinesis"]);
foreach (var o in response.Records)
{
var record = o.DuckCast<IRecord>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTar
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
if (dataStreamsManager is { IsEnabled: true })
{
var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:kinesis" };
var edgeTags = dataStreamsManager.GetOrCreateEdgeTags(
new KinesisEdgeTagCacheKey((string)state.State, isConsume: true),
static k => ["direction:in", $"topic:{k.StreamName}", "type:kinesis"]);
foreach (var o in response.Records)
{
var record = o.DuckCast<IRecord>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// <copyright file="KinesisEdgeTagCacheKey.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Kinesis;

/// <summary>
/// Value-type cache key for Kinesis edge tags. Using a named struct avoids boxing and
/// is compatible with all supported target frameworks.
/// <see cref="IsConsume"/> distinguishes produce (direction:out) from consume (direction:in)
/// so that both directions share a single cache type without key collision.
/// </summary>
internal readonly struct KinesisEdgeTagCacheKey : IEquatable<KinesisEdgeTagCacheKey>
{
public readonly string StreamName;
public readonly bool IsConsume;

public KinesisEdgeTagCacheKey(string streamName, bool isConsume)
{
StreamName = streamName;
IsConsume = isConsume;
}

public bool Equals(KinesisEdgeTagCacheKey other)
=> StreamName == other.StreamName && IsConsume == other.IsConsume;

public override bool Equals(object? obj)
=> obj is KinesisEdgeTagCacheKey other && Equals(other);

public override int GetHashCode()
{
unchecked
{
int hash = 17;
hash = (hash * 31) + (StreamName?.GetHashCode() ?? 0);
hash = (hash * 31) + IsConsume.GetHashCode();
return hash;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ public static CallTargetState BeforePublish<TPublishRequest>(TPublishRequest req
if (scope?.Span.Context is { } context && !string.IsNullOrEmpty(topicName))
{
var dataStreamsManager = tracer.TracerManager.DataStreamsManager;
// avoid allocation if edgeTags are not going to be used
var edgeTags = dataStreamsManager is { IsEnabled: true } ? ["direction:out", $"topic:{topicName}", "type:sns"] : Array.Empty<string>();
var edgeTags = dataStreamsManager is { IsEnabled: true }
? dataStreamsManager.GetOrCreateEdgeTags(
new SnsEdgeTagCacheKey(topicName!),
static k => ["direction:out", $"topic:{k.TopicName}", "type:sns"])
: Array.Empty<string>();

if (sendType == SendType.SingleMessage)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// <copyright file="SnsEdgeTagCacheKey.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SNS;

/// <summary>
/// Value-type cache key for SNS produce edge tags. Using a named struct avoids boxing and
/// is compatible with all supported target frameworks.
/// </summary>
internal readonly struct SnsEdgeTagCacheKey : IEquatable<SnsEdgeTagCacheKey>
{
public readonly string TopicName;

public SnsEdgeTagCacheKey(string topicName)
{
TopicName = topicName;
}

public bool Equals(SnsEdgeTagCacheKey other)
=> TopicName == other.TopicName;

public override bool Equals(object? obj)
=> obj is SnsEdgeTagCacheKey other && Equals(other);

public override int GetHashCode()
=> TopicName?.GetHashCode() ?? 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ private static void InjectForSingleMessage<TSendMessageRequest>(Tracer tracer, T
var dataStreamsManager = tracer.TracerManager.DataStreamsManager;
if (dataStreamsManager != null && dataStreamsManager.IsEnabled)
{
var edgeTags = new[] { "direction:out", $"topic:{queueName}", "type:sqs" };
var edgeTags = dataStreamsManager.GetOrCreateEdgeTags(
new SqsEdgeTagCacheKey(queueName, isConsume: false),
static k => ["direction:out", $"topic:{k.QueueName}", "type:sqs"]);
scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, payloadSizeBytes: 0, timeInQueueMs: 0);
}

Expand All @@ -81,14 +83,18 @@ private static void InjectForBatch<TSendMessageBatchRequest>(Tracer tracer, TSen
return;
}

var edgeTags = new[] { "direction:out", $"topic:{queueName}", "type:sqs" };
var dataStreamsManager = tracer.TracerManager.DataStreamsManager;
var edgeTags = dataStreamsManager is { IsEnabled: true }
? dataStreamsManager.GetOrCreateEdgeTags(
new SqsEdgeTagCacheKey(queueName, isConsume: false),
static k => ["direction:out", $"topic:{k.QueueName}", "type:sqs"])
: new[] { "direction:out", $"topic:{queueName}", "type:sqs" };
foreach (var e in requestProxy.Entries)
{
var entry = e.DuckCast<IContainsMessageAttributes>();
if (entry != null)
{
// this has no effect if DSM is disabled
var dataStreamsManager = tracer.TracerManager.DataStreamsManager;
scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, payloadSizeBytes: 0, timeInQueueMs: 0);
// this needs to be done for context propagation even when DSM is disabled
// (when DSM is enabled, it injects the pathway context on top of the trace context)
Expand Down Expand Up @@ -148,7 +154,9 @@ internal static TResponse AfterReceive<TResponse>(TResponse response, Exception?
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
if (dataStreamsManager is { IsEnabled: true })
{
var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:sqs" };
var edgeTags = dataStreamsManager.GetOrCreateEdgeTags(
new SqsEdgeTagCacheKey((string)state.State, isConsume: true),
static k => ["direction:in", $"topic:{k.QueueName}", "type:sqs"]);
foreach (var o in response.Messages)
{
var message = o.DuckCast<IMessage>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// <copyright file="SqsEdgeTagCacheKey.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS;

/// <summary>
/// Value-type cache key for SQS edge tags. Using a named struct avoids boxing and
/// is compatible with all supported target frameworks.
/// <see cref="IsConsume"/> distinguishes produce (direction:out) from consume (direction:in)
/// so that both directions share a single cache type without key collision.
/// </summary>
internal readonly struct SqsEdgeTagCacheKey : IEquatable<SqsEdgeTagCacheKey>
{
public readonly string QueueName;
public readonly bool IsConsume;

public SqsEdgeTagCacheKey(string queueName, bool isConsume)
{
QueueName = queueName;
IsConsume = isConsume;
}

public bool Equals(SqsEdgeTagCacheKey other)
=> QueueName == other.QueueName && IsConsume == other.IsConsume;

public override bool Equals(object? obj)
=> obj is SqsEdgeTagCacheKey other && Equals(other);

public override int GetHashCode()
{
unchecked
{
int hash = 17;
hash = (hash * 31) + (QueueName?.GetHashCode() ?? 0);
hash = (hash * 31) + IsConsume.GetHashCode();
return hash;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public sealed class ProcessMessageIntegration
{
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(ProcessMessageIntegration));

// Used when the entity path is unknown — direction:in and type:servicebus but no topic tag
private static readonly string[] DefaultConsumeEdgeTags = ["direction:in", "type:servicebus"];

/// <summary>
/// OnMethodBegin callback
/// </summary>
Expand Down Expand Up @@ -81,11 +84,12 @@ internal static CallTargetState OnMethodBegin<TTarget, TMessage>(TTarget instanc

var namespaceString = instance.Processor.EntityPath;

// TODO: we could pool these arrays to reduce allocations
// NOTE: the tags must be sorted in alphabetical order
var edgeTags = string.IsNullOrEmpty(namespaceString)
? new[] { "direction:in", "type:servicebus" }
: new[] { "direction:in", $"topic:{namespaceString}", "type:servicebus" };
? DefaultConsumeEdgeTags
: dataStreamsManager.GetOrCreateEdgeTags(
new ServiceBusEdgeTagCacheKey(namespaceString),
static k => ["direction:in", $"topic:{k.EntityPath}", "type:servicebus"]);
var msgSize = dataStreamsManager.IsInDefaultState ? 0 : AzureServiceBusCommon.GetMessageSize(message);
span.SetDataStreamsCheckpoint(
dataStreamsManager,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// <copyright file="ServiceBusEdgeTagCacheKey.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Azure.ServiceBus;

/// <summary>
/// Value-type cache key for Azure Service Bus consume edge tags. Using a named struct avoids boxing and
/// is compatible with all supported target frameworks.
/// </summary>
internal readonly struct ServiceBusEdgeTagCacheKey : IEquatable<ServiceBusEdgeTagCacheKey>
{
public readonly string EntityPath;

public ServiceBusEdgeTagCacheKey(string entityPath)
{
EntityPath = entityPath;
}

public bool Equals(ServiceBusEdgeTagCacheKey other)
=> EntityPath == other.EntityPath;

public override bool Equals(object? obj)
=> obj is ServiceBusEdgeTagCacheKey other && Equals(other);

public override int GetHashCode()
=> EntityPath?.GetHashCode() ?? 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ internal static CallTargetReturn OnMethodEnd<TTarget>(TTarget instance, Exceptio
}

var queueName = IbmMqHelper.SanitizeQueueName(instance.Name);
var edgeTags = new[] { "direction:in", $"topic:{queueName}", $"type:{IbmMqConstants.QueueType}" };
var edgeTags = dataStreams.GetOrCreateEdgeTags(
new IbmMqEdgeTagCacheKey(queueName, isConsume: true),
static k => ["direction:in", $"topic:{k.QueueName}", "type:ibmmq"]);

scope.Span.SetDataStreamsCheckpoint(
dataStreams,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// <copyright file="IbmMqEdgeTagCacheKey.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.IbmMq;

/// <summary>
/// Value-type cache key for IBM MQ edge tags. Using a named struct avoids boxing and
/// is compatible with all supported target frameworks.
/// <see cref="IsConsume"/> distinguishes produce (direction:out) from consume (direction:in)
/// so that both directions share a single cache type without key collision.
/// </summary>
internal readonly struct IbmMqEdgeTagCacheKey : IEquatable<IbmMqEdgeTagCacheKey>
{
public readonly string QueueName;
public readonly bool IsConsume;

public IbmMqEdgeTagCacheKey(string queueName, bool isConsume)
{
QueueName = queueName;
IsConsume = isConsume;
}

public bool Equals(IbmMqEdgeTagCacheKey other)
=> QueueName == other.QueueName && IsConsume == other.IsConsume;

public override bool Equals(object? obj)
=> obj is IbmMqEdgeTagCacheKey other && Equals(other);

public override int GetHashCode()
{
unchecked
{
int hash = 17;
hash = (hash * 31) + (QueueName?.GetHashCode() ?? 0);
hash = (hash * 31) + IsConsume.GetHashCode();
return hash;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ internal static CallTargetState OnMethodBegin<TTarget, TMessage, TOptions>(TTarg
if (dataStreams.IsEnabled && (instance).Instance != null && (msg).Instance != null)
{
var queueName = IbmMqHelper.SanitizeQueueName(instance.Name);
var edgeTags = new[] { "direction:out", $"topic:{queueName}", $"type:{IbmMqConstants.QueueType}" };
var edgeTags = dataStreams.GetOrCreateEdgeTags(
new IbmMqEdgeTagCacheKey(queueName, isConsume: false),
static k => ["direction:out", $"topic:{k.QueueName}", "type:ibmmq"]);
scope.Span.SetDataStreamsCheckpoint(dataStreams, CheckpointKind.Produce, edgeTags, msg.MessageLength, 0);
dataStreams.InjectPathwayContextAsBase64String(scope.Span.Context.PathwayContext, IbmMqHelper.GetHeadersAdapter(msg));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// <copyright file="CommitBacklogTagCacheKey.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka;

/// <summary>
/// Value-type cache key for consumer-commit backlog tags. Using a named struct avoids boxing and
/// is compatible with all supported target frameworks.
/// </summary>
internal readonly struct CommitBacklogTagCacheKey : IEquatable<CommitBacklogTagCacheKey>
{
public readonly string GroupId;
public readonly string ClusterId;
public readonly int Partition;
public readonly string Topic;

public CommitBacklogTagCacheKey(string groupId, string clusterId, int partition, string topic)
{
GroupId = groupId;
ClusterId = clusterId;
Partition = partition;
Topic = topic;
}

public bool Equals(CommitBacklogTagCacheKey other)
=> GroupId == other.GroupId && ClusterId == other.ClusterId && Partition == other.Partition && Topic == other.Topic;

public override bool Equals(object? obj)
=> obj is CommitBacklogTagCacheKey other && Equals(other);

public override int GetHashCode()
{
unchecked
{
int hash = 17;
hash = (hash * 31) + GroupId.GetHashCode();
hash = (hash * 31) + ClusterId.GetHashCode();
hash = (hash * 31) + Partition;
hash = (hash * 31) + Topic.GetHashCode();
return hash;
}
}
}
Loading
Loading