4 changes: 4 additions & 0 deletions millpond/config.py
@@ -47,6 +47,9 @@ class Config:
     consume_batch_size: int
     stats_interval_ms: int
 
+    # Broker source label for metrics (e.g. "msk", "warpstream")
+    broker_source: str
+
     # Extra librdkafka config (from KAFKA_CONSUMER_* env vars)
     kafka_config_overrides: tuple[tuple[str, str], ...]
 
@@ -123,6 +126,7 @@ def load() -> Config:
         fetch_max_wait_ms=int(os.environ.get("FETCH_MAX_WAIT_MS", "500")),
         consume_batch_size=int(os.environ.get("CONSUME_BATCH_SIZE", "1000")),
         stats_interval_ms=int(os.environ.get("STATS_INTERVAL_MS", "5000")),
+        broker_source=os.environ.get("BROKER_SOURCE", "").strip().lower(),
         kafka_config_overrides=kafka_overrides,
     )
 
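For reference, the normalization on the new broker_source line means the label value is always lowercase and trimmed before it reaches any metric. A standalone sketch of just that behavior (the env value below is hypothetical, set only for the demo):

```python
import os

# An unset BROKER_SOURCE yields "", and a value like " MSK " folds to "msk",
# so downstream metric labels never see case or whitespace variants.
os.environ["BROKER_SOURCE"] = " MSK "  # hypothetical value, demo only
broker_source = os.environ.get("BROKER_SOURCE", "").strip().lower()
print(broker_source)  # -> msk
```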
2 changes: 1 addition & 1 deletion millpond/main.py
@@ -124,7 +124,7 @@ def main():
     log.info("millpond starting")
 
     cfg = config.load()
-    metrics.init(f"{cfg.topic}-{cfg.ducklake_table}")
+    metrics.init(f"{cfg.topic}-{cfg.ducklake_table}", broker_source=cfg.broker_source)
     http = server.start()
     server.health.mark_started()
     log.info("Health server started, probes passing")
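To make the call-site change concrete, a minimal sketch of what main() now passes to metrics.init(); the topic, table, and broker values are hypothetical stand-ins for what config.load() returns:

```python
from types import SimpleNamespace

# Stand-in for the Config object returned by config.load().
cfg = SimpleNamespace(topic="events", ducklake_table="raw", broker_source="msk")

# The pipeline label is derived from topic and table, and broker_source is
# now threaded through alongside it.
pipeline = f"{cfg.topic}-{cfg.ducklake_table}"
print(pipeline, cfg.broker_source)  # -> events-raw msk
# metrics.init(pipeline, broker_source=cfg.broker_source)
```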
101 changes: 52 additions & 49 deletions millpond/metrics.py
@@ -1,130 +1,131 @@
 from prometheus_client import Counter, Gauge, Histogram
 
 
-class _AutoPipelineLabels:
-    """Wrapper that auto-injects the pipeline label into .labels() calls."""
+class _AutoCommonLabels:
+    """Wrapper that auto-injects common labels (pipeline, broker_source) into .labels() calls."""
 
-    def __init__(self, metric, pipeline: str):
+    def __init__(self, metric, pipeline: str, broker_source: str):
         self._metric = metric
         self._pipeline = pipeline
+        self._broker_source = broker_source
 
     def labels(self, **kwargs):
-        return self._metric.labels(pipeline=self._pipeline, **kwargs)
+        return self._metric.labels(pipeline=self._pipeline, broker_source=self._broker_source, **kwargs)
 
 
 # --- Raw metric definitions (with pipeline as first label) ---
 
 _records_consumed_total = Counter(
     "millpond_records_consumed_total",
     "Records polled from Kafka",
-    ["pipeline", "partition"],
+    ["pipeline", "broker_source", "partition"],
 )
 _records_written_total = Counter(
     "millpond_records_written_total",
     "Records written to DuckLake",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 _batches_flushed_total = Counter(
     "millpond_batches_flushed_total",
     "Flush cycles completed",
-    ["pipeline", "trigger"],
+    ["pipeline", "broker_source", "trigger"],
 )
 _records_skipped_total = Counter(
     "millpond_records_skipped_total",
     "Records skipped",
-    ["pipeline", "reason"],
+    ["pipeline", "broker_source", "reason"],
 )
 _errors_total = Counter(
     "millpond_errors_total",
     "Errors by type",
-    ["pipeline", "type"],
+    ["pipeline", "broker_source", "type"],
 )
 
 _flush_duration_seconds = Histogram(
     "millpond_flush_duration_seconds",
     "Time per DuckLake write",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 _arrow_conversion_seconds = Histogram(
     "millpond_arrow_conversion_seconds",
     "Time to convert JSON to Arrow table",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 _flush_size_bytes = Histogram(
     "millpond_flush_size_bytes",
     "Arrow bytes per flush",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
     buckets=[1e6, 5e6, 10e6, 25e6, 50e6, 100e6, 250e6, 500e6, 1e9],
 )
 _flush_size_records = Histogram(
     "millpond_flush_size_records",
     "Records per flush",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
     buckets=[100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000],
 )
 
 _pending_bytes = Gauge(
     "millpond_pending_bytes",
     "Current pending Arrow bytes awaiting flush",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 _buffer_fullness = Gauge(
     "millpond_buffer_fullness",
     "Ratio of pending bytes to flush size (0.0 = empty, 1.0 = flush threshold)",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 _consume_batch_size_current = Gauge(
     "millpond_consume_batch_size_current",
     "Current adaptive consume batch size",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 _consumer_lag = Gauge(
     "millpond_consumer_lag",
     "Highwater mark minus committed offset",
-    ["pipeline", "partition"],
+    ["pipeline", "broker_source", "partition"],
 )
 _last_committed_offset = Gauge(
     "millpond_last_committed_offset",
     "Last committed offset",
-    ["pipeline", "partition"],
+    ["pipeline", "broker_source", "partition"],
 )
 
 _schema_columns_added_total = Counter(
     "millpond_schema_columns_added_total",
     "Columns added via schema evolution",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 _schema_columns_widened_total = Counter(
     "millpond_schema_columns_widened_total",
     "Columns widened via schema evolution",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 
 # librdkafka internal stats (via statistics.interval.ms callback)
 _rdkafka_replyq = Gauge(
     "millpond_rdkafka_replyq",
     "Number of ops waiting for broker response",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 _rdkafka_msg_cnt = Gauge(
     "millpond_rdkafka_msg_cnt",
     "Messages in internal producer/consumer queues",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 _rdkafka_msg_size = Gauge(
     "millpond_rdkafka_msg_size",
     "Bytes in internal producer/consumer queues",
-    ["pipeline"],
+    ["pipeline", "broker_source"],
 )
 _rdkafka_broker_rtt_avg = Gauge(
     "millpond_rdkafka_broker_rtt_avg_seconds",
     "Broker round-trip time average",
-    ["pipeline", "broker"],
+    ["pipeline", "broker_source", "broker"],
 )
 _rdkafka_broker_rtt_p99 = Gauge(
     "millpond_rdkafka_broker_rtt_p99_seconds",
     "Broker round-trip time p99",
-    ["pipeline", "broker"],
+    ["pipeline", "broker_source", "broker"],
 )
 
 # --- Public names (replaced by init() with pipeline-bound instances) ---
@@ -152,8 +153,8 @@ def labels(self, **kwargs):
 rdkafka_broker_rtt_p99 = _rdkafka_broker_rtt_p99
 
 
-def init(pipeline: str):
-    """Bind all metrics to a pipeline label. Must be called once at startup."""
+def init(pipeline: str, broker_source: str = ""):
+    """Bind all metrics to pipeline and broker_source labels. Must be called once at startup."""
     global records_consumed_total, records_written_total, batches_flushed_total
     global records_skipped_total, errors_total
     global flush_duration_seconds, arrow_conversion_seconds
@@ -163,27 +164,29 @@ def init(pipeline: str):
     global rdkafka_replyq, rdkafka_msg_cnt, rdkafka_msg_size
     global rdkafka_broker_rtt_avg, rdkafka_broker_rtt_p99
 
-    # Metrics with additional labels — wrap so .labels() auto-injects pipeline
-    records_consumed_total = _AutoPipelineLabels(_records_consumed_total, pipeline)
-    batches_flushed_total = _AutoPipelineLabels(_batches_flushed_total, pipeline)
-    records_skipped_total = _AutoPipelineLabels(_records_skipped_total, pipeline)
-    errors_total = _AutoPipelineLabels(_errors_total, pipeline)
-    consumer_lag = _AutoPipelineLabels(_consumer_lag, pipeline)
-    last_committed_offset = _AutoPipelineLabels(_last_committed_offset, pipeline)
-    rdkafka_broker_rtt_avg = _AutoPipelineLabels(_rdkafka_broker_rtt_avg, pipeline)
-    rdkafka_broker_rtt_p99 = _AutoPipelineLabels(_rdkafka_broker_rtt_p99, pipeline)
+    bs = broker_source
+
+    # Metrics with additional labels — wrap so .labels() auto-injects common labels
+    records_consumed_total = _AutoCommonLabels(_records_consumed_total, pipeline, bs)
+    batches_flushed_total = _AutoCommonLabels(_batches_flushed_total, pipeline, bs)
+    records_skipped_total = _AutoCommonLabels(_records_skipped_total, pipeline, bs)
+    errors_total = _AutoCommonLabels(_errors_total, pipeline, bs)
+    consumer_lag = _AutoCommonLabels(_consumer_lag, pipeline, bs)
+    last_committed_offset = _AutoCommonLabels(_last_committed_offset, pipeline, bs)
+    rdkafka_broker_rtt_avg = _AutoCommonLabels(_rdkafka_broker_rtt_avg, pipeline, bs)
+    rdkafka_broker_rtt_p99 = _AutoCommonLabels(_rdkafka_broker_rtt_p99, pipeline, bs)
 
     # Metrics with no other labels — pre-label to get direct .inc()/.set()/.observe()
-    records_written_total = _records_written_total.labels(pipeline=pipeline)
-    flush_duration_seconds = _flush_duration_seconds.labels(pipeline=pipeline)
-    arrow_conversion_seconds = _arrow_conversion_seconds.labels(pipeline=pipeline)
-    flush_size_bytes = _flush_size_bytes.labels(pipeline=pipeline)
-    flush_size_records = _flush_size_records.labels(pipeline=pipeline)
-    pending_bytes = _pending_bytes.labels(pipeline=pipeline)
-    buffer_fullness = _buffer_fullness.labels(pipeline=pipeline)
-    consume_batch_size_current = _consume_batch_size_current.labels(pipeline=pipeline)
-    schema_columns_added_total = _schema_columns_added_total.labels(pipeline=pipeline)
-    schema_columns_widened_total = _schema_columns_widened_total.labels(pipeline=pipeline)
-    rdkafka_replyq = _rdkafka_replyq.labels(pipeline=pipeline)
-    rdkafka_msg_cnt = _rdkafka_msg_cnt.labels(pipeline=pipeline)
-    rdkafka_msg_size = _rdkafka_msg_size.labels(pipeline=pipeline)
+    records_written_total = _records_written_total.labels(pipeline=pipeline, broker_source=bs)
+    flush_duration_seconds = _flush_duration_seconds.labels(pipeline=pipeline, broker_source=bs)
+    arrow_conversion_seconds = _arrow_conversion_seconds.labels(pipeline=pipeline, broker_source=bs)
+    flush_size_bytes = _flush_size_bytes.labels(pipeline=pipeline, broker_source=bs)
+    flush_size_records = _flush_size_records.labels(pipeline=pipeline, broker_source=bs)
+    pending_bytes = _pending_bytes.labels(pipeline=pipeline, broker_source=bs)
+    buffer_fullness = _buffer_fullness.labels(pipeline=pipeline, broker_source=bs)
+    consume_batch_size_current = _consume_batch_size_current.labels(pipeline=pipeline, broker_source=bs)
+    schema_columns_added_total = _schema_columns_added_total.labels(pipeline=pipeline, broker_source=bs)
+    schema_columns_widened_total = _schema_columns_widened_total.labels(pipeline=pipeline, broker_source=bs)
+    rdkafka_replyq = _rdkafka_replyq.labels(pipeline=pipeline, broker_source=bs)
+    rdkafka_msg_cnt = _rdkafka_msg_cnt.labels(pipeline=pipeline, broker_source=bs)
+    rdkafka_msg_size = _rdkafka_msg_size.labels(pipeline=pipeline, broker_source=bs)
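The wrapper keeps call sites unchanged: consumers still pass only their own labels, and the common pair rides along. A self-contained re-creation of the pattern, assuming only prometheus_client (the class is a copy for illustration, and the metric name and label values are made up):

```python
from prometheus_client import CollectorRegistry, Counter


class AutoCommonLabels:
    """Standalone re-creation of _AutoCommonLabels, for illustration."""

    def __init__(self, metric, pipeline: str, broker_source: str):
        self._metric = metric
        self._pipeline = pipeline
        self._broker_source = broker_source

    def labels(self, **kwargs):
        # Inject the common pair; the caller supplies only metric-specific labels.
        return self._metric.labels(
            pipeline=self._pipeline, broker_source=self._broker_source, **kwargs
        )


registry = CollectorRegistry()
demo = Counter(
    "demo_records_consumed",
    "Demo counter",
    ["pipeline", "broker_source", "partition"],
    registry=registry,
)
wrapped = AutoCommonLabels(demo, pipeline="events-raw", broker_source="msk")

# Call sites look exactly as they did before the change:
wrapped.labels(partition="0").inc()
```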
1 change: 1 addition & 0 deletions tests/unit/test_consumer.py
@@ -152,6 +152,7 @@ def _make_cfg(**overrides) -> Config:
         fetch_max_wait_ms=500,
         consume_batch_size=1000,
         stats_interval_ms=5000,
+        broker_source="",
         kafka_config_overrides=(("security.protocol", "SSL"),),
     )
     defaults.update(overrides)
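One consequence of the broker_source="" default that the test fixture pins down: the label is always present on every series, just empty when BROKER_SOURCE is unset, so dashboards can group by it unconditionally. A sketch of the resulting exposition, assuming only prometheus_client:

```python
from prometheus_client import CollectorRegistry, Counter, generate_latest

registry = CollectorRegistry()
written = Counter(
    "millpond_records_written",  # the client appends _total to counter samples
    "Records written to DuckLake",
    ["pipeline", "broker_source"],
    registry=registry,
)
written.labels(pipeline="events-raw", broker_source="").inc()

# Expect a series along the lines of:
#   millpond_records_written_total{pipeline="events-raw",broker_source=""} 1.0
print(generate_latest(registry).decode())
```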