Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public final class GeneralConfig {
public static final String TRACER_METRICS_MAX_PENDING = "trace.tracer.metrics.max.pending";
public static final String TRACER_METRICS_IGNORED_RESOURCES =
"trace.tracer.metrics.ignored.resources";
/**
 * Config key for the additional span tag keys to use as extra trace-stats dimensions.
 * NOTE(review): presumably the setting surfaced as {@code Config.getTraceStatsAdditionalTags()}
 * -- confirm against {@code Config}.
 */
public static final String TRACE_STATS_ADDITIONAL_TAGS = "trace.stats.additional.tags";
/**
 * Config key for the cardinality limit applied to the additional trace-stats tags.
 * NOTE(review): presumably a per-key cap on distinct values -- confirm against the consumer of
 * this key.
 */
public static final String TRACE_STATS_ADDITIONAL_TAGS_CARDINALITY_LIMIT =
"trace.stats.additional.tags.cardinality.limit";

public static final String AZURE_APP_SERVICES = "azure.app.services";
public static final String INTERNAL_EXIT_ON_FAILURE = "trace.internal.exit.on.failure";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package datadog.trace.common.metrics;

import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.trace.api.WellKnownTags;
import datadog.trace.core.CoreSpan;
import datadog.trace.core.monitor.HealthMetrics;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

/**
 * JMH throughput benchmark for the span-derived primary tags pipeline added in CSS v1.3.0. It is
 * the counterpart of {@link AdversarialMetricsBenchmark}, with the difference that the aggregator
 * is configured with two additional-tag keys (per-key cardinality cap of 100) and every op
 * publishes freshly generated values, so the cap saturates quickly.
 *
 * <p>Costs exercised by each op:
 *
 * <ul>
 *   <li>producer-side capture: {@code ClientStatsAggregator.captureAdditionalTagValues} walks the
 *       schema and pulls each key via {@code unsafeGetTag}.
 *   <li>aggregator-side canonicalization: {@code AdditionalTagsSchema.register(i, value)} runs
 *       length-check, handler probe + insert, isBlockedResult check, and per-tag block-counter
 *       accumulation.
 *   <li>cycle-reset flush: at every reporting cycle, the schema fires one {@code
 *       HealthMetrics.onTagCardinalityBlocked(name, count)} per affected key.
 * </ul>
 *
 * <p>Absolute throughput is not the point. The benchmark is a regression guard for the
 * additional-tags hot path: a refactor that adds a tag-map lookup, allocates per call, or moves
 * sentinel materialization onto the hot path should appear here as a step change.
 */
@State(Scope.Benchmark)
@Warmup(iterations = 2, time = 15, timeUnit = SECONDS)
@Measurement(iterations = 5, time = 15, timeUnit = SECONDS)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(SECONDS)
@Threads(8)
@Fork(value = 1)
public class AdditionalTagsMetricsBenchmark {

  private ClientStatsAggregator aggregator;
  private AdversarialMetricsBenchmark.CountingHealthMetrics health;

  /** Per-thread op counter; each op's tag values are derived from it. */
  @State(Scope.Thread)
  public static class ThreadState {
    int cursor;
  }

  /** Builds the counting health metrics, the two-key schema, and a started aggregator. */
  @Setup
  public void setup() {
    AdversarialMetricsBenchmark.CountingHealthMetrics counters =
        new AdversarialMetricsBenchmark.CountingHealthMetrics();
    health = counters;

    // Two keys, each limited to 100 distinct values. Once the limit is hit, ops collapse onto the
    // per-key "<key>:blocked_by_tracer" sentinel, which is the contention this benchmark targets.
    LinkedHashSet<String> tagKeys = new LinkedHashSet<>(Arrays.asList("region", "tenant_id"));
    AdditionalTagsSchema schema = AdditionalTagsSchema.from(tagKeys, 100, counters);

    ClientStatsAggregator created =
        new ClientStatsAggregator(
            new WellKnownTags("", "", "", "", "", ""),
            Collections.emptySet(),
            schema,
            new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
                Collections.singleton("peer.hostname"), Collections.emptySet()),
            counters,
            new ClientStatsAggregatorBenchmark.NullSink(),
            2048,
            2048,
            false);
    aggregator = created;
    created.start();
  }

  /** Closes the aggregator, then dumps the accumulated health counters to stderr. */
  @TearDown
  public void tearDown() {
    aggregator.close();
    AdversarialMetricsBenchmark.CountingHealthMetrics counters = health;
    System.err.println("[ADDITIONAL-TAGS] counters (across all threads, single fork):");
    System.err.println("  onStatsInboxFull        = " + counters.inboxFull.sum());
    System.err.println("  onStatsAggregateDropped = " + counters.aggregateDropped.sum());
    System.err.println("  tagCardinalityBlocked   = " + counters.tagCardinalityBlocked.sum());
  }

  /**
   * Publishes one single-span trace carrying a generated {@code region} and {@code tenant_id}
   * value plus a random duration.
   *
   * @param ts per-thread state supplying the op counter
   * @param blackhole sink for the result of {@code aggregator.publish}
   */
  @Benchmark
  public void publish(ThreadState ts, Blackhole blackhole) {
    final int opIndex = ts.cursor++;

    // Multiplying by a large odd constant spreads consecutive op indices over the int range; two
    // 16-bit windows of the product then pick the region and tenant ids (65k possible values
    // each). With a cap of 100 per key, only the first 100 distinct values per key are admitted;
    // the rest collapse to the blocked sentinel and bump the per-tag block counter, which the
    // schema reports through its flush path.
    final int mixed = opIndex * 0x9E3779B1;
    final int regionId = (mixed >>> 4) & 0xFFFF;
    final int tenantId = (mixed >>> 16) & 0xFFFF;

    // Random duration in [1, 2^30] nanoseconds.
    final long durationNanos = 1L + (ThreadLocalRandom.current().nextLong() & 0x3FFFFFFFL);

    SimpleSpan span =
        new SimpleSpan("svc", "op", "res", "web", true, true, false, 0, durationNanos, 200);
    span.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
    span.setTag("region", "region-" + regionId);
    span.setTag("tenant_id", "tenant-" + tenantId);

    List<CoreSpan<?>> trace = Collections.singletonList(span);
    blackhole.consume(aggregator.publish(trace));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public void publish(ThreadState ts, Blackhole blackhole) {
static final class CountingHealthMetrics extends HealthMetrics {
final LongAdder inboxFull = new LongAdder();
final LongAdder aggregateDropped = new LongAdder();
final LongAdder tagCardinalityBlocked = new LongAdder();

@Override
public void onStatsInboxFull() {
Expand All @@ -153,5 +154,10 @@ public void onStatsInboxFull() {
public void onStatsAggregateDropped() {
aggregateDropped.increment();
}

/**
 * Adds {@code count} to the {@code tagCardinalityBlocked} adder. The {@code tag} argument is not
 * used, so the adder holds a single total across all tags.
 */
@Override
public void onTagCardinalityBlocked(String tag, long count) {
tagCardinalityBlocked.add(count);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package datadog.trace.common.metrics;

import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.core.monitor.HealthMetrics;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Schema describing the configured span-derived primary tag keys. Built once from {@code
 * Config.getTraceStatsAdditionalTags()} at aggregator construction; the name + handler list isn't
 * replaced at runtime, but per-cycle warn-once state and block counters live here too.
 *
 * <p>Parallels {@link PeerTagSchema} for shape -- a sorted, deduped, capped {@code String[]} of
 * names plus per-name {@link TagCardinalityHandler} -- but lives in {@code SpanSnapshot} and
 * {@link AggregateEntry} alongside, not in place of, the peer-tag schema.
 *
 * <p>Each handler enforces a per-key cardinality cap; this class adds a per-value length cap on
 * top, substituting the handler's {@code "<key>:blocked_by_tracer"} sentinel when either limit is
 * hit. Length-blocks and cardinality-blocks each emit a one-shot warn log per reporting cycle per
 * key and increment a per-tag block counter that is flushed to {@link
 * HealthMetrics#onTagCardinalityBlocked(String, long)} once per affected tag at cycle reset. The
 * aggregate table's {@code maxAggregates} bound prevents total-entry explosion above and beyond
 * what the per-key caps allow.
 *
 * <p>All per-cycle state (warn-once flags and block counters) is kept in plain arrays indexed by
 * slot, in lockstep with the name array, so the blocked path only does array reads and writes.
 * NOTE(review): nothing here is synchronized; this assumes {@link #register(int, String)} and
 * {@link #resetCardinalityHandlers()} are only called from a single (aggregator) thread --
 * confirm against the callers.
 */
final class AdditionalTagsSchema {

  private static final Logger log = LoggerFactory.getLogger(AdditionalTagsSchema.class);

  /**
   * Backend stats pipeline supports a small number of primary tag dimensions (4 by default, up to
   * ~10 for elevated quotas). Configuring more than this is misuse; we drop the overflow at
   * startup.
   */
  static final int MAX_ADDITIONAL_TAG_KEYS = 10;

  /**
   * Maximum length of an additional metric tag value. Caps entry footprint + wire payload from
   * stack-trace / JSON / SQL stuffed into a tag by misconfigured app code.
   */
  static final int MAX_ADDITIONAL_TAG_VALUE_LENGTH = 250;

  /** Singleton empty schema returned when no additional tags are configured. */
  static final AdditionalTagsSchema EMPTY =
      new AdditionalTagsSchema(new String[0], new TagCardinalityHandler[0], HealthMetrics.NO_OP);

  private final String[] names;
  private final TagCardinalityHandler[] handlers;
  private final HealthMetrics healthMetrics;

  /**
   * Per-cycle warn-once flags, indexed in lockstep with {@link #names}. A flag flips to {@code
   * true} the first time its kind of block is seen for that slot in a cycle, which is the only
   * time the warn log is emitted. Cleared in {@link #resetCardinalityHandlers()}. Slot-indexed
   * arrays are used instead of name-keyed sets so the blocked path avoids a hash lookup per
   * blocked value.
   */
  private final boolean[] warnedCardinality;

  private final boolean[] warnedLength;

  /**
   * Per-tag block counter, indexed in lockstep with {@link #names}. Incremented on every blocked
   * value during the cycle; flushed to {@link HealthMetrics#onTagCardinalityBlocked(String, long)}
   * and zeroed in {@link #resetCardinalityHandlers()}. Single statsd call per affected tag per
   * cycle keeps a misconfigured high-cardinality tag from flooding the metrics pipe.
   */
  private final long[] blockedCounts;

  private AdditionalTagsSchema(
      String[] names, TagCardinalityHandler[] handlers, HealthMetrics healthMetrics) {
    this.names = names;
    this.handlers = handlers;
    this.healthMetrics = healthMetrics;
    this.warnedCardinality = new boolean[names.length];
    this.warnedLength = new boolean[names.length];
    this.blockedCounts = new long[names.length];
  }

  /**
   * Builds a schema from the configured tag keys. Sorts alphabetically (so the hash order matches
   * the spec's requirement) and caps at {@link #MAX_ADDITIONAL_TAG_KEYS}, logging the keys that
   * are dropped; the input being a {@code Set} is what guarantees the keys are deduped.
   *
   * @param configured configured tag keys; may be null or empty
   * @param cardinalityLimit limit handed to each per-key {@link TagCardinalityHandler}
   * @param healthMetrics sink that receives the per-tag block counts at cycle reset
   * @return the shared {@link #EMPTY} schema when {@code configured} is null or empty, otherwise a
   *     new schema with one handler per retained key
   */
  static AdditionalTagsSchema from(
      Set<String> configured, int cardinalityLimit, HealthMetrics healthMetrics) {
    if (configured == null || configured.isEmpty()) {
      return EMPTY;
    }
    List<String> sorted = new ArrayList<>(configured);
    Collections.sort(sorted);
    if (sorted.size() > MAX_ADDITIONAL_TAG_KEYS) {
      log.warn(
          "Configured additional metric tag keys ({}) exceeds the supported limit of {}; "
              + "dropping extra keys: {}",
          sorted.size(),
          MAX_ADDITIONAL_TAG_KEYS,
          sorted.subList(MAX_ADDITIONAL_TAG_KEYS, sorted.size()));
      // Keep the alphabetically-first keys; the tail is what was just reported as dropped.
      sorted = sorted.subList(0, MAX_ADDITIONAL_TAG_KEYS);
    }
    String[] namesArr = sorted.toArray(new String[0]);
    TagCardinalityHandler[] handlers = new TagCardinalityHandler[namesArr.length];
    for (int i = 0; i < namesArr.length; i++) {
      handlers[i] = new TagCardinalityHandler(namesArr[i], cardinalityLimit);
    }
    return new AdditionalTagsSchema(namesArr, handlers, healthMetrics);
  }

  /** Returns the number of tag keys (slots) in this schema. */
  int size() {
    return names.length;
  }

  /** Returns the tag key stored at slot {@code i}. */
  String name(int i) {
    return names[i];
  }

  /**
   * Canonicalizes the additional-tag value at slot {@code i}. Returns {@link UTF8BytesString#EMPTY}
   * for null inputs and the handler's {@code "<key>:blocked_by_tracer"} sentinel when the value
   * exceeds the length cap or pushes the per-key cardinality budget. Increments the per-tag block
   * counter on every block and emits a one-shot warn log per cycle per key on each kind of block;
   * the counter is flushed to {@link HealthMetrics} in {@link #resetCardinalityHandlers()}.
   *
   * @param i slot index, {@code 0 <= i < size()}
   * @param value raw tag value read from the span; may be null
   * @return the canonical value for the slot, or the slot's blocked sentinel
   */
  UTF8BytesString register(int i, String value) {
    TagCardinalityHandler handler = handlers[i];
    if (value != null && value.length() > MAX_ADDITIONAL_TAG_VALUE_LENGTH) {
      // Over-length values never reach the handler, so they do not consume cardinality budget.
      blockedCounts[i]++;
      if (!warnedLength[i]) {
        warnedLength[i] = true;
        log.warn(
            "Value length of {} exceeded the cap of {} for additional metric tag '{}'; the value"
                + " is reported as 'blocked_by_tracer' until the next reporting cycle",
            value.length(),
            MAX_ADDITIONAL_TAG_VALUE_LENGTH,
            names[i]);
      }
      return handler.blockedSentinel();
    }
    UTF8BytesString result = handler.register(value);
    if (handler.isBlockedResult(result)) {
      blockedCounts[i]++;
      if (!warnedCardinality[i]) {
        warnedCardinality[i] = true;
        log.warn(
            "Cardinality limit reached for additional metric tag '{}'; further values are reported"
                + " as 'blocked_by_tracer' until the next reporting cycle",
            names[i]);
      }
    }
    return result;
  }

  /**
   * Resets every per-key handler's working set, flushes accumulated per-tag block counts to
   * {@link HealthMetrics}, and clears the per-cycle warn-once tracking.
   */
  void resetCardinalityHandlers() {
    for (int i = 0; i < handlers.length; i++) {
      handlers[i].reset();
      long blocked = blockedCounts[i];
      if (blocked > 0) {
        // One call per affected tag per cycle, carrying the whole cycle's count.
        healthMetrics.onTagCardinalityBlocked(names[i], blocked);
        blockedCounts[i] = 0;
      }
      warnedCardinality[i] = false;
      warnedLength[i] = false;
    }
  }
}
Loading