Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package datadog.trace.common.metrics;

import static datadog.trace.api.ProtocolVersion.V0_4;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -52,6 +54,7 @@ static List<CoreSpan<?>> generateTrace(int len) {
final List<CoreSpan<?>> trace = new ArrayList<>();
for (int i = 0; i < len; i++) {
SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1);
span.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
span.setTag("peer.hostname", Strings.random(10));
trace.add(span);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package datadog.trace.common.metrics;

import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.trace.api.WellKnownTags;
import datadog.trace.common.writer.Writer;
import datadog.trace.core.CoreSpan;
import datadog.trace.core.CoreTracer;
import datadog.trace.core.DDSpan;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.util.Strings;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

/**
* Parallels {@link ConflatingMetricsAggregatorBenchmark} but uses real {@link DDSpan} instances
* instead of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link
* CoreSpan#isKind} path (cached span.kind ordinal + bit-test) rather than the groovy mock's
* dispatch.
*
* <p>SpanKindFilter rollout result vs. the pre-bitmask code on master: ~1.3% faster on the
* production path, with tighter fork-to-fork variance. The confidence intervals overlap, so the
* headline number sits inside noise, but the centers move the right way and the new path is
* structurally cheaper (byte read + bit-test vs. tag-map read + HashSet.contains). Note that the
* run below used 2 forks x 5 iterations x 15s, not the annotation values declared on this class.
*
* <pre>
* MacBook M1 (Java 21), 2 forks x 5 iterations x 15s, AverageTime
*
* Branch       Score (avg)          CI (99.9%)
* master       6.428 ± 0.189 µs/op  [6.239, 6.617]
* this branch  6.343 ± 0.115 µs/op  [6.228, 6.458]
* </pre>
*/
@State(Scope.Benchmark)
@Warmup(iterations = 1, time = 30, timeUnit = SECONDS)
@Measurement(iterations = 3, time = 30, timeUnit = SECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
public class ConflatingMetricsAggregatorDDSpanBenchmark {
Comment thread
dougqh marked this conversation as resolved.

private static final CoreTracer TRACER =
CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build();

private final DDAgentFeaturesDiscovery featuresDiscovery =
new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet());
private final ConflatingMetricsAggregator aggregator =
new ConflatingMetricsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
featuresDiscovery,
HealthMetrics.NO_OP,
new ConflatingMetricsAggregatorBenchmark.NullSink(),
2048,
2048,
false);
private final List<CoreSpan<?>> spans = generateTrace(64);

static List<CoreSpan<?>> generateTrace(int len) {
final List<CoreSpan<?>> trace = new ArrayList<>();
for (int i = 0; i < len; i++) {
DDSpan span = (DDSpan) TRACER.startSpan("benchmark", "op");
span.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
span.setTag("peer.hostname", Strings.random(10));
// Fix duration; bypasses the wall clock and avoids per-fork drift.
span.finishWithDuration(10);
trace.add(span);
}
return trace;
}

static class NoopWriter implements Writer {
@Override
public void write(List<DDSpan> trace) {}

@Override
public void start() {}

@Override
public boolean flush() {
return true;
}

@Override
public void close() {}

@Override
public void incrementDropCounts(int spanCount) {}
}

@Benchmark
public void benchmark(Blackhole blackhole) {
blackhole.consume(aggregator.publish(spans));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,16 @@
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER;
import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG;
import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT;
import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP;
import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR;
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static java.util.Collections.unmodifiableSet;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.common.queue.Queues;
Expand All @@ -36,12 +33,10 @@
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.core.CoreSpan;
import datadog.trace.core.DDTraceCoreInfo;
import datadog.trace.core.SpanKindFilter;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.util.AgentTaskScheduler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -50,7 +45,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.jctools.queues.MessagePassingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -60,7 +54,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private static final Logger log = LoggerFactory.getLogger(ConflatingMetricsAggregator.class);

private static final Map<String, String> DEFAULT_HEADERS =
Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION);
singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION);

private static final DDCache<String, UTF8BytesString> SERVICE_NAMES =
DDCaches.newFixedSizeCache(32);
Expand All @@ -82,15 +76,19 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
value -> UTF8BytesString.create(key + ":" + value));
private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";

private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS =
unmodifiableSet(
new HashSet<>(
Arrays.asList(
SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER)));
private static final SpanKindFilter METRICS_ELIGIBLE_KINDS =
SpanKindFilter.builder()
.includeServer()
.includeClient()
.includeProducer()
.includeConsumer()
.build();

private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION =
unmodifiableSet(
new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER)));
private static final SpanKindFilter PEER_AGGREGATION_KINDS =
SpanKindFilter.builder().includeClient().includeProducer().includeConsumer().build();

private static final SpanKindFilter INTERNAL_KIND =
SpanKindFilter.builder().includeInternal().build();

private final Set<String> ignoredResources;
private final MessagePassingQueue<Batch> batchPool;
Expand Down Expand Up @@ -289,36 +287,30 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
if (features.supportsMetrics()) {
for (CoreSpan<?> span : trace) {
boolean isTopLevel = span.isTopLevel();
final CharSequence spanKind = span.unsafeGetTag(SPAN_KIND, "");
if (shouldComputeMetric(span, spanKind)) {
if (shouldComputeMetric(span, isTopLevel)) {
final CharSequence resourceName = span.getResourceName();
if (resourceName != null && ignoredResources.contains(resourceName.toString())) {
// skip publishing all children
forceKeep = false;
break;
}
counted++;
forceKeep |= publish(span, isTopLevel, spanKind);
forceKeep |= publish(span, isTopLevel);
}
}
healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep);
}
return forceKeep;
}

private boolean shouldComputeMetric(CoreSpan<?> span, @Nonnull CharSequence spanKind) {
return (span.isMeasured() || span.isTopLevel() || spanKindEligible(spanKind))
private boolean shouldComputeMetric(CoreSpan<?> span, boolean isTopLevel) {
return (span.isMeasured() || isTopLevel || span.isKind(METRICS_ELIGIBLE_KINDS))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep span.kind cache in sync before eligibility checks

When a span is built with span.kind and then removes it in the same builder ledger (for example withTag(SPAN_KIND, "client").withTag(SPAN_KIND, null)), DDSpanContext.setAllTags(TagMap.Ledger) removes the tag from unsafeTags but does not clear the cached ordinal. This new eligibility check uses that cached ordinal, so such child spans are now counted and peer-aggregated even though the serialized/current span.kind is absent; the previous code read the tag map and skipped them. Either the ledger-removal path needs to update the cache, or this check should not rely on stale cached state.

Useful? React with 👍 / 👎.

&& span.getLongRunningVersion()
<= 0 // either not long-running or unpublished long-running span
&& span.getDurationNano() > 0;
}

/**
 * Reports whether {@code spanKind}, converted to a {@code String}, is contained in {@code
 * ELIGIBLE_SPAN_KINDS_FOR_METRICS}.
 */
private boolean spanKindEligible(@Nonnull CharSequence spanKind) {
  // The set holds Strings, while the tag value may be another CharSequence implementation.
  final String kindName = spanKind.toString();
  return ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(kindName);
}

private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanKind) {
private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
// Extract HTTP method and endpoint only if the feature is enabled
String httpMethod = null;
String httpEndpoint = null;
Expand All @@ -335,6 +327,9 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK
Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE);
grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null;
}
// CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString
// tag values don't trigger a ClassCastException on the String assignment.
final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString();
MetricKey newKey =
new MetricKey(
span.getResourceName(),
Expand All @@ -347,7 +342,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK
span.getParentId() == 0,
SPAN_KINDS.computeIfAbsent(
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
getPeerTags(span, spanKind.toString()),
getPeerTags(span),
httpMethod,
httpEndpoint,
grpcStatusCode);
Expand Down Expand Up @@ -382,35 +377,38 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK
return span.getError() > 0;
}

private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
private List<UTF8BytesString> getPeerTags(CoreSpan<?> span) {
if (span.isKind(PEER_AGGREGATION_KINDS)) {
final Set<String> eligiblePeerTags = features.peerTags();
List<UTF8BytesString> peerTags = new ArrayList<>(eligiblePeerTags.size());
List<UTF8BytesString> peerTags = null;
for (String peerTag : eligiblePeerTags) {
Object value = span.unsafeGetTag(peerTag);
if (value != null) {
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
if (peerTags == null) {
peerTags = new ArrayList<>(eligiblePeerTags.size());
}
peerTags.add(
cacheAndCreator
.getLeft()
.computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
}
}
return peerTags;
} else if (SPAN_KIND_INTERNAL.equals(spanKind)) {
return peerTags == null ? emptyList() : peerTags;
} else if (span.isKind(INTERNAL_KIND)) {
// in this case only the base service should be aggregated if present
final Object baseService = span.unsafeGetTag(BASE_SERVICE);
if (baseService != null) {
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER);
return Collections.singletonList(
return singletonList(
cacheAndCreator
.getLeft()
.computeIfAbsent(baseService.toString(), cacheAndCreator.getRight()));
}
}
return Collections.emptyList();
return emptyList();
}

private static boolean isSynthetic(CoreSpan<?> span) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.core;

import datadog.trace.api.DDTraceId;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import java.util.Map;

public interface CoreSpan<T extends CoreSpan<T>> {
Expand Down Expand Up @@ -80,6 +81,11 @@ default <U> U unsafeGetTag(CharSequence name) {

boolean isForceKeep();

/**
 * Tests this span's {@link Tags#SPAN_KIND} tag against {@code filter}.
 *
 * <p>This default implementation reads the tag through {@code unsafeGetTag} and passes its
 * {@code toString()} form to the filter, or {@code null} when the tag is absent.
 */
default boolean isKind(SpanKindFilter filter) {
  final Object kindTag = unsafeGetTag(Tags.SPAN_KIND);
  final String kindName = kindTag != null ? kindTag.toString() : null;
  return filter.matches(kindName);
}

CharSequence getType();

/**
Expand Down
5 changes: 5 additions & 0 deletions dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,11 @@ public boolean isOutbound() {
return ordinal == DDSpanContext.SPAN_KIND_CLIENT || ordinal == DDSpanContext.SPAN_KIND_PRODUCER;
}

@Override
public boolean isKind(SpanKindFilter filter) {
Comment thread
dougqh marked this conversation as resolved.
return filter.matches(context.getSpanKindOrdinal());
}

@Override
public void copyPropagationAndBaggage(final AgentSpan source) {
if (source instanceof DDSpan) {
Expand Down
Loading