Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public void methodAdvice(MethodTransformer transformer) {
.and(named("update"))
.and(takesArgument(1, named("org.apache.kafka.common.requests.MetadataResponse"))),
MetadataInstrumentation.class.getName() + "$MetadataUpdate22AndAfterAdvice");
transformer.applyAdvice(
isMethod().and(named("failedUpdate")),
MetadataInstrumentation.class.getName() + "$FailedUpdateAdvice");
}

public static class MetadataUpdateBefore22Advice {
Expand All @@ -103,6 +106,21 @@ public static void muzzleCheck(ConsumerRecord record) {
}
}

public static class FailedUpdateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.This final Metadata metadata) {
MetadataState state =
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
if (state != null) {
KafkaConfigHelper.reportPendingConfigAsFailed(state);
}
}

public static void muzzleCheck(ConsumerRecord record) {
record.headers();
}
}

public static class MetadataUpdate22AndAfterAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,7 @@ public void methodAdvice(MethodTransformer transformer) {
.and(named("update"))
.and(takesArgument(1, named("org.apache.kafka.common.requests.MetadataResponse"))),
packageName + ".MetadataUpdate22AndAfterAdvice");
transformer.applyAdvice(
isMethod().and(named("failedUpdate")), packageName + ".MetadataFailedUpdateAdvice");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package datadog.trace.instrumentation.kafka_clients38;

import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
import datadog.trace.instrumentation.kafka_common.MetadataState;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MetadataFailedUpdateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.This final Metadata metadata) {
MetadataState state =
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
if (state != null) {
KafkaConfigHelper.reportPendingConfigAsFailed(state);
}
}

public static void muzzleCheck(ConsumerRecord record) {
record.headers();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ public class KafkaConfigHelper {
"socket.connection.setup.timeout.ms",
"socket.connection.setup.timeout.max.ms",
"security.protocol",
// Non-secret auth selectors — values name the mechanism / algorithm in use,
// never carry credentials. Useful for spotting typos like SCRAM-SHA-256 vs -512.
"sasl.mechanism",
"sasl.kerberos.service.name",
"sasl.login.callback.handler.class",
"ssl.protocol",
"ssl.enabled.protocols",
"ssl.endpoint.identification.algorithm",
"ssl.truststore.type",
"ssl.keystore.type",
"ssl.cipher.suites",
"metrics.sample.window.ms",
"metrics.num.samples",
"metrics.recording.level",
Expand Down Expand Up @@ -107,13 +118,28 @@ public static void storePendingConsumerConfig(

/** Called from metadata update advice when the cluster ID becomes available. */
public static void reportPendingConfig(MetadataState state, String clusterId) {
reportPendingConfig(state, clusterId, PendingConfig.STATUS_CONNECTED);
}

/** Called from failure advice when the client cannot reach / authenticate to the cluster. */
public static void reportPendingConfigAsFailed(MetadataState state) {
// clusterId may be unknown on auth/connect failure — emit with whatever we have (often "")
reportPendingConfig(state, state.clusterId, PendingConfig.STATUS_FAILED);
}

private static void reportPendingConfig(MetadataState state, String clusterId, String status) {
PendingConfig pending = state.takePendingConfig();
if (pending != null) {
log.debug("Received cluster ID, reporting {} config", pending.type);
log.debug("Reporting {} config with status={}", pending.type, status);
if (Config.get().isDataStreamsEnabled()) {
AgentTracer.get()
.getDataStreamsMonitoring()
.reportKafkaConfig(pending.type, clusterId, pending.consumerGroup, pending.config);
.reportKafkaConfig(
pending.type,
clusterId != null ? clusterId : "",
pending.consumerGroup,
pending.config,
status);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import java.util.Map;

/** Holds pending Kafka config info until the cluster ID becomes available from metadata. */
/** Holds pending Kafka config info until the client's connection lifecycle resolves. */
public class PendingConfig {
public static final String STATUS_CONNECTED = "connected";
public static final String STATUS_FAILED = "failed";

public final String type;
public final String consumerGroup;
public final Map<String, String> config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,18 @@ public void reportSchemaRegistryUsage(

@Override
public void reportKafkaConfig(
String type, String kafkaClusterId, String consumerGroup, Map<String, String> config) {
String type,
String kafkaClusterId,
String consumerGroup,
Map<String, String> config,
String connectionStatus) {
inbox.offer(
new KafkaConfigReport(
type,
kafkaClusterId,
consumerGroup,
config,
connectionStatus,
timeSource.getCurrentTimeNanos(),
getThreadServiceName()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter
private static final byte[] CONFIG_KAFKA_CLUSTER_ID = "KafkaClusterId".getBytes(ISO_8859_1);
private static final byte[] CONFIG_CONSUMER_GROUP = "ConsumerGroup".getBytes(ISO_8859_1);
private static final byte[] CONFIG_ENTRIES = "Config".getBytes(ISO_8859_1);
private static final byte[] CONFIG_CONNECTION_STATUS = "ConnectionStatus".getBytes(ISO_8859_1);

private static final int INITIAL_CAPACITY = 512 * 1024;

Expand Down Expand Up @@ -290,7 +291,7 @@ private void writeKafkaConfigs(List<KafkaConfigReport> configs, Writable packer)
packer.writeUTF8(CONFIGS);
packer.startArray(configs.size());
for (KafkaConfigReport config : configs) {
packer.startMap(4); // Type, KafkaClusterId, ConsumerGroup, Config
packer.startMap(5); // Type, KafkaClusterId, ConsumerGroup, ConnectionStatus, Config

packer.writeUTF8(CONFIG_TYPE);
packer.writeString(config.getType(), null);
Expand All @@ -301,6 +302,9 @@ private void writeKafkaConfigs(List<KafkaConfigReport> configs, Writable packer)
packer.writeUTF8(CONFIG_CONSUMER_GROUP);
packer.writeString(config.getConsumerGroup(), null);

packer.writeUTF8(CONFIG_CONNECTION_STATUS);
packer.writeString(config.getConnectionStatus(), null);

packer.writeUTF8(CONFIG_ENTRIES);
Map<String, String> entries = config.getConfig();
packer.startMap(entries.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ class DataStreamsWritingTest extends DDCoreSpecification {
dataStreams.start()

// Report a producer and consumer config
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all", "linger.ms": "5"])
dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", ["bootstrap.servers": "localhost:9092", "group.id": "test-group", "auto.offset.reset": "earliest"])
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all", "linger.ms": "5"], "connected")
dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", ["bootstrap.servers": "localhost:9092", "group.id": "test-group", "auto.offset.reset": "earliest"], "connected")

// Also add a stats point so the bucket is not empty of stats
dataStreams.add(new StatsPoint(DataStreamsTags.create(null, null), 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, null))
Expand Down Expand Up @@ -253,8 +253,8 @@ class DataStreamsWritingTest extends DDCoreSpecification {
dataStreams.start()

// Report the same producer config twice — both should be serialized
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"])
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"])
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected")
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected")

// Also add a stats point so the bucket has content
dataStreams.add(new StatsPoint(DataStreamsTags.create(null, null), 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, null))
Expand Down Expand Up @@ -299,13 +299,15 @@ class DataStreamsWritingTest extends DDCoreSpecification {
// Collect configs in a map keyed by type
Map<String, Map<String, String>> configsByType = [:]
numConfigs.times {
assert unpacker.unpackMapHeader() == 4
assert unpacker.unpackMapHeader() == 5
assert unpacker.unpackString() == "Type"
def type = unpacker.unpackString()
assert unpacker.unpackString() == "KafkaClusterId"
unpacker.unpackString() // skip cluster id value
assert unpacker.unpackString() == "ConsumerGroup"
unpacker.unpackString() // skip consumer group value
assert unpacker.unpackString() == "ConnectionStatus"
assert unpacker.unpackString() == "connected"
assert unpacker.unpackString() == "Config"
def configSize = unpacker.unpackMapHeader()
Map<String, String> configEntries = [:]
Expand Down Expand Up @@ -372,13 +374,15 @@ class DataStreamsWritingTest extends DDCoreSpecification {
assert numConfigs == 2

numConfigs.times {
assert unpacker.unpackMapHeader() == 4
assert unpacker.unpackMapHeader() == 5
assert unpacker.unpackString() == "Type"
assert unpacker.unpackString() == "kafka_producer"
assert unpacker.unpackString() == "KafkaClusterId"
unpacker.unpackString() // skip cluster id value
assert unpacker.unpackString() == "ConsumerGroup"
unpacker.unpackString() // skip consumer group value
assert unpacker.unpackString() == "ConnectionStatus"
assert unpacker.unpackString() == "connected"
assert unpacker.unpackString() == "Config"
def configSize = unpacker.unpackMapHeader()
Map<String, String> configEntries = [:]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
when:
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.start()
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"])
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected")
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

Expand Down Expand Up @@ -1079,7 +1079,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
when:
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.start()
dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", ["bootstrap.servers": "localhost:9092", "group.id": "test-group", "auto.offset.reset": "earliest"])
dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", ["bootstrap.servers": "localhost:9092", "group.id": "test-group", "auto.offset.reset": "earliest"], "connected")
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

Expand Down Expand Up @@ -1124,8 +1124,8 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
dataStreams.start()
def config1 = ["bootstrap.servers": "localhost:9092", "acks": "all"]
def config2 = ["bootstrap.servers": "localhost:9092", "acks": "all"]
dataStreams.reportKafkaConfig("kafka_producer", "", "", config1)
dataStreams.reportKafkaConfig("kafka_producer", "", "", config2)
dataStreams.reportKafkaConfig("kafka_producer", "", "", config1, "connected")
dataStreams.reportKafkaConfig("kafka_producer", "", "", config2, "connected")
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

Expand Down Expand Up @@ -1166,7 +1166,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.start()
def config = ["bootstrap.servers": "localhost:9092", "acks": "all"]
dataStreams.reportKafkaConfig("kafka_producer", "", "", config)
dataStreams.reportKafkaConfig("kafka_producer", "", "", config, "connected")
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

Expand All @@ -1183,7 +1183,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {

when: "reporting the same config again in a new bucket"
payloadWriter.buckets.clear()
dataStreams.reportKafkaConfig("kafka_producer", "", "", config)
dataStreams.reportKafkaConfig("kafka_producer", "", "", config, "connected")
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

Expand Down Expand Up @@ -1225,8 +1225,8 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
when: "reporting producer and consumer configs"
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.start()
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"])
dataStreams.reportKafkaConfig("kafka_consumer", "", "my-group", ["bootstrap.servers": "localhost:9092", "group.id": "my-group"])
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected")
dataStreams.reportKafkaConfig("kafka_consumer", "", "my-group", ["bootstrap.servers": "localhost:9092", "group.id": "my-group"], "connected")
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

Expand Down Expand Up @@ -1273,8 +1273,8 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
when: "reporting two producer configs with different settings"
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.start()
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"])
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9093", "acks": "1"])
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected")
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9093", "acks": "1"], "connected")
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

Expand Down Expand Up @@ -1313,7 +1313,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
dataStreams.start()
def tg = DataStreamsTags.create("testType", null, "testTopic", "testGroup", null)
dataStreams.add(new StatsPoint(tg, 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null))
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"])
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"], "connected")
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

Expand Down Expand Up @@ -1362,7 +1362,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
when:
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.start()
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"])
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"], "connected")
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

Expand Down Expand Up @@ -1401,7 +1401,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
when:
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.start()
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"])
dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"], "connected")
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l)
dataStreams.close()

Expand All @@ -1426,11 +1426,11 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {

def "KafkaConfigReport equals and hashCode work correctly"() {
given:
def config1 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], 1000L, null)
def config2 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], 2000L, null)
def config3 = new KafkaConfigReport("kafka_consumer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], 1000L, null)
def config4 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9093"], 1000L, null)
def config5 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], 1000L, "other-service")
def config1 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected", 1000L, null)
def config2 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected", 2000L, null)
def config3 = new KafkaConfigReport("kafka_consumer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected", 1000L, null)
def config4 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9093"], "connected", 1000L, null)
def config5 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected", 1000L, "other-service")

expect:
// Reflexive
Expand Down Expand Up @@ -1465,8 +1465,8 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
def "StatsBucket stores Kafka configs"() {
given:
def bucket = new StatsBucket(1000L, 10000L)
def config1 = new KafkaConfigReport("kafka_producer", "", "", ["acks": "all"], 1000L, null)
def config2 = new KafkaConfigReport("kafka_consumer", "", "test", ["group.id": "test"], 2000L, null)
def config1 = new KafkaConfigReport("kafka_producer", "", "", ["acks": "all"], "connected", 1000L, null)
def config2 = new KafkaConfigReport("kafka_consumer", "", "test", ["group.id": "test"], "connected", 2000L, null)

when:
bucket.addKafkaConfig(config1)
Expand Down
Loading
Loading