-
Notifications
You must be signed in to change notification settings - Fork 336
Use bitmask SpanKindFilter for per-span eligibility in metrics aggregator #11380
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bc42093
808d63d
6aa620e
a02d0a9
ed38f18
3355865
5bdef61
9a72d31
78e1497
f9822a5
da126a3
6f66211
e77add0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| package datadog.trace.common.metrics; | ||
|
|
||
| import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; | ||
| import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; | ||
| import static java.util.concurrent.TimeUnit.MICROSECONDS; | ||
| import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
|
||
| import datadog.communication.ddagent.DDAgentFeaturesDiscovery; | ||
| import datadog.trace.api.WellKnownTags; | ||
| import datadog.trace.common.writer.Writer; | ||
| import datadog.trace.core.CoreSpan; | ||
| import datadog.trace.core.CoreTracer; | ||
| import datadog.trace.core.DDSpan; | ||
| import datadog.trace.core.monitor.HealthMetrics; | ||
| import datadog.trace.util.Strings; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import org.openjdk.jmh.annotations.Benchmark; | ||
| import org.openjdk.jmh.annotations.BenchmarkMode; | ||
| import org.openjdk.jmh.annotations.Fork; | ||
| import org.openjdk.jmh.annotations.Measurement; | ||
| import org.openjdk.jmh.annotations.Mode; | ||
| import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
| import org.openjdk.jmh.annotations.Scope; | ||
| import org.openjdk.jmh.annotations.State; | ||
| import org.openjdk.jmh.annotations.Warmup; | ||
| import org.openjdk.jmh.infra.Blackhole; | ||
|
|
||
| /** | ||
| * Parallels {@link ConflatingMetricsAggregatorBenchmark} but uses real {@link DDSpan} instances | ||
| * instead of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link | ||
| * CoreSpan#isKind} path (cached span.kind ordinal + bit-test) rather than the groovy mock's | ||
| * dispatch. | ||
| * | ||
| * <p>SpanKindFilter rollout result vs. the pre-bitmask code on master: ~1.3% faster on the | ||
| * production path, with tighter fork-to-fork variance. The CIs overlap so the headline number sits | ||
| * inside noise, but the centers move the right way and the new path is structurally cheaper (byte | ||
| * read + bit-test vs tag-map read + HashSet.contains). | ||
| * | ||
| * <p>NOTE(review): the run below used 2 forks x 5 iterations x 15s, which differs from the JMH | ||
| * annotations on this class (1 fork, 1 x 30s warmup, 3 x 30s measurement) -- presumably passed | ||
| * as command-line overrides; confirm before comparing against new runs. | ||
| * | ||
| * <pre> | ||
| * MacBook M1 (Java 21), 2 forks x 5 iterations x 15s, AverageTime | ||
| * | ||
| * Branch Score (avg) CI (99.9%) | ||
| * master 6.428 ± 0.189 µs/op [6.239, 6.617] | ||
| * this branch 6.343 ± 0.115 µs/op [6.228, 6.458] | ||
| * </pre> | ||
| */ | ||
| @State(Scope.Benchmark) | ||
| @Warmup(iterations = 1, time = 30, timeUnit = SECONDS) | ||
| @Measurement(iterations = 3, time = 30, timeUnit = SECONDS) | ||
| @BenchmarkMode(Mode.AverageTime) | ||
| @OutputTimeUnit(MICROSECONDS) | ||
| @Fork(value = 1) | ||
| public class ConflatingMetricsAggregatorDDSpanBenchmark { | ||
|
|
||
| private static final CoreTracer TRACER = | ||
| CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build(); | ||
|
|
||
| private final DDAgentFeaturesDiscovery featuresDiscovery = | ||
| new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( | ||
| Collections.singleton("peer.hostname"), Collections.emptySet()); | ||
| private final ConflatingMetricsAggregator aggregator = | ||
| new ConflatingMetricsAggregator( | ||
| new WellKnownTags("", "", "", "", "", ""), | ||
| Collections.emptySet(), | ||
| featuresDiscovery, | ||
| HealthMetrics.NO_OP, | ||
| new ConflatingMetricsAggregatorBenchmark.NullSink(), | ||
| 2048, | ||
| 2048, | ||
| false); | ||
| private final List<CoreSpan<?>> spans = generateTrace(64); | ||
|
|
||
| static List<CoreSpan<?>> generateTrace(int len) { | ||
| final List<CoreSpan<?>> trace = new ArrayList<>(); | ||
| for (int i = 0; i < len; i++) { | ||
| DDSpan span = (DDSpan) TRACER.startSpan("benchmark", "op"); | ||
| span.setTag(SPAN_KIND, SPAN_KIND_CLIENT); | ||
| span.setTag("peer.hostname", Strings.random(10)); | ||
| // Fix duration; bypasses the wall clock and avoids per-fork drift. | ||
| span.finishWithDuration(10); | ||
| trace.add(span); | ||
| } | ||
| return trace; | ||
| } | ||
|
|
||
| static class NoopWriter implements Writer { | ||
| @Override | ||
| public void write(List<DDSpan> trace) {} | ||
|
|
||
| @Override | ||
| public void start() {} | ||
|
|
||
| @Override | ||
| public boolean flush() { | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() {} | ||
|
|
||
| @Override | ||
| public void incrementDropCounts(int spanCount) {} | ||
| } | ||
|
|
||
| @Benchmark | ||
| public void benchmark(Blackhole blackhole) { | ||
| blackhole.consume(aggregator.publish(spans)); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,19 +7,16 @@ | |
| import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; | ||
| import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; | ||
| import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; | ||
| import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; | ||
| import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; | ||
| import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL; | ||
| import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER; | ||
| import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER; | ||
| import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; | ||
| import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; | ||
| import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT; | ||
| import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP; | ||
| import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR; | ||
| import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; | ||
| import static datadog.trace.util.AgentThreadFactory.newAgentThread; | ||
| import static java.util.Collections.unmodifiableSet; | ||
| import static java.util.Collections.emptyList; | ||
| import static java.util.Collections.singletonList; | ||
| import static java.util.Collections.singletonMap; | ||
| import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
|
||
| import datadog.common.queue.Queues; | ||
|
|
@@ -36,12 +33,10 @@ | |
| import datadog.trace.common.writer.ddagent.DDAgentApi; | ||
| import datadog.trace.core.CoreSpan; | ||
| import datadog.trace.core.DDTraceCoreInfo; | ||
| import datadog.trace.core.SpanKindFilter; | ||
| import datadog.trace.core.monitor.HealthMetrics; | ||
| import datadog.trace.util.AgentTaskScheduler; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
|
|
@@ -50,7 +45,6 @@ | |
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.function.Function; | ||
| import javax.annotation.Nonnull; | ||
| import org.jctools.queues.MessagePassingQueue; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -60,7 +54,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve | |
| private static final Logger log = LoggerFactory.getLogger(ConflatingMetricsAggregator.class); | ||
|
|
||
| private static final Map<String, String> DEFAULT_HEADERS = | ||
| Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); | ||
| singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); | ||
|
|
||
| private static final DDCache<String, UTF8BytesString> SERVICE_NAMES = | ||
| DDCaches.newFixedSizeCache(32); | ||
|
|
@@ -82,15 +76,19 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve | |
| value -> UTF8BytesString.create(key + ":" + value)); | ||
| private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; | ||
|
|
||
| private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS = | ||
| unmodifiableSet( | ||
| new HashSet<>( | ||
| Arrays.asList( | ||
| SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER))); | ||
| private static final SpanKindFilter METRICS_ELIGIBLE_KINDS = | ||
| SpanKindFilter.builder() | ||
| .includeServer() | ||
| .includeClient() | ||
| .includeProducer() | ||
| .includeConsumer() | ||
| .build(); | ||
|
|
||
| private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION = | ||
| unmodifiableSet( | ||
| new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER))); | ||
| private static final SpanKindFilter PEER_AGGREGATION_KINDS = | ||
| SpanKindFilter.builder().includeClient().includeProducer().includeConsumer().build(); | ||
|
|
||
| private static final SpanKindFilter INTERNAL_KIND = | ||
| SpanKindFilter.builder().includeInternal().build(); | ||
|
|
||
| private final Set<String> ignoredResources; | ||
| private final MessagePassingQueue<Batch> batchPool; | ||
|
|
@@ -289,36 +287,30 @@ public boolean publish(List<? extends CoreSpan<?>> trace) { | |
| if (features.supportsMetrics()) { | ||
| for (CoreSpan<?> span : trace) { | ||
| boolean isTopLevel = span.isTopLevel(); | ||
| final CharSequence spanKind = span.unsafeGetTag(SPAN_KIND, ""); | ||
| if (shouldComputeMetric(span, spanKind)) { | ||
| if (shouldComputeMetric(span, isTopLevel)) { | ||
| final CharSequence resourceName = span.getResourceName(); | ||
| if (resourceName != null && ignoredResources.contains(resourceName.toString())) { | ||
| // skip publishing all children | ||
| forceKeep = false; | ||
| break; | ||
| } | ||
| counted++; | ||
| forceKeep |= publish(span, isTopLevel, spanKind); | ||
| forceKeep |= publish(span, isTopLevel); | ||
| } | ||
| } | ||
| healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep); | ||
| } | ||
| return forceKeep; | ||
| } | ||
|
|
||
| private boolean shouldComputeMetric(CoreSpan<?> span, @Nonnull CharSequence spanKind) { | ||
| return (span.isMeasured() || span.isTopLevel() || spanKindEligible(spanKind)) | ||
| private boolean shouldComputeMetric(CoreSpan<?> span, boolean isTopLevel) { | ||
| return (span.isMeasured() || isTopLevel || span.isKind(METRICS_ELIGIBLE_KINDS)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
When a span is built with Useful? React with 👍 / 👎. |
||
| && span.getLongRunningVersion() | ||
| <= 0 // either not long-running or unpublished long-running span | ||
| && span.getDurationNano() > 0; | ||
| } | ||
|
|
||
| private boolean spanKindEligible(@Nonnull CharSequence spanKind) { | ||
| // use toString since it could be a CharSequence... | ||
| return ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString()); | ||
| } | ||
|
|
||
| private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanKind) { | ||
| private boolean publish(CoreSpan<?> span, boolean isTopLevel) { | ||
| // Extract HTTP method and endpoint only if the feature is enabled | ||
| String httpMethod = null; | ||
| String httpEndpoint = null; | ||
|
|
@@ -335,6 +327,9 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK | |
| Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE); | ||
| grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null; | ||
| } | ||
| // CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString | ||
| // tag values don't trigger a ClassCastException on the String assignment. | ||
| final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString(); | ||
| MetricKey newKey = | ||
| new MetricKey( | ||
| span.getResourceName(), | ||
|
|
@@ -347,7 +342,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK | |
| span.getParentId() == 0, | ||
| SPAN_KINDS.computeIfAbsent( | ||
| spanKind, UTF8BytesString::create), // save repeated utf8 conversions | ||
| getPeerTags(span, spanKind.toString()), | ||
| getPeerTags(span), | ||
| httpMethod, | ||
| httpEndpoint, | ||
| grpcStatusCode); | ||
|
|
@@ -382,35 +377,38 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK | |
| return span.getError() > 0; | ||
| } | ||
|
|
||
| private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) { | ||
| if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { | ||
| private List<UTF8BytesString> getPeerTags(CoreSpan<?> span) { | ||
| if (span.isKind(PEER_AGGREGATION_KINDS)) { | ||
| final Set<String> eligiblePeerTags = features.peerTags(); | ||
| List<UTF8BytesString> peerTags = new ArrayList<>(eligiblePeerTags.size()); | ||
| List<UTF8BytesString> peerTags = null; | ||
| for (String peerTag : eligiblePeerTags) { | ||
| Object value = span.unsafeGetTag(peerTag); | ||
| if (value != null) { | ||
| final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>> | ||
| cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); | ||
| if (peerTags == null) { | ||
| peerTags = new ArrayList<>(eligiblePeerTags.size()); | ||
| } | ||
| peerTags.add( | ||
| cacheAndCreator | ||
| .getLeft() | ||
| .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); | ||
| } | ||
| } | ||
| return peerTags; | ||
| } else if (SPAN_KIND_INTERNAL.equals(spanKind)) { | ||
| return peerTags == null ? emptyList() : peerTags; | ||
| } else if (span.isKind(INTERNAL_KIND)) { | ||
| // in this case only the base service should be aggregated if present | ||
| final Object baseService = span.unsafeGetTag(BASE_SERVICE); | ||
| if (baseService != null) { | ||
| final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>> | ||
| cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); | ||
| return Collections.singletonList( | ||
| return singletonList( | ||
| cacheAndCreator | ||
| .getLeft() | ||
| .computeIfAbsent(baseService.toString(), cacheAndCreator.getRight())); | ||
| } | ||
| } | ||
| return Collections.emptyList(); | ||
| return emptyList(); | ||
| } | ||
|
|
||
| private static boolean isSynthetic(CoreSpan<?> span) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.