+ * <p>The metrics aggregator is bounded at every layer:
+ *
+ * <ul>
+ *   <li>The aggregate cache caps total entries at {@code tracerMetricsMaxAggregates} (default
+ *       2048). Beyond that LRU eviction kicks in.
+ *   <li>The producer/consumer inbox is a fixed-size MPSC queue ({@code tracerMetricsMaxPending});
+ *       when full, producer {@code offer} returns false and the snapshot is dropped via {@link
+ *       HealthMetrics#onStatsInboxFull()}.
+ *   <li>Histograms use a bounded dense store -- per-histogram memory is fixed.
+ * </ul>
+ *
+ * <p>The benchmark hammers all of these simultaneously with 8 producer threads, unique labels per
+ * op (so the aggregate cache fills+evicts repeatedly), random durations across a wide range (so
+ * histograms accept many distinct bins), and periodic {@code error}/{@code topLevel} flags (every
+ * 8th and every 4th op per thread respectively, so both histograms are exercised). After the run,
+ * drop counters are printed so you can see how the subsystem absorbed the burst.
+ *
+ * <p>What "OOM the metrics subsystem" would look like if the bounds break: producer-thread
+ * allocation would grow unbounded (with the bounds intact, publishing snapshots faster than the
+ * inbox can drain produces dropped snapshots, not heap growth); aggregator-thread heap would grow
+ * if entries weren't capped or histograms grew past their dense-store limit.
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 2, time = 15, timeUnit = SECONDS)
+@Measurement(iterations = 5, time = 15, timeUnit = SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(SECONDS)
+@Threads(8)
+@Fork(value = 1)
+public class AdversarialMetricsBenchmark {
+
+ private ConflatingMetricsAggregator aggregator;
+ private CountingHealthMetrics health;
+
+ @State(Scope.Thread)
+ public static class ThreadState {
+ int cursor;
+ }
+
+ @Setup
+ public void setup() {
+ this.health = new CountingHealthMetrics();
+ this.aggregator =
+ new ConflatingMetricsAggregator(
+ new WellKnownTags("", "", "", "", "", ""),
+ Collections.emptySet(),
+ new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
+ Collections.singleton("peer.hostname"), Collections.emptySet()),
+ this.health,
+ new ConflatingMetricsAggregatorBenchmark.NullSink(),
+ 2048,
+ 2048,
+ false);
+ this.aggregator.start();
+ }
+
+ @TearDown
+ @SuppressForbidden
+ public void tearDown() {
+ aggregator.close();
+ // Counters accumulate across the trial (warmup + measurement iterations), since the
+ // CountingHealthMetrics instance is created once in @Setup and never reset.
+ System.err.println(
+ "[ADVERSARIAL] drops over the trial (8 threads, warmup + measurement combined):");
+ System.err.println(
+ " onStatsInboxFull = "
+ + health.inboxFull.sum()
+ + " (snapshots dropped because the MPSC inbox was full)");
+ System.err.println(
+ " onStatsAggregateDropped = "
+ + health.aggregateDropped.sum()
+ + " (snapshots dropped because the aggregate cache was full with no stale entry)");
+ }
+
+ @Benchmark
+ public void publish(ThreadState ts, Blackhole blackhole) {
+ int idx = ts.cursor++;
+ ThreadLocalRandom rng = ThreadLocalRandom.current();
+
+ // Mix indices so labels don't fall into linear order. Distinct labels exceed every reasonable
+ // working-set bound, so the aggregate cache evicts continuously and most ops force a fresh
+ // MetricKey construction on the consumer thread.
+ int scrambled = idx * 0x9E3779B1; // golden ratio multiplier
+ String service = "svc-" + (scrambled & 0xFFFF);
+ String operation = "op-" + ((scrambled >>> 8) & 0x3FFFF);
+ String resource = "res-" + ((scrambled ^ 0x5A5A5A) & 0xFFFFF);
+ String hostname = "host-" + ((scrambled >>> 12) & 0x7FFF);
+ boolean error = (idx & 7) == 0;
+ boolean topLevel = (idx & 3) == 0;
+ // Wide duration spread forces histogram bins to populate broadly.
+ long durationNanos = 1L + (rng.nextLong() & 0x3FFFFFFFL); // 1 ns .. ~1.07 s
+
+ SimpleSpan span =
+ new SimpleSpan(
+ service, operation, resource, "web", true, topLevel, error, 0, durationNanos, 200);
+ span.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
+ span.setTag("peer.hostname", hostname);
+
+ List> trace = Collections.singletonList(span);
+ blackhole.consume(aggregator.publish(trace));
+ }
+
+ /**
+ * Counts what gets dropped. Uses {@link LongAdder} so the printed totals hold up under 8-way
+ * contention -- {@code volatile long ++} loses ~20% of updates here, which would mask the
+ * order-of-magnitude shape the bench is trying to surface (inbox-full vs aggregate-dropped).
+ */
+ static final class CountingHealthMetrics extends HealthMetrics {
+ final LongAdder inboxFull = new LongAdder();
+ final LongAdder aggregateDropped = new LongAdder();
+
+ @Override
+ public void onStatsInboxFull() {
+ inboxFull.increment();
+ }
+
+ @Override
+ public void onStatsAggregateDropped() {
+ aggregateDropped.increment();
+ }
+ }
+}
diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityPeerMetricsBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityPeerMetricsBenchmark.java
new file mode 100644
index 00000000000..67caaca6ced
--- /dev/null
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityPeerMetricsBenchmark.java
@@ -0,0 +1,107 @@
+package datadog.trace.common.metrics;
+
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import datadog.trace.api.WellKnownTags;
+import datadog.trace.common.metrics.AdversarialMetricsBenchmark.CountingHealthMetrics;
+import datadog.trace.core.CoreSpan;
+import de.thetaphi.forbiddenapis.SuppressForbidden;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Cardinality-isolation companion to {@link AdversarialMetricsBenchmark}: only the {@code
+ * peer.hostname} tag value varies; {@code service}, {@code operation}, and {@code resource} are
+ * pinned to single values. Pairing this with the adversarial bench (all four dimensions
+ * high-cardinality) and {@link HighCardinalityResourceMetricsBenchmark} (only resource
+ * high-cardinality) lets you attribute any throughput delta to a specific axis.
+ *
+ * <p>This isolates the peer-tag-encoding hot path: {@code PEER_TAGS_CACHE} lookups, the per-tag
+ * UTF8 encoding of {@code "name:value"}, and the parallel-array capture inside the producer's
+ * {@code SpanSnapshot} build. With {@code 0x7FFF} as the mask (32768 distinct hostnames) the cache
+ * thrashes heavily, and the number of distinct keys exceeds the default {@code
+ * tracerMetricsMaxAggregates=2048}, so the LRU evicts continuously.
+ *
+ * <p>Periodic {@code error}/{@code topLevel} flags and random durations keep histogram load
+ * comparable; only the cardinality profile changes.
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 2, time = 15, timeUnit = SECONDS)
+@Measurement(iterations = 5, time = 15, timeUnit = SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(SECONDS)
+@Threads(8)
+@Fork(value = 1)
+public class HighCardinalityPeerMetricsBenchmark {
+
+ private ConflatingMetricsAggregator aggregator;
+ private CountingHealthMetrics health;
+
+ @State(Scope.Thread)
+ public static class ThreadState {
+ int cursor;
+ }
+
+ @Setup
+ public void setup() {
+ this.health = new CountingHealthMetrics();
+ this.aggregator =
+ new ConflatingMetricsAggregator(
+ new WellKnownTags("", "", "", "", "", ""),
+ Collections.emptySet(),
+ new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
+ Collections.singleton("peer.hostname"), Collections.emptySet()),
+ this.health,
+ new ConflatingMetricsAggregatorBenchmark.NullSink(),
+ 2048,
+ 2048,
+ false);
+ this.aggregator.start();
+ }
+
+ @TearDown
+ @SuppressForbidden
+ public void tearDown() {
+ aggregator.close();
+ System.err.println(
+ "[HIGH_CARD_PEER] drops over the trial (8 threads, warmup + measurement combined):");
+ System.err.println(" onStatsInboxFull = " + health.inboxFull.sum());
+ System.err.println(" onStatsAggregateDropped = " + health.aggregateDropped.sum());
+ }
+
+ @Benchmark
+ public void publish(ThreadState ts, Blackhole blackhole) {
+ int idx = ts.cursor++;
+ ThreadLocalRandom rng = ThreadLocalRandom.current();
+
+ int scrambled = idx * 0x9E3779B1;
+ String hostname = "host-" + ((scrambled >>> 12) & 0x7FFF);
+ boolean error = (idx & 7) == 0;
+ boolean topLevel = (idx & 3) == 0;
+ long durationNanos = 1L + (rng.nextLong() & 0x3FFFFFFFL);
+
+ SimpleSpan span =
+ new SimpleSpan("svc", "op", "res", "web", true, topLevel, error, 0, durationNanos, 200);
+ span.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
+ span.setTag("peer.hostname", hostname);
+
+ List> trace = Collections.singletonList(span);
+ blackhole.consume(aggregator.publish(trace));
+ }
+}
diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityResourceMetricsBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityResourceMetricsBenchmark.java
new file mode 100644
index 00000000000..5ae8c3a715f
--- /dev/null
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityResourceMetricsBenchmark.java
@@ -0,0 +1,103 @@
+package datadog.trace.common.metrics;
+
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import datadog.trace.api.WellKnownTags;
+import datadog.trace.common.metrics.AdversarialMetricsBenchmark.CountingHealthMetrics;
+import datadog.trace.core.CoreSpan;
+import de.thetaphi.forbiddenapis.SuppressForbidden;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Cardinality-isolation companion to {@link AdversarialMetricsBenchmark}: only the {@code resource}
+ * dimension varies; {@code service}, {@code operation}, and {@code peer.hostname} are pinned to
+ * single values. Pairing this with the adversarial bench (all four dimensions high-cardinality) and
+ * {@link HighCardinalityPeerMetricsBenchmark} (only peer-tag high-cardinality) lets you attribute
+ * any throughput delta to a specific axis.
+ *
+ * <p>Same shape as the adversarial bench -- 8 producer threads, {@code 0xFFFFF} (~1M) distinct
+ * resource values, which exceed the default {@code tracerMetricsMaxAggregates=2048}, so the LRU
+ * cache evicts continuously. Periodic {@code error}/{@code topLevel} flags and random durations
+ * keep histogram load comparable; only the cardinality profile changes.
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 2, time = 15, timeUnit = SECONDS)
+@Measurement(iterations = 5, time = 15, timeUnit = SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(SECONDS)
+@Threads(8)
+@Fork(value = 1)
+public class HighCardinalityResourceMetricsBenchmark {
+
+ private ConflatingMetricsAggregator aggregator;
+ private CountingHealthMetrics health;
+
+ @State(Scope.Thread)
+ public static class ThreadState {
+ int cursor;
+ }
+
+ @Setup
+ public void setup() {
+ this.health = new CountingHealthMetrics();
+ this.aggregator =
+ new ConflatingMetricsAggregator(
+ new WellKnownTags("", "", "", "", "", ""),
+ Collections.emptySet(),
+ new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
+ Collections.singleton("peer.hostname"), Collections.emptySet()),
+ this.health,
+ new ConflatingMetricsAggregatorBenchmark.NullSink(),
+ 2048,
+ 2048,
+ false);
+ this.aggregator.start();
+ }
+
+ @TearDown
+ @SuppressForbidden
+ public void tearDown() {
+ aggregator.close();
+ System.err.println(
+ "[HIGH_CARD_RESOURCE] drops over the trial (8 threads, warmup + measurement combined):");
+ System.err.println(" onStatsInboxFull = " + health.inboxFull.sum());
+ System.err.println(" onStatsAggregateDropped = " + health.aggregateDropped.sum());
+ }
+
+ @Benchmark
+ public void publish(ThreadState ts, Blackhole blackhole) {
+ int idx = ts.cursor++;
+ ThreadLocalRandom rng = ThreadLocalRandom.current();
+
+ int scrambled = idx * 0x9E3779B1;
+ String resource = "res-" + ((scrambled ^ 0x5A5A5A) & 0xFFFFF);
+ boolean error = (idx & 7) == 0;
+ boolean topLevel = (idx & 3) == 0;
+ long durationNanos = 1L + (rng.nextLong() & 0x3FFFFFFFL);
+
+ SimpleSpan span =
+ new SimpleSpan("svc", "op", resource, "web", true, topLevel, error, 0, durationNanos, 200);
+ span.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
+ span.setTag("peer.hostname", "localhost");
+
+ List> trace = Collections.singletonList(span);
+ blackhole.consume(aggregator.publish(trace));
+ }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java
index 478ff520a37..dba66a5ab9c 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java
@@ -46,6 +46,27 @@ public AggregateMetric recordDurations(int count, AtomicLongArray durations) {
return this;
}
+ /**
+ * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link
+ * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in.
+ */
+ public AggregateMetric recordOneDuration(long tagAndDuration) {
+ ++hitCount;
+ if ((tagAndDuration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) {
+ tagAndDuration ^= TOP_LEVEL_TAG;
+ ++topLevelCount;
+ }
+ if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) {
+ tagAndDuration ^= ERROR_TAG;
+ errorLatencies.accept(tagAndDuration);
+ ++errorCount;
+ } else {
+ okLatencies.accept(tagAndDuration);
+ }
+ duration += tagAndDuration;
+ return this;
+ }
+
public int getErrorCount() {
return errorCount;
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
index 8a69dbc6e56..9998c21ed0b 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
@@ -1,16 +1,26 @@
package datadog.trace.common.metrics;
+import static datadog.trace.api.Functions.UTF8_ENCODE;
+import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE;
+import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE_ADDER;
+import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SERVICE_NAMES;
+import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SPAN_KINDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import datadog.trace.api.Pair;
+import datadog.trace.api.cache.DDCache;
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.common.metrics.SignalItem.StopSignal;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.util.LRUCache;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import org.jctools.queues.MessagePassingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,11 +31,8 @@ final class Aggregator implements Runnable {
private static final Logger log = LoggerFactory.getLogger(Aggregator.class);
- private final MessagePassingQueue batchPool;
private final MessagePassingQueue inbox;
private final LRUCache aggregates;
- private final ConcurrentMap pending;
- private final Set commonKeys;
private final MetricWriter writer;
// the reporting interval controls how much history will be buffered
// when the agent is unresponsive (only 10 pending requests will be
@@ -34,6 +41,15 @@ final class Aggregator implements Runnable {
private final long sleepMillis;
+ /**
+ * Per-cycle hook run on the aggregator thread at the start of each report cycle, before the
+ * flush. Used by {@link ConflatingMetricsAggregator} to reconcile its cached peer-tag schema
+ * against {@link datadog.communication.ddagent.DDAgentFeaturesDiscovery}; running before the
+ * flush guarantees that any test awaiting {@code writer.finishBucket()} observes the schema in
+ * its post-reconcile state. May be {@code null}.
+ */
+ private final Runnable onReportCycle;
+
@SuppressFBWarnings(
value = "AT_STALE_THREAD_WRITE_OF_PRIMITIVE",
justification = "the field is confined to the agent thread running the Aggregator")
@@ -41,51 +57,56 @@ final class Aggregator implements Runnable {
Aggregator(
MetricWriter writer,
- MessagePassingQueue batchPool,
MessagePassingQueue inbox,
- ConcurrentMap pending,
- final Set commonKeys,
int maxAggregates,
long reportingInterval,
TimeUnit reportingIntervalTimeUnit,
- HealthMetrics healthMetrics) {
+ HealthMetrics healthMetrics,
+ Runnable onReportCycle) {
this(
writer,
- batchPool,
inbox,
- pending,
- commonKeys,
maxAggregates,
reportingInterval,
reportingIntervalTimeUnit,
DEFAULT_SLEEP_MILLIS,
- healthMetrics);
+ healthMetrics,
+ onReportCycle);
}
Aggregator(
MetricWriter writer,
- MessagePassingQueue batchPool,
MessagePassingQueue inbox,
- ConcurrentMap pending,
- final Set commonKeys,
int maxAggregates,
long reportingInterval,
TimeUnit reportingIntervalTimeUnit,
long sleepMillis,
- HealthMetrics healthMetrics) {
+ HealthMetrics healthMetrics,
+ Runnable onReportCycle) {
this.writer = writer;
- this.batchPool = batchPool;
this.inbox = inbox;
- this.commonKeys = commonKeys;
this.aggregates =
new LRUCache<>(
- new CommonKeyCleaner(commonKeys, healthMetrics),
- maxAggregates * 4 / 3,
- 0.75f,
- maxAggregates);
- this.pending = pending;
+ new AggregateExpiry(healthMetrics), maxAggregates * 4 / 3, 0.75f, maxAggregates);
this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval);
this.sleepMillis = sleepMillis;
+ this.onReportCycle = onReportCycle;
+ }
+
+ private static final class AggregateExpiry
+ implements LRUCache.ExpiryListener {
+ private final HealthMetrics healthMetrics;
+
+ AggregateExpiry(HealthMetrics healthMetrics) {
+ this.healthMetrics = healthMetrics;
+ }
+
+ @Override
+ public void accept(Map.Entry expired) {
+ if (expired.getValue().getHitCount() > 0) {
+ healthMetrics.onStatsAggregateDropped();
+ }
+ }
}
public void clearAggregates() {
@@ -129,21 +150,87 @@ public void accept(InboxItem item) {
} else {
signal.ignore();
}
- } else if (item instanceof Batch && !stopped) {
- Batch batch = (Batch) item;
- MetricKey key = batch.getKey();
- // important that it is still *this* batch pending, must not remove otherwise
- pending.remove(key, batch);
+ } else if (item instanceof SpanSnapshot && !stopped) {
+ SpanSnapshot snapshot = (SpanSnapshot) item;
+ MetricKey key = buildMetricKey(snapshot);
AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric());
- batch.contributeTo(aggregate);
+ aggregate.recordOneDuration(snapshot.tagAndDuration);
dirty = true;
- // return the batch for reuse
- batchPool.offer(batch);
}
}
}
+  /**
+   * Builds the {@link MetricKey} for a snapshot: service name and span kind go through the shared
+   * {@code SERVICE_NAMES} / {@code SPAN_KINDS} caches, peer tags are encoded by {@link
+   * #materializePeerTags}, and the remaining fields are passed through unchanged.
+   */
+  private static MetricKey buildMetricKey(SpanSnapshot snapshot) {
+    return new MetricKey(
+        snapshot.resourceName,
+        SERVICE_NAMES.computeIfAbsent(snapshot.serviceName, UTF8_ENCODE),
+        snapshot.operationName,
+        snapshot.serviceNameSource,
+        snapshot.spanType,
+        snapshot.httpStatusCode,
+        snapshot.synthetic,
+        snapshot.traceRoot,
+        SPAN_KINDS.computeIfAbsent(snapshot.spanKind, UTF8BytesString::create),
+        materializePeerTags(snapshot.peerTagSchema, snapshot.peerTagValues),
+        snapshot.httpMethod,
+        snapshot.httpEndpoint,
+        snapshot.grpcStatusCode);
+  }
+
+
+  /**
+   * Encodes the per-span peer-tag values into the {@code List<UTF8BytesString>} the {@link
+   * MetricKey} consumes. Reads name/value pairs at the same index from the schema's names and the
+   * snapshot's values; null value slots are skipped (the span didn't set that peer tag).
+   *
+   * @param schema the peer-tag schema captured with the snapshot, or {@code null}
+   * @param values the captured values, index-aligned with {@code schema.names}, or {@code null}
+   * @return the encoded tags in schema order; an empty list when either argument is {@code null}
+   *     or no value slot is set
+   */
+  private static List<UTF8BytesString> materializePeerTags(
+      PeerTagSchema schema, String[] values) {
+    if (schema == null || values == null) {
+      return Collections.emptyList();
+    }
+    String[] names = schema.names;
+    int n = names.length;
+    // First pass: count how many tags fired and remember the first index. The single-entry case
+    // is common (e.g. INTERNAL spans only emit base.service) and gets a singletonList to avoid an
+    // ArrayList allocation on the hot path.
+    int firstHit = -1;
+    int hitCount = 0;
+    for (int i = 0; i < n; i++) {
+      if (values[i] != null) {
+        if (hitCount == 0) {
+          firstHit = i;
+        }
+        hitCount++;
+      }
+    }
+    if (hitCount == 0) {
+      return Collections.emptyList();
+    }
+    if (hitCount == 1) {
+      return Collections.singletonList(encodePeerTag(names[firstHit], values[firstHit]));
+    }
+    // Second pass starts at the first hit; every slot before it is known to be null.
+    List<UTF8BytesString> tags = new ArrayList<>(hitCount);
+    for (int i = firstHit; i < n; i++) {
+      if (values[i] != null) {
+        tags.add(encodePeerTag(names[i], values[i]));
+      }
+    }
+    return tags;
+  }
+
+  /**
+   * Encodes one peer tag through {@code PEER_TAGS_CACHE}: the entry for {@code name} supplies a
+   * per-tag value cache (left) and the function that creates a missing encoded value (right).
+   *
+   * @param name the peer-tag name
+   * @param value the tag value captured from the span
+   * @return the cached or newly created encoded tag
+   */
+  private static UTF8BytesString encodePeerTag(String name, String value) {
+    final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
+        cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER);
+    return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight());
+  }
+
private void report(long when, SignalItem signal) {
+ // Per-cycle hook on the aggregator thread -- used by ConflatingMetricsAggregator to reconcile
+ // the cached peer-tag schema against feature discovery. Runs before the flush so any test that
+ // awaits writer.finishBucket() observes the schema in its post-reconcile state, and so
+ // subsequent producer publishes (which may happen as soon as the flush completes) see the new
+ // schema without an additional handoff.
+ if (onReportCycle != null) {
+ onReportCycle.run();
+ }
boolean skipped = true;
if (dirty) {
try {
@@ -177,7 +264,6 @@ private void expungeStaleAggregates() {
AggregateMetric metric = pair.getValue();
if (metric.getHitCount() == 0) {
it.remove();
- commonKeys.remove(pair.getKey());
}
}
}
@@ -185,24 +271,4 @@ private void expungeStaleAggregates() {
private long wallClockTime() {
return MILLISECONDS.toNanos(System.currentTimeMillis());
}
-
- private static final class CommonKeyCleaner
- implements LRUCache.ExpiryListener {
-
- private final Set commonKeys;
- private final HealthMetrics healthMetrics;
-
- private CommonKeyCleaner(Set commonKeys, HealthMetrics healthMetrics) {
- this.commonKeys = commonKeys;
- this.healthMetrics = healthMetrics;
- }
-
- @Override
- public void accept(Map.Entry expired) {
- commonKeys.remove(expired.getKey());
- if (expired.getValue().getHitCount() > 0) {
- healthMetrics.onStatsAggregateDropped();
- }
- }
- }
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java
deleted file mode 100644
index 5f103805e98..00000000000
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package datadog.trace.common.metrics;
-
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongArray;
-
-/**
- * This is a thread-safe container for partial conflating and accumulating partial aggregates on the
- * same key.
- *
- * <p>Updates to an already consumed batch are rejected.
- *
- * <p>A batch can currently take at most 64 values. Attempts to add the 65th update will be
- * rejected.
- */
-public final class Batch implements InboxItem {
-
- private static final int MAX_BATCH_SIZE = 64;
- private static final AtomicIntegerFieldUpdater COUNT =
- AtomicIntegerFieldUpdater.newUpdater(Batch.class, "count");
- private static final AtomicIntegerFieldUpdater COMMITTED =
- AtomicIntegerFieldUpdater.newUpdater(Batch.class, "committed");
-
- /**
- * This counter has two states:
- *
- * <ul>
- *   <li>negative: the batch has been used, must not add values
- *   <li>otherwise: the number of values added to the batch
- * </ul>
- */
- private volatile int count = 0;
-
- /** incremented when a duration has been added. */
- private volatile int committed = 0;
-
- private MetricKey key;
- private final AtomicLongArray durations;
-
- Batch(MetricKey key) {
- this(new AtomicLongArray(MAX_BATCH_SIZE));
- this.key = key;
- }
-
- Batch() {
- this(new AtomicLongArray(MAX_BATCH_SIZE));
- }
-
- private Batch(AtomicLongArray durations) {
- this.durations = durations;
- }
-
- public MetricKey getKey() {
- return key;
- }
-
- public Batch reset(MetricKey key) {
- this.key = key;
- COUNT.lazySet(this, 0);
- return this;
- }
-
- public boolean isUsed() {
- return count < 0;
- }
-
- public boolean add(long tag, long durationNanos) {
- // technically this would be wrong if there were 2^31 unsuccessful
- // attempts to add a value, but this an acceptable risk
- int position = COUNT.getAndIncrement(this);
- if (position >= 0 && position < durations.length()) {
- durations.set(position, tag | durationNanos);
- COMMITTED.getAndIncrement(this);
- return true;
- }
- return false;
- }
-
- public void contributeTo(AggregateMetric aggregate) {
- int count = Math.min(COUNT.getAndSet(this, Integer.MIN_VALUE), MAX_BATCH_SIZE);
- if (count >= 0) {
- // wait for the duration to have been set.
- // note this mechanism only supports a single reader
- while (committed != count) {
- Thread.yield();
- }
- COMMITTED.lazySet(this, 0);
- aggregate.recordDurations(count, durations);
- }
- }
-}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
index 69f1932f2d1..dc5d698bcc1 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
@@ -2,8 +2,6 @@
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT;
import static datadog.trace.api.DDSpanTypes.RPC;
-import static datadog.trace.api.DDTags.BASE_SERVICE;
-import static datadog.trace.api.Functions.UTF8_ENCODE;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
@@ -14,9 +12,6 @@
import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR;
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;
import datadog.common.queue.Queues;
@@ -36,12 +31,12 @@
import datadog.trace.core.SpanKindFilter;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.util.AgentTaskScheduler;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -54,20 +49,18 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private static final Logger log = LoggerFactory.getLogger(ConflatingMetricsAggregator.class);
private static final Map DEFAULT_HEADERS =
- singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION);
+ Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION);
- private static final DDCache SERVICE_NAMES =
- DDCaches.newFixedSizeCache(32);
+ static final DDCache SERVICE_NAMES = DDCaches.newFixedSizeCache(32);
- private static final DDCache SPAN_KINDS =
- DDCaches.newFixedSizeCache(16);
- private static final DDCache<
+ static final DDCache SPAN_KINDS = DDCaches.newFixedSizeCache(16);
+ static final DDCache<
String, Pair, Function>>
PEER_TAGS_CACHE =
DDCaches.newFixedSizeCache(
64); // it can be unbounded since those values are returned by the agent and should be
// under control. 64 entries is enough in this case to contain all the peer tags.
- private static final Function<
+ static final Function<
String, Pair, Function>>
PEER_TAGS_CACHE_ADDER =
key ->
@@ -91,9 +84,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
SpanKindFilter.builder().includeInternal().build();
private final Set ignoredResources;
- private final MessagePassingQueue batchPool;
- private final ConcurrentHashMap pending;
- private final ConcurrentHashMap keys;
private final Thread thread;
private final MessagePassingQueue inbox;
private final Sink sink;
@@ -104,6 +94,23 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private final HealthMetrics healthMetrics;
private final boolean includeEndpointInMetrics;
+ /**
+ * Cached peer-aggregation schema. Producers read this reference once per trace and pass it
+ * through to the consumer in {@link SpanSnapshot}; they never inspect the schema's discovery
+ * state or rebuild it. Reconciliation is the aggregator thread's job: {@link
+ * #reconcilePeerTagSchema()} compares the schema's {@link PeerTagSchema#state} against {@link
+ * DDAgentFeaturesDiscovery#state()} once per reporting cycle and either updates the state in
+ * place (when the tag set is unchanged) or swaps in a freshly-built schema.
+ *
+ * <p>{@code null} only during the bootstrap window before {@link #bootstrapPeerTagSchema()} runs
+ * on the first publish.
+ *
+ * <p>{@code volatile} so the consumer's reconcile-time replacement is visible to producer
+ * threads; the schema's own internal mutable state ({@link PeerTagSchema#state}) is exercised
+ * only on the aggregator thread.
+ */
+ private volatile PeerTagSchema cachedPeerTagSchema;
+
private volatile AgentTaskScheduler.Scheduled> cancellation;
public ConflatingMetricsAggregator(
@@ -187,23 +194,18 @@ public ConflatingMetricsAggregator(
this.ignoredResources = ignoredResources;
this.includeEndpointInMetrics = includeEndpointInMetrics;
this.inbox = Queues.mpscArrayQueue(queueSize);
- this.batchPool = Queues.spmcArrayQueue(maxAggregates);
- this.pending = new ConcurrentHashMap<>(maxAggregates * 4 / 3);
- this.keys = new ConcurrentHashMap<>();
this.features = features;
this.healthMetrics = healthMetric;
this.sink = sink;
this.aggregator =
new Aggregator(
metricWriter,
- batchPool,
inbox,
- pending,
- keys.keySet(),
maxAggregates,
reportingInterval,
timeUnit,
- healthMetric);
+ healthMetric,
+ this::reconcilePeerTagSchema);
this.thread = newAgentThread(METRICS_AGGREGATOR, aggregator);
this.reportingInterval = reportingInterval;
this.reportingIntervalTimeUnit = timeUnit;
@@ -311,6 +313,19 @@ private boolean shouldComputeMetric(CoreSpan> span, boolean isTopLevel) {
}
private boolean publish(CoreSpan> span, boolean isTopLevel) {
+ // Error decision drives force-keep sampling regardless of whether the snapshot gets queued.
+ boolean error = span.getError() > 0;
+
+ // Fast-path the inbox-full case before any tag extraction or snapshot allocation. size() is
+ // approximate on jctools' MPSC queue but that's fine: if we under-estimate, we fall through
+ // and let inbox.offer be the source of truth (existing behavior); if we over-estimate, we
+ // drop a snapshot that would have fit -- acceptable, onStatsInboxFull was going to fire
+ // imminently anyway.
+ if (inbox.size() >= inbox.capacity()) {
+ healthMetrics.onStatsInboxFull();
+ return error;
+ }
+
// Extract HTTP method and endpoint only if the feature is enabled
String httpMethod = null;
String httpEndpoint = null;
@@ -330,97 +345,149 @@ private boolean publish(CoreSpan> span, boolean isTopLevel) {
// CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString
// tag values don't trigger a ClassCastException on the String assignment.
final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString();
- MetricKey newKey =
- new MetricKey(
+
+ long tagAndDuration =
+ span.getDurationNano() | (error ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L);
+
+ PeerTagSchema peerTagSchema = peerTagSchemaFor(span);
+ String[] peerTagValues =
+ peerTagSchema == null ? null : capturePeerTagValues(span, peerTagSchema);
+ if (peerTagValues == null) {
+ // No tags fired -- drop the schema reference so the consumer doesn't bother iterating an
+ // all-null array.
+ peerTagSchema = null;
+ }
+
+ SpanSnapshot snapshot =
+ new SpanSnapshot(
span.getResourceName(),
- SERVICE_NAMES.computeIfAbsent(span.getServiceName(), UTF8_ENCODE),
+ span.getServiceName(),
span.getOperationName(),
span.getServiceNameSource(),
spanType,
span.getHttpStatusCode(),
isSynthetic(span),
span.getParentId() == 0,
- SPAN_KINDS.computeIfAbsent(
- spanKind, UTF8BytesString::create), // save repeated utf8 conversions
- getPeerTags(span),
+ spanKind,
+ peerTagSchema,
+ peerTagValues,
httpMethod,
httpEndpoint,
- grpcStatusCode);
- MetricKey key = keys.putIfAbsent(newKey, newKey);
- if (null == key) {
- key = newKey;
+ grpcStatusCode,
+ tagAndDuration);
+ if (!inbox.offer(snapshot)) {
+ healthMetrics.onStatsInboxFull();
}
- long tag = (span.getError() > 0 ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L);
- long durationNanos = span.getDurationNano();
- Batch batch = pending.get(key);
- if (null != batch) {
- // there is a pending batch, try to win the race to add to it
- // returning false means that either the batch can't take any
- // more data, or it has already been consumed
- if (batch.add(tag, durationNanos)) {
- // added to a pending batch prior to consumption,
- // so skip publishing to the queue (we also know
- // the key isn't rare enough to override the sampler)
- return false;
- }
- // recycle the older key
- key = batch.getKey();
- }
- batch = newBatch(key);
- batch.add(tag, durationNanos);
- // overwrite the last one if present, it was already full
- // or had been consumed by the time we tried to add to it
- pending.put(key, batch);
- // must offer to the queue after adding to pending
- inbox.offer(batch);
// force keep keys if there are errors
- return span.getError() > 0;
+ return error;
}
- private List getPeerTags(CoreSpan> span) {
+ /**
+ * Picks the peer-tag schema for a span. For internal-kind spans we always use the static {@link
+ * PeerTagSchema#INTERNAL} singleton (one entry for {@code base.service}); for {@code
+ * client}/{@code producer}/{@code consumer} kinds we use the cached peer-aggregation schema
+ * synced from {@link DDAgentFeaturesDiscovery#peerTags()}. Other kinds get {@code null}.
+ */
+ private PeerTagSchema peerTagSchemaFor(CoreSpan> span) {
if (span.isKind(PEER_AGGREGATION_KINDS)) {
- final Set eligiblePeerTags = features.peerTags();
- List peerTags = null;
- for (String peerTag : eligiblePeerTags) {
- Object value = span.unsafeGetTag(peerTag);
- if (value != null) {
- final Pair, Function>
- cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
- if (peerTags == null) {
- peerTags = new ArrayList<>(eligiblePeerTags.size());
- }
- peerTags.add(
- cacheAndCreator
- .getLeft()
- .computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
- }
- }
- return peerTags == null ? emptyList() : peerTags;
- } else if (span.isKind(INTERNAL_KIND)) {
- // in this case only the base service should be aggregated if present
- final Object baseService = span.unsafeGetTag(BASE_SERVICE);
- if (baseService != null) {
- final Pair, Function>
- cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER);
- return singletonList(
- cacheAndCreator
- .getLeft()
- .computeIfAbsent(baseService.toString(), cacheAndCreator.getRight()));
+ PeerTagSchema schema = cachedPeerTagSchema;
+ if (schema == null) {
+ schema = bootstrapPeerTagSchema();
}
+ return schema.size() > 0 ? schema : null;
}
- return emptyList();
+ if (span.isKind(INTERNAL_KIND)) {
+ return PeerTagSchema.INTERNAL;
+ }
+ return null;
}
- private static boolean isSynthetic(CoreSpan> span) {
- return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString());
+ /**
+ * One-time producer-side bootstrap of {@link #cachedPeerTagSchema}. Synchronized double-check
+ * guards against two producers racing on the very first publish; after this returns, {@code
+ * cachedPeerTagSchema} is non-null forever and the aggregator thread is the sole subsequent
+ * mutator (see {@link #reconcilePeerTagSchema()}).
+ */
+ private synchronized PeerTagSchema bootstrapPeerTagSchema() {
+ PeerTagSchema cached = cachedPeerTagSchema;
+ if (cached != null) {
+ return cached;
+ }
+ PeerTagSchema schema = buildPeerTagSchema();
+ cachedPeerTagSchema = schema;
+ return schema;
+ }
+
+ /**
+ * Builds a fresh {@link PeerTagSchema} from the current state of feature discovery.
+ *
+ * <p>Read order matters: {@code DDAgentFeaturesDiscovery} exposes {@code peerTags()} and {@code
+ * state()} as two separate accessors, each reading its volatile {@code discoveryState}
+ * independently. If a discovery refresh interleaves between the two reads, we want to be left
+ * with a schema whose embedded state is *stale* relative to its tag set rather than the other way
+ * around -- that way the next reconcile sees a state mismatch and re-runs the deep compare to
+ * pick up the change, instead of short-circuiting on a too-fresh state and missing it.
+ *
+ * <p>So read {@code state()} first, then {@code peerTags()}.
+ */
+ private PeerTagSchema buildPeerTagSchema() {
+ String state = features.state();
+ Set names = features.peerTags();
+ return PeerTagSchema.of(names == null ? Collections.emptySet() : names, state);
}
- private Batch newBatch(MetricKey key) {
- Batch batch = batchPool.poll();
- if (null == batch) {
- return new Batch(key);
+ /**
+ * Reconciles {@link #cachedPeerTagSchema} with the latest feature discovery. Runs on the
+ * aggregator thread once per reporting cycle via the reset hook passed to {@link Aggregator}.
+ * Cheap fast path: an equality check against the cached schema's embedded {@link
+ * DDAgentFeaturesDiscovery#state()} hash short-circuits when discovery's response hasn't changed
+ * since the schema was built. On mismatch, a set compare distinguishes "discovery response
+ * changed but peer tags are the same" (just update the cached state in place) from "tags actually
+ * changed" (build a new schema and swap the volatile reference).
+ */
+ private void reconcilePeerTagSchema() {
+ PeerTagSchema cached = cachedPeerTagSchema;
+ if (cached == null) {
+ // First reset before the first publish -- producer-side bootstrap hasn't run yet.
+ return;
+ }
+ String latestState = features.state();
+ if (Objects.equals(cached.state, latestState)) {
+ return;
}
- return batch.reset(key);
+ Set latestNames = features.peerTags();
+ Set normalized = latestNames == null ? Collections.emptySet() : latestNames;
+ if (cached.hasSameTagsAs(normalized)) {
+ cached.state = latestState;
+ } else {
+ cachedPeerTagSchema = PeerTagSchema.of(normalized, latestState);
+ }
+ }
+
+ /**
+ * Captures the span's peer-tag values into a {@code String[]} parallel to {@code schema.names}.
+ * Slots remain {@code null} for tags the span didn't set; the array itself is lazily allocated on
+ * the first hit so spans that fire no peer tags pay zero allocation. Returns {@code null} when
+ * none of the configured peer tags are set on the span.
+ */
+ private static String[] capturePeerTagValues(CoreSpan> span, PeerTagSchema schema) {
+ String[] names = schema.names;
+ int n = names.length;
+ String[] values = null;
+ for (int i = 0; i < n; i++) {
+ Object v = span.unsafeGetTag(names[i]);
+ if (v != null) {
+ if (values == null) {
+ values = new String[n];
+ }
+ values[i] = v.toString();
+ }
+ }
+ return values;
+ }
+
+ private static boolean isSynthetic(CoreSpan> span) {
+ return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString());
}
public void stop() {
@@ -465,8 +532,6 @@ private void disable() {
features.discover();
if (!features.supportsMetrics()) {
log.debug("Disabling metric reporting because an agent downgrade was detected");
- this.pending.clear();
- this.batchPool.clear();
this.inbox.clear();
this.aggregator.clearAggregates();
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java
new file mode 100644
index 00000000000..4821d1b33a4
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java
@@ -0,0 +1,94 @@
+package datadog.trace.common.metrics;
+
+import static datadog.trace.api.DDTags.BASE_SERVICE;
+
+import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
+import java.util.Set;
+
+/**
+ * Names of the peer-tags eligible for client-stats aggregation, packed into a flat {@code String[]}
+ * for parallel-array access by producers and the aggregator thread.
+ *
+ * <p>This is the minimal carrier shape used by {@link SpanSnapshot}: the producer captures per-span
+ * values into a {@code String[]} parallel to {@link #names}, and the aggregator reconstructs the
+ * encoded {@code tag:value} pairs from the same name index. It replaces the prior "flat pairs"
+ * {@code [name0, value0, name1, value1, ...]} layout, which forced a worst-case allocation +
+ * trim-and-copy on every span.
+ *
+ * <p>Two schemas exist:
+ *
+ * <ul>
+ * <li>{@link #INTERNAL} -- a singleton with one entry for {@code base.service}, used for
+ * internal-kind spans where only the base service is aggregated.
+ * <li>A peer-aggregation schema built via {@link #of(Set, String)} for {@code client}/{@code
+ * producer}/{@code consumer} spans. {@link ConflatingMetricsAggregator} caches the most
+ * recently built schema and reconciles it on the aggregator thread once per reporting cycle
+ * by comparing {@link #state} against {@link DDAgentFeaturesDiscovery#state()}.
+ * </ul>
+ *
+ * <p>This class deliberately has no cardinality limiters -- callers that need those layer them on
+ * top.
+ *
+ * <p>Thread-safety: {@link #names} is final and safe to read from any thread. {@link #state}
+ * is exercised only on the aggregator thread (read and updated in reconciliation); producer threads
+ * access the schema only through the volatile {@code cachedPeerTagSchema} reference in {@link
+ * ConflatingMetricsAggregator}.
+ */
+final class PeerTagSchema {
+
+ /** Singleton schema for internal-kind spans -- only {@code base.service}. */
+ static final PeerTagSchema INTERNAL =
+ // INTERNAL is never reconciled, so the state value is irrelevant.
+ new PeerTagSchema(new String[] {BASE_SERVICE}, null);
+
+ final String[] names;
+
+ /**
+ * The {@code DDAgentFeaturesDiscovery.state()} hash this schema was built from. The aggregator
+ * thread reads and updates this once per reporting cycle when reconciling against the latest
+ * discovery; producer threads never touch it. Plain (non-volatile, non-final) because the
+ * aggregator is the sole reader/writer. May be {@code null} before discovery has produced a
+ * response.
+ */
+ String state;
+
+ private PeerTagSchema(String[] names, String state) {
+ this.names = names;
+ this.state = state;
+ }
+
+ /** Builds a schema for the given peer-tag names. Order is determined by the {@link Set}. */
+ static PeerTagSchema of(Set tags, String state) {
+ return new PeerTagSchema(tags.toArray(new String[0]), state);
+ }
+
+ /**
+ * Test-only factory that takes the names array directly so tests can build a schema in a specific
+ * order without going through a {@link Set}.
+ */
+ static PeerTagSchema testSchema(String[] names) {
+ return new PeerTagSchema(names, null);
+ }
+
+ /**
+ * Whether this schema's tag names exactly match {@code other}. Used by the aggregator's reconcile
+ * path: when a feature discovery refresh changes {@link DDAgentFeaturesDiscovery#state()} but the
+ * resulting set is unchanged, the aggregator can keep this schema and just update {@link #state}
+ * instead of rebuilding.
+ */
+ boolean hasSameTagsAs(Set other) {
+ if (this.names.length != other.size()) {
+ return false;
+ }
+ for (String name : this.names) {
+ if (!other.contains(name)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ int size() {
+ return names.length;
+ }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
new file mode 100644
index 00000000000..eb9b741cea6
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
@@ -0,0 +1,75 @@
+package datadog.trace.common.metrics;
+
+/**
+ * Immutable per-span value posted from the producer to the aggregator thread. Carries the raw
+ * inputs the aggregator needs to build a {@link MetricKey} and update an {@link AggregateMetric}.
+ *
+ * <p>All cache-canonicalization (service-name, span-kind, peer-tag string interning) happens on the
+ * aggregator thread; the producer just shuffles references.
+ */
+final class SpanSnapshot implements InboxItem {
+
+ final CharSequence resourceName;
+ final String serviceName;
+ final CharSequence operationName;
+ final CharSequence serviceNameSource;
+ final CharSequence spanType;
+ final short httpStatusCode;
+ final boolean synthetic;
+ final boolean traceRoot;
+ final String spanKind;
+
+ /**
+ * Schema for {@link #peerTagValues}. {@code null} when the span has no peer tags. The schema
+ * carries the names in parallel-array form; {@code peerTagValues} holds the per-span tag values
+ * at the same indices.
+ */
+ final PeerTagSchema peerTagSchema;
+
+ /**
+ * Peer tag values captured from the span, parallel to {@code peerTagSchema.names}. A {@code null}
+ * entry means the span didn't have that peer tag set. {@code null} (the whole array) when {@link
+ * #peerTagSchema} is {@code null}.
+ */
+ final String[] peerTagValues;
+
+ final String httpMethod;
+ final String httpEndpoint;
+ final String grpcStatusCode;
+
+ /** Duration in nanoseconds, OR-ed with {@code ERROR_TAG} / {@code TOP_LEVEL_TAG} as needed. */
+ final long tagAndDuration;
+
+ SpanSnapshot(
+ CharSequence resourceName,
+ String serviceName,
+ CharSequence operationName,
+ CharSequence serviceNameSource,
+ CharSequence spanType,
+ short httpStatusCode,
+ boolean synthetic,
+ boolean traceRoot,
+ String spanKind,
+ PeerTagSchema peerTagSchema,
+ String[] peerTagValues,
+ String httpMethod,
+ String httpEndpoint,
+ String grpcStatusCode,
+ long tagAndDuration) {
+ this.resourceName = resourceName;
+ this.serviceName = serviceName;
+ this.operationName = operationName;
+ this.serviceNameSource = serviceNameSource;
+ this.spanType = spanType;
+ this.httpStatusCode = httpStatusCode;
+ this.synthetic = synthetic;
+ this.traceRoot = traceRoot;
+ this.spanKind = spanKind;
+ this.peerTagSchema = peerTagSchema;
+ this.peerTagValues = peerTagValues;
+ this.httpMethod = httpMethod;
+ this.httpEndpoint = httpEndpoint;
+ this.grpcStatusCode = grpcStatusCode;
+ this.tagAndDuration = tagAndDuration;
+ }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java
index 257d887029b..d1c7fe126b4 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java
@@ -93,6 +93,11 @@ public void onClientStatDowngraded() {}
public void onStatsAggregateDropped() {}
+ /**
+ * Reports a single span whose stats snapshot was dropped because the aggregator inbox was full.
+ */
+ public void onStatsInboxFull() {}
+
/**
* @return Human-readable summary of the current health metrics.
*/
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java
index 2df54241e56..db384a7e42e 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java
@@ -98,6 +98,7 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable
private final LongAdder clientStatsDowngrades = new LongAdder();
private final LongAdder statsAggregateDropped = new LongAdder();
+ private final LongAdder statsInboxFull = new LongAdder();
private final StatsDClient statsd;
private final long interval;
@@ -357,6 +358,11 @@ public void onStatsAggregateDropped() {
statsAggregateDropped.increment();
}
+ @Override
+ public void onStatsInboxFull() {
+ statsInboxFull.increment();
+ }
+
@Override
public void close() {
if (null != cancellation) {
@@ -374,8 +380,9 @@ private static class Flush implements AgentTaskScheduler.Task= 99
okLatencies.getMaxValue() <= 5
}
-
- def "consistent under concurrent attempts to read and write"() {
- given:
- AggregateMetric aggregate = new AggregateMetric()
- MetricKey key = new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null, null)
- BlockingDeque queue = new LinkedBlockingDeque<>(1000)
- ExecutorService reader = Executors.newSingleThreadExecutor()
- int writerCount = 10
- ExecutorService writers = Executors.newFixedThreadPool(writerCount)
- CountDownLatch readerLatch = new CountDownLatch(1)
- CountDownLatch writerLatch = new CountDownLatch(writerCount)
- CountDownLatch queueEmptyLatch = new CountDownLatch(1)
-
- AtomicInteger written = new AtomicInteger(0)
-
- when:
- for (int i = 0; i < writerCount; ++i) {
- writers.submit({
- readerLatch.await()
- for (int j = 0; j < 10_000; ++j) {
- Batch batch = queue.peekLast()
- if (batch?.add(0L, 1)) {
- written.incrementAndGet()
- } else {
- queue.offer(new Batch().reset(key))
- }
- }
- writerLatch.countDown()
- })
- }
- def future = reader.submit({
- readerLatch.countDown()
- while (!Thread.currentThread().isInterrupted()) {
- Batch batch = queue.poll(100, TimeUnit.MILLISECONDS)
- if (null == batch && writerLatch.count == 0) {
- queueEmptyLatch.countDown()
- } else if (null != batch) {
- batch.contributeTo(aggregate)
- }
- }
- })
- assert writerLatch.await(10, TimeUnit.SECONDS)
- // Wait here until we know that the queue is empty
- assert queueEmptyLatch.await(10, TimeUnit.SECONDS)
- future.cancel(true)
-
- then:
- aggregate.getHitCount() == written.get()
- }
}
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
index 962ad2ce892..a95f6bcbdbc 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
@@ -255,29 +255,44 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
def "should create bucket for each set of peer tags"() {
setup:
+ // Peer-tag schema is reconciled with feature discovery once per reporting cycle (on the
+ // aggregator thread, in the post-report hook), not per-span on the producer. Drive two
+ // reporting cycles with different peerTags() configurations to verify the aggregator buckets
+ // each cycle by the schema that was current at publish time.
MetricWriter writer = Mock(MetricWriter)
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
- features.peerTags() >>> [["country"], ["country", "georegion"],]
+ features.peerTags() >>> [["country"], ["country", "georegion"]]
+ // Bump the discovered state hash so reconcile during report cycle 1 sees a mismatch and
+ // rebuilds the schema for span 2. Three calls: bootstrap (span1's publish), reconcile-during-
+ // report-1 (mismatch -> rebuild + 2nd peerTags() call), reconcile-during-report-2 (no change).
+ features.state() >>> ["state-1", "state-2", "state-2"]
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
- when:
- CountDownLatch latch = new CountDownLatch(1)
+ when: "cycle 1 -- peerTags=[country]"
+ CountDownLatch latch1 = new CountDownLatch(1)
aggregator.publish([
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
- .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe"),
+ .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe")
+ ])
+ aggregator.report()
+ def cycle1Triggered = latch1.await(2, SECONDS)
+
+ and: "cycle 2 -- reconcile picks up peerTags=[country, georegion]"
+ CountDownLatch latch2 = new CountDownLatch(1)
+ aggregator.publish([
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
.setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe")
])
aggregator.report()
- def latchTriggered = latch.await(2, SECONDS)
+ def cycle2Triggered = latch2.await(2, SECONDS)
then:
- latchTriggered
- 1 * writer.startBucket(2, _, _)
+ cycle1Triggered
+ cycle2Triggered
1 * writer.add(
new MetricKey(
"resource",
@@ -314,7 +329,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
), { AggregateMetric aggregateMetric ->
aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
})
- 1 * writer.finishBucket() >> { latch.countDown() }
+ 2 * writer.finishBucket() >> { latch1.countDown(); latch2.countDown() }
cleanup:
aggregator.close()
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy
index bfc1ee2f4e7..2fd8554d499 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy
@@ -2,8 +2,11 @@ package datadog.trace.common.metrics
import datadog.trace.api.DDSpanId
import datadog.trace.api.DDTraceId
+import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.core.CoreSpan
+import datadog.trace.core.DDSpanContext
import datadog.trace.core.MetadataConsumer
+import datadog.trace.core.SpanKindFilter
class SimpleSpan implements CoreSpan {
@@ -24,6 +27,8 @@ class SimpleSpan implements CoreSpan {
private final Map