fix(dsm): add kafka_cluster_id to confluent-kafka offset/backlog tracking #16616
Open
johannbotha wants to merge 10 commits into main from
Conversation
wantsui (Collaborator) approved these changes on Feb 24, 2026, with the comment:
Approving from an IDM perspective because I don't see anything here that affects spans.
robcarlan-datadog approved these changes on Feb 24, 2026.
…king
Checkpoints already included cluster_id in edge tags, but the offset commit and produce offset backlog paths did not. This caused DSM to be unable to correctly attribute backlog data to specific Kafka clusters when multiple clusters share topic names.
Changes:
- Add cluster_id field to PartitionKey and ConsumerPartitionKey
- Pass cluster_id through traced_commit, dsm_kafka_message_commit, and the auto-commit path in dsm_kafka_message_consume
- Include the kafka_cluster_id tag in serialized backlog entries
- Update all tests for the new key structure
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
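The key change in this commit can be sketched with plain NamedTuples. This is an illustrative stand-in, not the real definitions from ddtrace/internal/datastreams/processor.py, but it shows why adding cluster_id to the key keeps backlogs from two clusters with the same topic name separate.

```python
from typing import NamedTuple


# Hypothetical sketch of the key structures described in this commit;
# field names and order may differ from the real ddtrace code.
class PartitionKey(NamedTuple):
    topic: str
    partition: int
    cluster_id: str = ""  # "" default keeps callers that don't know the cluster working


class ConsumerPartitionKey(NamedTuple):
    group: str
    topic: str
    partition: int
    cluster_id: str = ""


# Two clusters sharing a topic name now produce distinct keys,
# so their backlogs are tracked independently.
k1 = PartitionKey("orders", 0, "cluster-a")
k2 = PartitionKey("orders", 0, "cluster-b")
assert k1 != k2

# Callers that omit cluster_id (e.g. aiokafka) still get a valid key.
assert PartitionKey("orders", 0) == PartitionKey("orders", 0, "")
```

Because NamedTuple equality and hashing cover all fields, a backlog dict keyed by these tuples naturally keeps per-cluster offset spaces apart.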
- Normalize cluster_id to "" before setting on core (prevents None propagation)
- Use pytest.skip() instead of a silent if-guard in the backlog cluster_id test
- Add a comment explaining why any topic suffices for the cluster_id lookup
- Add a TODO comment in aiokafka noting the cluster_id gap for follow-up
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Flaky tests: the serialization tests used a shared module-level processor whose _serialize_buckets() clears buckets as a side effect. When test execution order varied across Python versions, buckets were already drained. Fix by using a fresh DataStreamsProcessor per test. Also remove the TODO comment in aiokafka.py per rob's review.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Address review feedback: remove unhelpful comments in traced_commit, default _dd_cluster_id to "" instead of None to eliminate scattered `or ""` guards, and return "" from _get_cluster_id on failure paths.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
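The "default to '' at the source" refactor described here can be sketched as follows. All names (tag_value_guarded, get_cluster_id_or_empty) are made up for illustration; only the pattern, not the API, comes from the commit.

```python
# Before: the value could be None, so every consumer needed an `or ""` guard.
def tag_value_guarded(cluster_id):
    return "kafka_cluster_id:" + (cluster_id or "")


# After: the lookup itself returns "" on every failure path
# (mirroring _get_cluster_id in the commit), so callers can use it directly.
def get_cluster_id_or_empty(metadata) -> str:
    if metadata is None:  # hypothetical failure path: no metadata available
        return ""
    return metadata.get("cluster_id", "")


assert get_cluster_id_or_empty(None) == ""
assert get_cluster_id_or_empty({"cluster_id": "abc"}) == "abc"
assert tag_value_guarded(None) == tag_value_guarded("")
```

Centralizing the empty-string default removes a whole class of None-propagation bugs instead of patching each call site.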
Fix the failure-caching test: it asserted `is None`, but _get_cluster_id now returns "" on failure paths.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The DataStreamsProcessor extends PeriodicService, which starts a background thread calling _serialize_buckets() every 10s. This drains buckets between track_kafka_* calls and assertions, causing intermittent failures. Fix by calling stop() after construction.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
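The race described here can be reproduced with a minimal stand-in: a background thread that drains a dict on an interval, exactly the kind of side effect that empties buckets between a test's setup and its assertion. PeriodicFlusher is an illustrative toy, not ddtrace's PeriodicService API.

```python
import threading
import time


class PeriodicFlusher:
    """Toy periodic service: a daemon thread drains `buckets` on an interval."""

    def __init__(self, interval: float = 0.05):
        self.buckets = {}
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Event.wait returns False on timeout (keep looping) and True once set.
        while not self._stop.wait(self._interval):
            self.buckets.clear()  # the side effect behind the flaky tests

    def stop(self):
        self._stop.set()
        self._thread.join()


flusher = PeriodicFlusher()
flusher.stop()  # the fix: stop the periodic thread right after construction
flusher.buckets["offset"] = 42
time.sleep(0.2)  # several intervals pass...
assert flusher.buckets == {"offset": 42}  # ...and nothing drained the bucket
```

Without the stop() call, the assertion would fail intermittently depending on where the flush interval lands relative to the test body.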
The test produced 2 messages but only read 1, then asserted the auto-committed offset matched exactly. With 2 messages on the partition, auto-commit can batch both before the assertion runs, giving offset 2 instead of the expected 1. This was a known pre-existing flake (see #15735). Fix by producing only 1 message so there is no ambiguity about which offset gets committed.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Processor tests:
- Remove _serialize_buckets() calls that drain shared state and cause ordering flakes. Instead, inspect bucket keys directly for cluster_id.
- Add a dedicated test_kafka_offset_serialization_cluster_id_tag that uses unique topic names and filters backlogs to avoid cross-test interference.
- Remove invalid stop() calls on never-started processors.
Auto-commit test:
- Use >= instead of == for the offset assertion. DSM tracks offsets at poll time while the broker commits asynchronously, so the DSM offset may advance beyond the broker's reported committed offset.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
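The >= relaxation described above can be sketched as a small predicate. The function name is illustrative; the invariant it encodes (DSM may run ahead of the broker's asynchronous commit, but never behind) is the one the commit message states.

```python
def dsm_offset_is_consistent(dsm_offset: int, broker_committed: int) -> bool:
    # Old, flaky check: dsm_offset == broker_committed.
    # New check: DSM records at poll time, the broker commits asynchronously,
    # so DSM may be ahead of the reported committed offset but never behind it.
    return dsm_offset >= broker_committed


assert dsm_offset_is_consistent(2, 1)      # DSM polled ahead of the async commit
assert dsm_offset_is_consistent(1, 1)      # in sync
assert not dsm_offset_is_consistent(0, 1)  # DSM behind the broker: a real bug
```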
… tag match
Two bugs in the previous test:
1. Substring match: "ser_topic_with" matched "topic:ser_topic_without", causing false positives. Fix: use exact equality (t == "topic:...").
2. Shared processor: _serialize_buckets() drains the shared processor's buckets, causing other tests to see empty buckets when run in random order. Fix: use a standalone DataStreamsProcessor (STOPPED state by default, no background thread).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
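The substring-match false positive described in bug 1 is easy to demonstrate with a made-up tag list (the tag strings below are illustrative, in the "topic:<name>" shape the commit mentions):

```python
# Tags for a topic that should NOT match a check for "ser_topic_with".
tags = ["topic:ser_topic_without", "partition:0"]

# Buggy: substring check matches "ser_topic_with" inside "ser_topic_without".
buggy_match = any("ser_topic_with" in t for t in tags)

# Fixed: exact equality on the full tag string.
exact_match = any(t == "topic:ser_topic_with" for t in tags)

assert buggy_match is True   # false positive from the substring check
assert exact_match is False  # exact equality rejects the wrong topic
```

The same pitfall applies to any colon-delimited tag scheme where one value is a prefix of another; exact comparison on the full "key:value" string avoids it.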
All three offset monitoring tests now use their own DataStreamsProcessor instance instead of the shared module-level one. This prevents cross-test contamination where entries with cluster_id="" from one test cause assertion failures in another test that expects cluster_id to be set.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed from fc5f4a5 to ea4820a.



Summary
Adds kafka_cluster_id to Kafka offset/backlog tracking for confluent-kafka in Data Streams Monitoring. Checkpoints already included cluster_id in edge tags, but the offset commit and produce offset backlog paths did not.
Changes
ddtrace/internal/datastreams/processor.py:
- Add a cluster_id field to the PartitionKey and ConsumerPartitionKey NamedTuples (with a "" default for backward compat)
- Add a cluster_id="" parameter to track_kafka_produce() and track_kafka_commit()
- Update _serialize_buckets() to conditionally emit the kafka_cluster_id tag in backlog entries
ddtrace/contrib/internal/kafka/patch.py:
- Update traced_commit() to fetch cluster_id (using the instance cache first, then extracting the topic from args) and set it via core.set_item()
ddtrace/internal/datastreams/kafka.py:
- Update the dsm_kafka_message_produce() callback to pass cluster_id to track_kafka_produce()
- Update dsm_kafka_message_consume() to pass cluster_id to track_kafka_commit()
- Update dsm_kafka_message_commit() to read cluster_id from core and pass it to track_kafka_commit()
Tests:
- Update PartitionKey/ConsumerPartitionKey references in all test files
- test_kafka_offset_monitoring_with_cluster_id: verifies serialized backlogs include the kafka_cluster_id tag
- test_kafka_offset_monitoring_without_cluster_id_omits_tag: verifies the tag is omitted when cluster_id is empty
- test_data_streams_kafka_offset_backlog_has_cluster_id: integration test against a real broker
Design Decisions
- cluster_id is part of the key (not metadata): two Kafka clusters with the same topic name have independent offset spaces. If cluster_id were metadata, last-write-wins would corrupt backlog calculations.
- Default cluster_id="": maintains backward compatibility with aiokafka (which doesn't fetch cluster_id) and any other callers.
- The tag format kafka_cluster_id:<value> is consistent with Java's DataStreamsTags.createWithPartition().
Not in scope
- aiokafka: does not expose cluster_id on its ClusterMetadata class. Needs an upstream PR or monkey-patch; separate effort.
Test plan
🤖 Generated with Claude Code