From 1c8096dbd5022a7bbfb2effd82f4466d9145b1f7 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 21 May 2026 12:18:37 -0600 Subject: [PATCH] Report Kafka client connection status in DSM payloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the DSM payload only carried Kafka client configs once `Metadata.update` fired with a valid cluster ID — so clients that never authenticated or never reached a broker were silently dropped, and we couldn't compare their configs against working clients. Now every config is also reported with a `connectionStatus` field ("connected" / "failed") on the per-bucket `Configs` entry, including on `Metadata.failedUpdate`. Also expands the value-allowlist with non-secret auth selectors (`sasl.mechanism`, `ssl.protocol`, `ssl.endpoint.identification.algorithm`, etc.) so the comparison flow can surface mechanism typos without leaking credentials. tag: ai generated Co-Authored-By: Claude Opus 4.7 (1M context) --- .../MetadataInstrumentation.java | 18 +++++++++ .../MetadataInstrumentation.java | 2 + .../MetadataFailedUpdateAdvice.java | 23 +++++++++++ .../kafka_common/KafkaConfigHelper.java | 30 +++++++++++++- .../kafka_common/PendingConfig.java | 5 ++- .../DefaultDataStreamsMonitoring.java | 7 +++- .../MsgPackDatastreamsPayloadWriter.java | 6 ++- .../datastreams/DataStreamsWritingTest.groovy | 16 +++++--- .../DefaultDataStreamsMonitoringTest.groovy | 40 +++++++++---------- .../AgentDataStreamsMonitoring.java | 9 ++++- .../api/datastreams/KafkaConfigReport.java | 11 ++++- .../NoopDataStreamsMonitoring.java | 6 ++- 12 files changed, 139 insertions(+), 34 deletions(-) create mode 100644 dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataFailedUpdateAdvice.java diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java index d6acfe30369..ed95968312f 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java @@ -80,6 +80,9 @@ public void methodAdvice(MethodTransformer transformer) { .and(named("update")) .and(takesArgument(1, named("org.apache.kafka.common.requests.MetadataResponse"))), MetadataInstrumentation.class.getName() + "$MetadataUpdate22AndAfterAdvice"); + transformer.applyAdvice( + isMethod().and(named("failedUpdate")), + MetadataInstrumentation.class.getName() + "$FailedUpdateAdvice"); } public static class MetadataUpdateBefore22Advice { @@ -103,6 +106,21 @@ public static void muzzleCheck(ConsumerRecord record) { } } + public static class FailedUpdateAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.This final Metadata metadata) { + MetadataState state = + InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata); + if (state != null) { + KafkaConfigHelper.reportPendingConfigAsFailed(state); + } + } + + public static void muzzleCheck(ConsumerRecord record) { + record.headers(); + } + } + public static class MetadataUpdate22AndAfterAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java index 3907ad0c18c..a29b8b69f94 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java @@ -66,5 +66,7 @@ public void methodAdvice(MethodTransformer transformer) { .and(named("update")) .and(takesArgument(1, named("org.apache.kafka.common.requests.MetadataResponse"))), packageName + ".MetadataUpdate22AndAfterAdvice"); + transformer.applyAdvice( + isMethod().and(named("failedUpdate")), packageName + ".MetadataFailedUpdateAdvice"); } } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataFailedUpdateAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataFailedUpdateAdvice.java new file mode 100644 index 00000000000..5e0b4c7ceac --- /dev/null +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataFailedUpdateAdvice.java @@ -0,0 +1,23 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper; +import datadog.trace.instrumentation.kafka_common.MetadataState; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public class MetadataFailedUpdateAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.This final Metadata metadata) { + MetadataState state = + InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata); + if (state != null) { + KafkaConfigHelper.reportPendingConfigAsFailed(state); + } + } + + public static void muzzleCheck(ConsumerRecord record) { + record.headers(); + } +} diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java index b9c5089f70a..4d5e83d14e2 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java @@ -43,6 +43,17 @@ public class KafkaConfigHelper { "socket.connection.setup.timeout.ms", "socket.connection.setup.timeout.max.ms", "security.protocol", + // Non-secret auth selectors — values name the mechanism / algorithm in use, + // never carry credentials. Useful for spotting typos like SCRAM-SHA-256 vs -512. + "sasl.mechanism", + "sasl.kerberos.service.name", + "sasl.login.callback.handler.class", + "ssl.protocol", + "ssl.enabled.protocols", + "ssl.endpoint.identification.algorithm", + "ssl.truststore.type", + "ssl.keystore.type", + "ssl.cipher.suites", "metrics.sample.window.ms", "metrics.num.samples", "metrics.recording.level", @@ -107,13 +118,28 @@ public static void storePendingConsumerConfig( /** Called from metadata update advice when the cluster ID becomes available. */ public static void reportPendingConfig(MetadataState state, String clusterId) { + reportPendingConfig(state, clusterId, PendingConfig.STATUS_CONNECTED); + } + + /** Called from failure advice when the client cannot reach / authenticate to the cluster. */ + public static void reportPendingConfigAsFailed(MetadataState state) { + // clusterId may be unknown on auth/connect failure — emit with whatever we have (often "") + reportPendingConfig(state, state.clusterId, PendingConfig.STATUS_FAILED); + } + + private static void reportPendingConfig(MetadataState state, String clusterId, String status) { PendingConfig pending = state.takePendingConfig(); if (pending != null) { - log.debug("Received cluster ID, reporting {} config", pending.type); + log.debug("Reporting {} config with status={}", pending.type, status); if (Config.get().isDataStreamsEnabled()) { AgentTracer.get() .getDataStreamsMonitoring() - .reportKafkaConfig(pending.type, clusterId, pending.consumerGroup, pending.config); + .reportKafkaConfig( + pending.type, + clusterId != null ? clusterId : "", + pending.consumerGroup, + pending.config, + status); } } } diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/PendingConfig.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/PendingConfig.java index e0798d00ecf..f5eb143d6fb 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/PendingConfig.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/PendingConfig.java @@ -2,8 +2,11 @@ import java.util.Map; -/** Holds pending Kafka config info until the cluster ID becomes available from metadata. */ +/** Holds pending Kafka config info until the client's connection lifecycle resolves. */ public class PendingConfig { + public static final String STATUS_CONNECTED = "connected"; + public static final String STATUS_FAILED = "failed"; + public final String type; public final String consumerGroup; public final Map config; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 32db178bc1a..95146f2fe12 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -293,13 +293,18 @@ public void reportSchemaRegistryUsage( @Override public void reportKafkaConfig( - String type, String kafkaClusterId, String consumerGroup, Map config) { + String type, + String kafkaClusterId, + String consumerGroup, + Map config, + String connectionStatus) { inbox.offer( new KafkaConfigReport( type, kafkaClusterId, consumerGroup, config, + connectionStatus, timeSource.getCurrentTimeNanos(), getThreadServiceName())); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 6b7c5ad32ea..67700d55423 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -58,6 +58,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private static final byte[] CONFIG_KAFKA_CLUSTER_ID = "KafkaClusterId".getBytes(ISO_8859_1); private static final byte[] CONFIG_CONSUMER_GROUP = "ConsumerGroup".getBytes(ISO_8859_1); private static final byte[] CONFIG_ENTRIES = "Config".getBytes(ISO_8859_1); + private static final byte[] CONFIG_CONNECTION_STATUS = "ConnectionStatus".getBytes(ISO_8859_1); private static final int INITIAL_CAPACITY = 512 * 1024; @@ -290,7 +291,7 @@ private void writeKafkaConfigs(List configs, Writable packer) packer.writeUTF8(CONFIGS); packer.startArray(configs.size()); for (KafkaConfigReport config : configs) { - packer.startMap(4); // Type, KafkaClusterId, ConsumerGroup, Config + packer.startMap(5); // Type, KafkaClusterId, ConsumerGroup, ConnectionStatus, Config packer.writeUTF8(CONFIG_TYPE); packer.writeString(config.getType(), null); @@ -301,6 +302,9 @@ private void writeKafkaConfigs(List configs, Writable packer) packer.writeUTF8(CONFIG_CONSUMER_GROUP); packer.writeString(config.getConsumerGroup(), null); + packer.writeUTF8(CONFIG_CONNECTION_STATUS); + packer.writeString(config.getConnectionStatus(), null); + packer.writeUTF8(CONFIG_ENTRIES); Map entries = config.getConfig(); packer.startMap(entries.size()); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index f47904f522b..5cf0fc9315e 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -202,8 +202,8 @@ class DataStreamsWritingTest extends DDCoreSpecification { dataStreams.start() // Report a producer and consumer config - dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all", "linger.ms": "5"]) - dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", ["bootstrap.servers": "localhost:9092", "group.id": "test-group", "auto.offset.reset": "earliest"]) + dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all", "linger.ms": "5"], "connected") + dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", ["bootstrap.servers": "localhost:9092", "group.id": "test-group", "auto.offset.reset": "earliest"], "connected") // Also add a stats point so the bucket is not empty of stats dataStreams.add(new StatsPoint(DataStreamsTags.create(null, null), 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, null)) @@ -253,8 +253,8 @@ class DataStreamsWritingTest extends DDCoreSpecification { dataStreams.start() // Report the same producer config twice — both should be serialized - dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"]) - dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"]) + dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected") + dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected") // Also add a stats point so the bucket has content dataStreams.add(new StatsPoint(DataStreamsTags.create(null, null), 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, null)) @@ -299,13 +299,15 @@ class DataStreamsWritingTest extends DDCoreSpecification { // Collect configs in a map keyed by type Map> configsByType = [:] numConfigs.times { - assert unpacker.unpackMapHeader() == 4 + assert unpacker.unpackMapHeader() == 5 assert unpacker.unpackString() == "Type" def type = unpacker.unpackString() assert unpacker.unpackString() == "KafkaClusterId" unpacker.unpackString() // skip cluster id value assert unpacker.unpackString() == "ConsumerGroup" unpacker.unpackString() // skip consumer group value + assert unpacker.unpackString() == "ConnectionStatus" + assert unpacker.unpackString() == "connected" assert unpacker.unpackString() == "Config" def configSize = unpacker.unpackMapHeader() Map configEntries = [:] @@ -372,13 +374,15 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert numConfigs == 2 numConfigs.times { - assert unpacker.unpackMapHeader() == 4 + assert unpacker.unpackMapHeader() == 5 assert unpacker.unpackString() == "Type" assert unpacker.unpackString() == "kafka_producer" assert unpacker.unpackString() == "KafkaClusterId" unpacker.unpackString() // skip cluster id value assert unpacker.unpackString() == "ConsumerGroup" unpacker.unpackString() // skip consumer group value + assert unpacker.unpackString() == "ConnectionStatus" + assert unpacker.unpackString() == "connected" assert unpacker.unpackString() == "Config" def configSize = unpacker.unpackMapHeader() Map configEntries = [:] diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy index 217dd721d08..8a82220f849 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy @@ -1037,7 +1037,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"]) + dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected") timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -1079,7 +1079,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", ["bootstrap.servers": "localhost:9092", "group.id": "test-group", "auto.offset.reset": "earliest"]) + dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", ["bootstrap.servers": "localhost:9092", "group.id": "test-group", "auto.offset.reset": "earliest"], "connected") timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -1124,8 +1124,8 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dataStreams.start() def config1 = ["bootstrap.servers": "localhost:9092", "acks": "all"] def config2 = ["bootstrap.servers": "localhost:9092", "acks": "all"] - dataStreams.reportKafkaConfig("kafka_producer", "", "", config1) - dataStreams.reportKafkaConfig("kafka_producer", "", "", config2) + dataStreams.reportKafkaConfig("kafka_producer", "", "", config1, "connected") + dataStreams.reportKafkaConfig("kafka_producer", "", "", config2, "connected") timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -1166,7 +1166,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() def config = ["bootstrap.servers": "localhost:9092", "acks": "all"] - dataStreams.reportKafkaConfig("kafka_producer", "", "", config) + dataStreams.reportKafkaConfig("kafka_producer", "", "", config, "connected") timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -1183,7 +1183,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting the same config again in a new bucket" payloadWriter.buckets.clear() - dataStreams.reportKafkaConfig("kafka_producer", "", "", config) + dataStreams.reportKafkaConfig("kafka_producer", "", "", config, "connected") timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -1225,8 +1225,8 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting producer and consumer configs" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"]) - dataStreams.reportKafkaConfig("kafka_consumer", "", "my-group", ["bootstrap.servers": "localhost:9092", "group.id": "my-group"]) + dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected") + dataStreams.reportKafkaConfig("kafka_consumer", "", "my-group", ["bootstrap.servers": "localhost:9092", "group.id": "my-group"], "connected") timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -1273,8 +1273,8 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting two producer configs with different settings" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"]) - dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9093", "acks": "1"]) + dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected") + dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9093", "acks": "1"], "connected") timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -1313,7 +1313,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dataStreams.start() def tg = DataStreamsTags.create("testType", null, "testTopic", "testGroup", null) dataStreams.add(new StatsPoint(tg, 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) - dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"]) + dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"], "connected") timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -1362,7 +1362,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"]) + dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"], "connected") timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -1401,7 +1401,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"]) + dataStreams.reportKafkaConfig("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092"], "connected") timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) dataStreams.close() @@ -1426,11 +1426,11 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { def "KafkaConfigReport equals and hashCode work correctly"() { given: - def config1 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], 1000L, null) - def config2 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], 2000L, null) - def config3 = new KafkaConfigReport("kafka_consumer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], 1000L, null) - def config4 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9093"], 1000L, null) - def config5 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], 1000L, "other-service") + def config1 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected", 1000L, null) + def config2 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected", 2000L, null) + def config3 = new KafkaConfigReport("kafka_consumer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected", 1000L, null) + def config4 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9093"], "connected", 1000L, null) + def config5 = new KafkaConfigReport("kafka_producer", "", "", ["bootstrap.servers": "localhost:9092", "acks": "all"], "connected", 1000L, "other-service") expect: // Reflexive @@ -1465,8 +1465,8 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { def "StatsBucket stores Kafka configs"() { given: def bucket = new StatsBucket(1000L, 10000L) - def config1 = new KafkaConfigReport("kafka_producer", "", "", ["acks": "all"], 1000L, null) - def config2 = new KafkaConfigReport("kafka_consumer", "", "test", ["group.id": "test"], 2000L, null) + def config1 = new KafkaConfigReport("kafka_producer", "", "", ["acks": "all"], "connected", 1000L, null) + def config2 = new KafkaConfigReport("kafka_consumer", "", "test", ["group.id": "test"], "connected", 2000L, null) when: bucket.addKafkaConfig(config1) diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java index 42654dfda7c..5161e64ac5a 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java @@ -18,9 +18,16 @@ public interface AgentDataStreamsMonitoring * @param kafkaClusterId the Kafka cluster identifier, or empty string if not yet known * @param consumerGroup the consumer group name, or empty string for producers * @param config the configuration key-value pairs + * @param connectionStatus "connected" if the client successfully fetched metadata from a broker, + * "failed" if metadata fetch failed (e.g. auth error, broker unreachable). Empty string if + * unknown. */ void reportKafkaConfig( - String type, String kafkaClusterId, String consumerGroup, Map config); + String type, + String kafkaClusterId, + String consumerGroup, + Map config, + String connectionStatus); /** * Tracks Schema Registry usage for Data Streams Monitoring. diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/KafkaConfigReport.java b/internal-api/src/main/java/datadog/trace/api/datastreams/KafkaConfigReport.java index d940847e22f..c76d9dd4396 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/KafkaConfigReport.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/KafkaConfigReport.java @@ -12,6 +12,7 @@ public class KafkaConfigReport implements InboxItem { private final String kafkaClusterId; private final String consumerGroup; private final Map config; + private final String connectionStatus; // "connected" or "failed" private final long timestampNanos; private final String serviceNameOverride; @@ -20,12 +21,14 @@ public KafkaConfigReport( String kafkaClusterId, String consumerGroup, Map config, + String connectionStatus, long timestampNanos, String serviceNameOverride) { this.type = type; this.kafkaClusterId = kafkaClusterId != null ? kafkaClusterId : ""; this.consumerGroup = consumerGroup != null ? consumerGroup : ""; this.config = config; + this.connectionStatus = connectionStatus != null ? connectionStatus : ""; this.timestampNanos = timestampNanos; this.serviceNameOverride = serviceNameOverride; } @@ -46,6 +49,10 @@ public Map getConfig() { return config; } + public String getConnectionStatus() { + return connectionStatus; + } + public long getTimestampNanos() { return timestampNanos; } @@ -62,7 +69,8 @@ public boolean equals(Object o) { return Objects.equals(type, that.type) && Objects.equals(kafkaClusterId, that.kafkaClusterId) && Objects.equals(consumerGroup, that.consumerGroup) - && Objects.equals(config, that.config); + && Objects.equals(config, that.config) + && Objects.equals(connectionStatus, that.connectionStatus); } @Override @@ -71,6 +79,7 @@ public int hashCode() { result = 31 * result + (kafkaClusterId != null ? kafkaClusterId.hashCode() : 0); result = 31 * result + (consumerGroup != null ? consumerGroup.hashCode() : 0); result = 31 * result + (config != null ? config.hashCode() : 0); + result = 31 * result + (connectionStatus != null ? connectionStatus.hashCode() : 0); return result; } } diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java index 09bd3ee1e3a..c9657c60271 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java @@ -68,7 +68,11 @@ public void trackTransaction( @Override public void reportKafkaConfig( - String type, String kafkaClusterId, String consumerGroup, Map config) {} + String type, + String kafkaClusterId, + String consumerGroup, + Map config, + String connectionStatus) {} @Override public void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier) {}