fix(dsm): add kafka_cluster_id to confluent-kafka offset/backlog tracking#16616

Open
johannbotha wants to merge 10 commits into main from jj.botha/kafka-dsm-cluster-id-offsets

Conversation

@johannbotha

Summary

  • Adds kafka_cluster_id to Kafka offset/backlog tracking for confluent-kafka in Data Streams Monitoring
  • Checkpoints already included cluster_id in edge tags, but offset commit and produce offset backlog paths did not
  • This caused DSM to be unable to correctly attribute backlog data to specific Kafka clusters when customers have multiple clusters sharing topic names
  • Addresses item #3 (Python confluent-kafka offsets) from the cross-tracer Kafka DSM cluster ID audit

Changes

ddtrace/internal/datastreams/processor.py:

  • Added cluster_id field to PartitionKey and ConsumerPartitionKey NamedTuples (with "" default for backward compat)
  • Added cluster_id="" parameter to track_kafka_produce() and track_kafka_commit()
  • Updated _serialize_buckets() to conditionally emit kafka_cluster_id tag in backlog entries
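A minimal sketch of the key and tag shape described above. The names mirror the PR's `PartitionKey` and the `_serialize_buckets()` tag emission, but this code is illustrative, not the ddtrace source:

```python
from typing import List, NamedTuple


class PartitionKey(NamedTuple):
    """Bucket key for produce-offset backlog tracking (illustrative)."""
    topic: str
    partition: int
    cluster_id: str = ""  # default keeps pre-existing call sites valid


def backlog_tags(key: PartitionKey) -> List[str]:
    """Emit kafka_cluster_id only when a cluster id is known."""
    tags = ["topic:" + key.topic, "partition:" + str(key.partition)]
    if key.cluster_id:
        tags.append("kafka_cluster_id:" + key.cluster_id)
    return tags


# Old two-field construction still works thanks to the default,
# which is what keeps aiokafka (no cluster_id) unaffected:
legacy = PartitionKey("orders", 0)
tagged = PartitionKey("orders", 0, cluster_id="lkc-abc123")
```

Because `NamedTuple` fields with defaults are optional at construction time, callers that never learned about `cluster_id` keep working unchanged.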

ddtrace/contrib/internal/kafka/patch.py:

  • Updated traced_commit() to fetch cluster_id (using instance cache first, then extracting topic from args) and set it via core.set_item()

ddtrace/internal/datastreams/kafka.py:

  • Updated dsm_kafka_message_produce() callback to pass cluster_id to track_kafka_produce()
  • Updated auto-commit path in dsm_kafka_message_consume() to pass cluster_id to track_kafka_commit()
  • Updated dsm_kafka_message_commit() to read cluster_id from core and pass to track_kafka_commit()

Tests:

  • Updated PartitionKey/ConsumerPartitionKey references in all test files
  • Added test_kafka_offset_monitoring_with_cluster_id — verifies serialized backlogs include kafka_cluster_id tag
  • Added test_kafka_offset_monitoring_without_cluster_id_omits_tag — verifies tag is omitted when cluster_id is empty
  • Added test_data_streams_kafka_offset_backlog_has_cluster_id — integration test against real broker

Design Decisions

  • cluster_id is part of the key (not metadata): Two Kafka clusters with the same topic name have independent offset spaces. If cluster_id were metadata, last-write-wins would corrupt backlog calculations.
  • Default cluster_id="": Maintains backward compatibility with aiokafka (which doesn't fetch cluster_id) and any other callers.
  • Tag format matches Java tracer: kafka_cluster_id:<value> is consistent with Java's DataStreamsTags.createWithPartition().
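The key-vs-metadata decision can be demonstrated with a toy sketch (not the ddtrace implementation): two clusters sharing a topic name have incomparable offset spaces, so only a key that includes the cluster keeps them separate.

```python
# cluster_id in the key: each cluster keeps an independent high-water mark.
produced = {}

def track_produce(cluster_id, topic, partition, offset):
    key = (cluster_id, topic, partition)
    produced[key] = max(produced.get(key, 0), offset)

track_produce("cluster-a", "orders", 0, 500)
track_produce("cluster-b", "orders", 0, 42)
# Both offsets survive under distinct keys:
# produced[("cluster-a", "orders", 0)] == 500
# produced[("cluster-b", "orders", 0)] == 42

# If cluster_id were metadata, both clusters would collapse onto one
# (topic, partition) key and the last writer would win, mixing
# incomparable offset spaces:
merged = {}
for cluster, offset in [("cluster-a", 500), ("cluster-b", 42)]:
    merged[("orders", 0)] = offset  # last-write-wins
# merged[("orders", 0)] is now 42: cluster-a's offset 500 is lost,
# so any backlog computed as produced - committed would be corrupted.
```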

Not in scope

  • aiokafka cluster_id: aiokafka does not expose cluster_id on its ClusterMetadata class. Needs upstream PR or monkey-patch — separate effort.

Test plan

  • Processor unit tests pass (test_kafka_offset_monitoring, test_kafka_offset_monitoring_with_cluster_id, test_kafka_offset_monitoring_without_cluster_id_omits_tag)
  • confluent-kafka integration tests pass (offset monitoring with messages, offsets, auto-commit, backlog cluster_id)
  • aiokafka tests pass with empty cluster_id (backward compat)
  • Existing checkpoint tests still pass (no regression)

🤖 Generated with Claude Code

@cit-pr-commenter-54b7da

cit-pr-commenter-54b7da bot commented Feb 20, 2026

Codeowners resolved as

tests/datastreams/test_processor.py                                     @DataDog/data-streams-monitoring


@johannbotha
Author

johannbotha commented Feb 20, 2026

Test setup
[screenshot]

Before
[screenshot]

After
[screenshot]

@johannbotha johannbotha marked this pull request as ready for review February 20, 2026 19:34
@johannbotha johannbotha requested review from a team as code owners February 20, 2026 19:34
Collaborator

@wantsui wantsui left a comment


Approving from an IDM perspective because I don't see anything here that affects spans.

johannbotha and others added 10 commits February 24, 2026 21:42
…king

Checkpoints already included cluster_id in edge tags, but the offset
commit and produce offset backlog paths did not. This caused DSM to be
unable to correctly attribute backlog data to specific Kafka clusters
when multiple clusters share topic names.

Changes:
- Add cluster_id field to PartitionKey and ConsumerPartitionKey
- Pass cluster_id through traced_commit, dsm_kafka_message_commit,
  and the auto-commit path in dsm_kafka_message_consume
- Include kafka_cluster_id tag in serialized backlog entries
- Update all tests for the new key structure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Normalize cluster_id to "" before setting on core (prevents None propagation)
- Use pytest.skip() instead of silent if-guard in backlog cluster_id test
- Add comment explaining why any topic suffices for cluster_id lookup
- Add TODO comment in aiokafka noting cluster_id gap for follow-up

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Flaky tests: the serialization tests used a shared module-level
processor whose _serialize_buckets() clears buckets as a side effect.
When test execution order varied across Python versions, buckets were
already drained. Fix by using a fresh DataStreamsProcessor per test.

Also remove the TODO comment in aiokafka.py per Rob's review.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Address review feedback: remove unhelpful comments in traced_commit,
default _dd_cluster_id to "" instead of None to eliminate scattered
`or ""` guards, and return "" from _get_cluster_id on failure paths.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The failure caching test asserted `is None` but _get_cluster_id now
returns "" on failure paths.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The DataStreamsProcessor extends PeriodicService which starts a
background thread calling _serialize_buckets() every 10s. This
drains buckets between track_kafka_* calls and assertions, causing
intermittent failures. Fix by calling stop() after construction.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
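The periodic-drain flake described above can be reduced to a small self-contained pattern (the `PeriodicFlusher` class here is a hypothetical stand-in for ddtrace's `PeriodicService`, not its actual code):

```python
import threading


class PeriodicFlusher:
    """Stand-in for a PeriodicService-style processor: a background
    timer drains the buffer after `interval` seconds."""

    def __init__(self, interval=10.0):
        self.buffer = []
        self._timer = threading.Timer(interval, self._flush)
        self._timer.daemon = True
        self._timer.start()

    def _flush(self):
        # Draining as a side effect is what races with test assertions.
        drained, self.buffer = self.buffer, []

    def stop(self):
        self._timer.cancel()


# In a test: stop the background thread right after construction so
# nothing drains the buffer between tracking calls and assertions.
proc = PeriodicFlusher()
proc.stop()
proc.buffer.append(("orders", 0, 500))
# proc.buffer is now deterministic: no concurrent drain can occur.
```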
The test produced 2 messages but only read 1, then asserted the
auto-committed offset matched exactly. With 2 messages on the
partition, auto-commit can batch both before the assertion runs,
giving offset 2 instead of expected 1. This was a known pre-existing
flake (see #15735). Fix by producing only 1 message so there is no
ambiguity about which offset gets committed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Processor tests:
- Remove _serialize_buckets() calls that drain shared state and cause
  ordering flakes. Instead inspect bucket keys directly for cluster_id.
- Add dedicated test_kafka_offset_serialization_cluster_id_tag that
  uses unique topic names and filters backlogs to avoid cross-test
  interference.
- Remove invalid stop() calls on never-started processors.

Auto-commit test:
- Use >= instead of == for offset assertion. DSM tracks offsets at
  poll time while the broker commits asynchronously, so the DSM
  offset may advance beyond the broker's reported committed offset.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… tag match

Two bugs in the previous test:
1. Substring match: "ser_topic_with" matched "topic:ser_topic_without"
   causing false positives. Fix: use exact equality (t == "topic:...")
2. Shared processor: _serialize_buckets() drains the shared processor's
   buckets, causing other tests to see empty buckets when run in random
   order. Fix: use a standalone DataStreamsProcessor (STOPPED state by
   default, no background thread).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
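The substring false positive from bug 1 is easy to reproduce in isolation (topic names taken from the commit message above; the surrounding code is illustrative):

```python
# Tags for a topic that should NOT match "ser_topic_with":
backlog_tags = ["topic:ser_topic_without", "partition:0"]

# Substring containment matches the wrong topic, since "ser_topic_with"
# is a prefix of "ser_topic_without":
substring_hit = any("ser_topic_with" in tag for tag in backlog_tags)

# Exact equality against the full tag does not:
exact_hit = any(tag == "topic:ser_topic_with" for tag in backlog_tags)
```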
All three offset monitoring tests now use their own DataStreamsProcessor
instance instead of the shared module-level one. This prevents cross-test
contamination where entries with cluster_id="" from one test cause
assertion failures in another test that expects cluster_id to be set.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@johannbotha johannbotha force-pushed the jj.botha/kafka-dsm-cluster-id-offsets branch from fc5f4a5 to ea4820a on February 25, 2026 02:42
