From bef0cb6f772c3038e5b5646b32efca95f493739b Mon Sep 17 00:00:00 2001 From: Jakob Homan Date: Mon, 27 Apr 2026 09:02:41 -0700 Subject: [PATCH 1/2] Add broker_source label to all Prometheus metrics Adds a broker_source label (from BROKER_SOURCE env var) to every metric, alongside the existing pipeline label. Enables distinguishing MSK vs WarpStream consumers in Grafana during the ingestion migration. --- millpond/config.py | 4 ++ millpond/main.py | 2 +- millpond/metrics.py | 101 +++++++++++++++++++----------------- tests/unit/test_consumer.py | 1 + 4 files changed, 58 insertions(+), 50 deletions(-) diff --git a/millpond/config.py b/millpond/config.py index c09a6a1..e5057bd 100644 --- a/millpond/config.py +++ b/millpond/config.py @@ -47,6 +47,9 @@ class Config: consume_batch_size: int stats_interval_ms: int + # Broker source label for metrics (e.g. "msk", "warpstream") + broker_source: str + # Extra librdkafka config (from KAFKA_CONSUMER_* env vars) kafka_config_overrides: tuple[tuple[str, str], ...] @@ -123,6 +126,7 @@ def load() -> Config: fetch_max_wait_ms=int(os.environ.get("FETCH_MAX_WAIT_MS", "500")), consume_batch_size=int(os.environ.get("CONSUME_BATCH_SIZE", "1000")), stats_interval_ms=int(os.environ.get("STATS_INTERVAL_MS", "5000")), + broker_source=os.environ.get("BROKER_SOURCE", ""), kafka_config_overrides=kafka_overrides, ) diff --git a/millpond/main.py b/millpond/main.py index 2c5e7c4..082d327 100644 --- a/millpond/main.py +++ b/millpond/main.py @@ -124,7 +124,7 @@ def main(): log.info("millpond starting") cfg = config.load() - metrics.init(f"{cfg.topic}-{cfg.ducklake_table}") + metrics.init(f"{cfg.topic}-{cfg.ducklake_table}", broker_source=cfg.broker_source) http = server.start() server.health.mark_started() log.info("Health server started, probes passing") diff --git a/millpond/metrics.py b/millpond/metrics.py index 3b49f41..38920a9 100644 --- a/millpond/metrics.py +++ b/millpond/metrics.py @@ -1,15 +1,16 @@ from prometheus_client import Counter, Gauge, Histogram -class _AutoPipelineLabels: - """Wrapper that auto-injects the pipeline label into .labels() calls.""" +class _AutoCommonLabels: + """Wrapper that auto-injects common labels (pipeline, broker_source) into .labels() calls.""" - def __init__(self, metric, pipeline: str): + def __init__(self, metric, pipeline: str, broker_source: str): self._metric = metric self._pipeline = pipeline + self._broker_source = broker_source def labels(self, **kwargs): - return self._metric.labels(pipeline=self._pipeline, **kwargs) + return self._metric.labels(pipeline=self._pipeline, broker_source=self._broker_source, **kwargs) # --- Raw metric definitions (with pipeline as first label) --- @@ -17,114 +18,114 @@ def labels(self, **kwargs): _records_consumed_total = Counter( "millpond_records_consumed_total", "Records polled from Kafka", - ["pipeline", "partition"], + ["pipeline", "broker_source", "partition"], ) _records_written_total = Counter( "millpond_records_written_total", "Records written to DuckLake", - ["pipeline"], + ["pipeline", "broker_source"], ) _batches_flushed_total = Counter( "millpond_batches_flushed_total", "Flush cycles completed", - ["pipeline", "trigger"], + ["pipeline", "broker_source", "trigger"], ) _records_skipped_total = Counter( "millpond_records_skipped_total", "Records skipped", - ["pipeline", "reason"], + ["pipeline", "broker_source", "reason"], ) _errors_total = Counter( "millpond_errors_total", "Errors by type", - ["pipeline", "type"], + ["pipeline", "broker_source", "type"], ) _flush_duration_seconds = Histogram( "millpond_flush_duration_seconds", "Time per DuckLake write", - ["pipeline"], + ["pipeline", "broker_source"], ) _arrow_conversion_seconds = Histogram( "millpond_arrow_conversion_seconds", "Time to convert JSON to Arrow table", - ["pipeline"], + ["pipeline", "broker_source"], ) _flush_size_bytes = Histogram( "millpond_flush_size_bytes", "Arrow bytes per flush", - ["pipeline"], + ["pipeline", "broker_source"], buckets=[1e6, 5e6, 10e6, 25e6, 50e6, 100e6, 250e6, 500e6, 1e9], ) _flush_size_records = Histogram( "millpond_flush_size_records", "Records per flush", - ["pipeline"], + ["pipeline", "broker_source"], buckets=[100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000], ) _pending_bytes = Gauge( "millpond_pending_bytes", "Current pending Arrow bytes awaiting flush", - ["pipeline"], + ["pipeline", "broker_source"], ) _buffer_fullness = Gauge( "millpond_buffer_fullness", "Ratio of pending bytes to flush size (0.0 = empty, 1.0 = flush threshold)", - ["pipeline"], + ["pipeline", "broker_source"], ) _consume_batch_size_current = Gauge( "millpond_consume_batch_size_current", "Current adaptive consume batch size", - ["pipeline"], + ["pipeline", "broker_source"], ) _consumer_lag = Gauge( "millpond_consumer_lag", "Highwater mark minus committed offset", - ["pipeline", "partition"], + ["pipeline", "broker_source", "partition"], ) _last_committed_offset = Gauge( "millpond_last_committed_offset", "Last committed offset", - ["pipeline", "partition"], + ["pipeline", "broker_source", "partition"], ) _schema_columns_added_total = Counter( "millpond_schema_columns_added_total", "Columns added via schema evolution", - ["pipeline"], + ["pipeline", "broker_source"], ) _schema_columns_widened_total = Counter( "millpond_schema_columns_widened_total", "Columns widened via schema evolution", - ["pipeline"], + ["pipeline", "broker_source"], ) # librdkafka internal stats (via statistics.interval.ms callback) _rdkafka_replyq = Gauge( "millpond_rdkafka_replyq", "Number of ops waiting for broker response", - ["pipeline"], + ["pipeline", "broker_source"], ) _rdkafka_msg_cnt = Gauge( "millpond_rdkafka_msg_cnt", "Messages in internal producer/consumer queues", - ["pipeline"], + ["pipeline", "broker_source"], ) _rdkafka_msg_size = Gauge( "millpond_rdkafka_msg_size", "Bytes in internal producer/consumer queues", - ["pipeline"], + ["pipeline", "broker_source"], ) _rdkafka_broker_rtt_avg = Gauge( "millpond_rdkafka_broker_rtt_avg_seconds", "Broker round-trip time average", - ["pipeline", "broker"], + ["pipeline", "broker_source", "broker"], ) _rdkafka_broker_rtt_p99 = Gauge( "millpond_rdkafka_broker_rtt_p99_seconds", "Broker round-trip time p99", - ["pipeline", "broker"], + ["pipeline", "broker_source", "broker"], ) # --- Public names (replaced by init() with pipeline-bound instances) --- @@ -152,8 +153,8 @@ def labels(self, **kwargs): rdkafka_broker_rtt_p99 = _rdkafka_broker_rtt_p99 -def init(pipeline: str): - """Bind all metrics to a pipeline label. Must be called once at startup.""" +def init(pipeline: str, broker_source: str = ""): + """Bind all metrics to pipeline and broker_source labels. Must be called once at startup.""" global records_consumed_total, records_written_total, batches_flushed_total global records_skipped_total, errors_total global flush_duration_seconds, arrow_conversion_seconds @@ -163,27 +164,29 @@ def init(pipeline: str): global rdkafka_replyq, rdkafka_msg_cnt, rdkafka_msg_size global rdkafka_broker_rtt_avg, rdkafka_broker_rtt_p99 - # Metrics with additional labels — wrap so .labels() auto-injects pipeline - records_consumed_total = _AutoPipelineLabels(_records_consumed_total, pipeline) - batches_flushed_total = _AutoPipelineLabels(_batches_flushed_total, pipeline) - records_skipped_total = _AutoPipelineLabels(_records_skipped_total, pipeline) - errors_total = _AutoPipelineLabels(_errors_total, pipeline) - consumer_lag = _AutoPipelineLabels(_consumer_lag, pipeline) - last_committed_offset = _AutoPipelineLabels(_last_committed_offset, pipeline) - rdkafka_broker_rtt_avg = _AutoPipelineLabels(_rdkafka_broker_rtt_avg, pipeline) - rdkafka_broker_rtt_p99 = _AutoPipelineLabels(_rdkafka_broker_rtt_p99, pipeline) + bs = broker_source + + # Metrics with additional labels — wrap so .labels() auto-injects common labels + records_consumed_total = _AutoCommonLabels(_records_consumed_total, pipeline, bs) + batches_flushed_total = _AutoCommonLabels(_batches_flushed_total, pipeline, bs) + records_skipped_total = _AutoCommonLabels(_records_skipped_total, pipeline, bs) + errors_total = _AutoCommonLabels(_errors_total, pipeline, bs) + consumer_lag = _AutoCommonLabels(_consumer_lag, pipeline, bs) + last_committed_offset = _AutoCommonLabels(_last_committed_offset, pipeline, bs) + rdkafka_broker_rtt_avg = _AutoCommonLabels(_rdkafka_broker_rtt_avg, pipeline, bs) + rdkafka_broker_rtt_p99 = _AutoCommonLabels(_rdkafka_broker_rtt_p99, pipeline, bs) # Metrics with no other labels — pre-label to get direct .inc()/.set()/.observe() - records_written_total = _records_written_total.labels(pipeline=pipeline) - flush_duration_seconds = _flush_duration_seconds.labels(pipeline=pipeline) - arrow_conversion_seconds = _arrow_conversion_seconds.labels(pipeline=pipeline) - flush_size_bytes = _flush_size_bytes.labels(pipeline=pipeline) - flush_size_records = _flush_size_records.labels(pipeline=pipeline) - pending_bytes = _pending_bytes.labels(pipeline=pipeline) - buffer_fullness = _buffer_fullness.labels(pipeline=pipeline) - consume_batch_size_current = _consume_batch_size_current.labels(pipeline=pipeline) - schema_columns_added_total = _schema_columns_added_total.labels(pipeline=pipeline) - schema_columns_widened_total = _schema_columns_widened_total.labels(pipeline=pipeline) - rdkafka_replyq = _rdkafka_replyq.labels(pipeline=pipeline) - rdkafka_msg_cnt = _rdkafka_msg_cnt.labels(pipeline=pipeline) - rdkafka_msg_size = _rdkafka_msg_size.labels(pipeline=pipeline) + records_written_total = _records_written_total.labels(pipeline=pipeline, broker_source=bs) + flush_duration_seconds = _flush_duration_seconds.labels(pipeline=pipeline, broker_source=bs) + arrow_conversion_seconds = _arrow_conversion_seconds.labels(pipeline=pipeline, broker_source=bs) + flush_size_bytes = _flush_size_bytes.labels(pipeline=pipeline, broker_source=bs) + flush_size_records = _flush_size_records.labels(pipeline=pipeline, broker_source=bs) + pending_bytes = _pending_bytes.labels(pipeline=pipeline, broker_source=bs) + buffer_fullness = _buffer_fullness.labels(pipeline=pipeline, broker_source=bs) + consume_batch_size_current = _consume_batch_size_current.labels(pipeline=pipeline, broker_source=bs) + schema_columns_added_total = _schema_columns_added_total.labels(pipeline=pipeline, broker_source=bs) + schema_columns_widened_total = _schema_columns_widened_total.labels(pipeline=pipeline, broker_source=bs) + rdkafka_replyq = _rdkafka_replyq.labels(pipeline=pipeline, broker_source=bs) + rdkafka_msg_cnt = _rdkafka_msg_cnt.labels(pipeline=pipeline, broker_source=bs) + rdkafka_msg_size = _rdkafka_msg_size.labels(pipeline=pipeline, broker_source=bs) diff --git a/tests/unit/test_consumer.py b/tests/unit/test_consumer.py index 356d983..423ea17 100644 --- a/tests/unit/test_consumer.py +++ b/tests/unit/test_consumer.py @@ -152,6 +152,7 @@ def _make_cfg(**overrides) -> Config: fetch_max_wait_ms=500, consume_batch_size=1000, stats_interval_ms=5000, + broker_source="", kafka_config_overrides=(("security.protocol", "SSL"),), ) defaults.update(overrides) From e3b7302206937f83fcd5baa2c7da728333663d64 Mon Sep 17 00:00:00 2001 From: Jakob Homan Date: Mon, 27 Apr 2026 09:09:17 -0700 Subject: [PATCH 2/2] fix: normalize BROKER_SOURCE with strip/lower to prevent label fragmentation --- millpond/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/millpond/config.py b/millpond/config.py index e5057bd..1782a92 100644 --- a/millpond/config.py +++ b/millpond/config.py @@ -126,7 +126,7 @@ def load() -> Config: fetch_max_wait_ms=int(os.environ.get("FETCH_MAX_WAIT_MS", "500")), consume_batch_size=int(os.environ.get("CONSUME_BATCH_SIZE", "1000")), stats_interval_ms=int(os.environ.get("STATS_INTERVAL_MS", "5000")), - broker_source=os.environ.get("BROKER_SOURCE", ""), + broker_source=os.environ.get("BROKER_SOURCE", "").strip().lower(), kafka_config_overrides=kafka_overrides, )