From 8f52734bf692103cc4fc8d9f359d5920f8833756 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 24 Mar 2026 15:45:18 -0700 Subject: [PATCH 1/6] Remove `consumer.librdkafka.*` Metrics --- arroyo/backends/kafka/configuration.py | 6 ++---- rust-arroyo/src/backends/kafka/mod.rs | 23 ++--------------------- 2 files changed, 4 insertions(+), 25 deletions(-) diff --git a/arroyo/backends/kafka/configuration.py b/arroyo/backends/kafka/configuration.py index 20cfb9e6..903fdf04 100644 --- a/arroyo/backends/kafka/configuration.py +++ b/arroyo/backends/kafka/configuration.py @@ -44,10 +44,8 @@ def build_kafka_configuration( def stats_callback(stats_json: str) -> None: - stats = json.loads(stats_json) - get_metrics().gauge( - "arroyo.consumer.librdkafka.total_queue_size", stats.get("replyq", 0) - ) + # We can keep this for the future + pass def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> None: diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs index c14af64e..e1db4bff 100644 --- a/rust-arroyo/src/backends/kafka/mod.rs +++ b/rust-arroyo/src/backends/kafka/mod.rs @@ -168,27 +168,8 @@ impl ClientContext for CustomContext { }) } - fn stats(&self, stats: Statistics) { - gauge!( - "arroyo.consumer.librdkafka.total_queue_size", - stats.replyq as u64, - ); - for (topic_name, topic) in stats.topics.iter() { - for (partition_num, partition) in topic.partitions.iter() { - gauge!( - "arroyo.consumer.librdkafka.fetch_queue_count", - partition.fetchq_cnt as u64, - "topic" => topic_name, - "partition" => partition_num.to_string() - ); - gauge!( - "arroyo.consumer.librdkafka.fetch_queue_size", - partition.fetchq_size as u64, - "topic" => topic_name, - "partition" => partition_num.to_string() - ); - } - } + fn stats(&self, _stats: Statistics) { + // Keep to avoid logging all statistics } } From 23399376a77d01f2ea71b6aeceb0da3d3c947c92 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Sat, 28 Mar 2026 13:48:02 -0700 Subject: [PATCH 2/6] Remove Unused Arroyo Producer Broker Metrics --- arroyo/backends/kafka/configuration.py | 22 -------------------- rust-arroyo/src/backends/kafka/producer.rs | 24 ---------------------- 2 files changed, 46 deletions(-) diff --git a/arroyo/backends/kafka/configuration.py b/arroyo/backends/kafka/configuration.py index 903fdf04..256620aa 100644 --- a/arroyo/backends/kafka/configuration.py +++ b/arroyo/backends/kafka/configuration.py @@ -121,21 +121,6 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No tags=broker_tags, ) - # Record broker buffer metrics - if broker_stats.get("outbuf_cnt"): - metrics.gauge( - "arroyo.producer.librdkafka.broker_outbuf_requests", - broker_stats["outbuf_cnt"], - tags=broker_tags, - ) - - if broker_stats.get("outbuf_msg_cnt"): - metrics.gauge( - "arroyo.producer.librdkafka.broker_outbuf_messages", - broker_stats["outbuf_msg_cnt"], - tags=broker_tags, - ) - # Record broker connection metrics if broker_stats.get("connects"): metrics.gauge( @@ -144,13 +129,6 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No tags=broker_tags, ) - if broker_stats.get("disconnects"): - metrics.gauge( - "arroyo.producer.librdkafka.broker_disconnects", - broker_stats["disconnects"], - tags=broker_tags, - ) - # Record broker transmission error metrics if broker_stats.get("txerrs"): metrics.gauge( diff --git a/rust-arroyo/src/backends/kafka/producer.rs b/rust-arroyo/src/backends/kafka/producer.rs index 4dae0fbd..db83baad 100644 --- a/rust-arroyo/src/backends/kafka/producer.rs +++ b/rust-arroyo/src/backends/kafka/producer.rs @@ -109,21 +109,6 @@ impl ClientContext for ProducerContext { "producer_name" => producer_name ); - // Record broker buffer metrics - gauge!( - "arroyo.producer.librdkafka.broker_outbuf_requests", - broker_stats.outbuf_cnt as i64, - "broker_id" => broker_id_str.clone(), - "producer_name" => producer_name - ); - - gauge!( - "arroyo.producer.librdkafka.broker_outbuf_messages", - broker_stats.outbuf_msg_cnt as i64, - "broker_id" => broker_id_str.clone(), - "producer_name" => producer_name - ); - // Record broker connection metrics (if available) if let Some(connects) = broker_stats.connects { gauge!( @@ -134,15 +119,6 @@ impl ClientContext for ProducerContext { ); } - if let Some(disconnects) = broker_stats.disconnects { - gauge!( - "arroyo.producer.librdkafka.broker_disconnects", - disconnects as i64, - "broker_id" => broker_id_str.clone(), - "producer_name" => producer_name - ); - } - // Record broker transmission error metrics gauge!( "arroyo.producer.librdkafka.broker_txerrs", From bc42914a6ac9bfc077eb0da29eab00840bbf9c45 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Sat, 28 Mar 2026 13:49:22 -0700 Subject: [PATCH 3/6] Remove Unused Arroyo Producer Message Metrics --- arroyo/backends/kafka/configuration.py | 21 --------------------- rust-arroyo/src/backends/kafka/producer.rs | 18 ------------------ 2 files changed, 39 deletions(-) diff --git a/arroyo/backends/kafka/configuration.py b/arroyo/backends/kafka/configuration.py index 256620aa..50c2a898 100644 --- a/arroyo/backends/kafka/configuration.py +++ b/arroyo/backends/kafka/configuration.py @@ -159,27 +159,6 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No tags={"producer_name": producer_name_tag}, ) - if stats.get("msg_size"): - metrics.gauge( - "arroyo.producer.librdkafka.message_size", - stats["msg_size"], - tags={"producer_name": producer_name_tag}, - ) - - if stats.get("msg_size_max"): - metrics.gauge( - "arroyo.producer.librdkafka.message_size_max", - stats["msg_size_max"], - tags={"producer_name": producer_name_tag}, - ) - - if stats.get("txmsgs"): - metrics.gauge( - "arroyo.producer.librdkafka.txmsgs", - stats["txmsgs"], - tags={"producer_name": producer_name_tag}, - ) - def build_kafka_producer_configuration( default_config: Mapping[str, Any], diff --git a/rust-arroyo/src/backends/kafka/producer.rs b/rust-arroyo/src/backends/kafka/producer.rs index db83baad..e3e80185 100644 --- a/rust-arroyo/src/backends/kafka/producer.rs +++ b/rust-arroyo/src/backends/kafka/producer.rs @@ -147,24 +147,6 @@ impl ClientContext for ProducerContext { stats.msg_max as i64, "producer_name" => producer_name ); - - gauge!( - "arroyo.producer.librdkafka.message_size", - stats.msg_size as i64, - "producer_name" => producer_name - ); - - gauge!( - "arroyo.producer.librdkafka.message_size_max", - stats.msg_size_max as i64, - "producer_name" => producer_name - ); - - gauge!( - "arroyo.producer.librdkafka.txmsgs", - stats.txmsgs as i64, - "producer_name" => producer_name - ); } } From 943617049535f0e4284d4225a1f53c3505dadd9f Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 30 Mar 2026 12:32:13 -0700 Subject: [PATCH 4/6] Add Back Total Queue Size and Fetch Queue Count --- arroyo/backends/kafka/configuration.py | 6 ++++-- rust-arroyo/src/backends/kafka/mod.rs | 17 +++++++++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/arroyo/backends/kafka/configuration.py b/arroyo/backends/kafka/configuration.py index 50c2a898..abdcdd8f 100644 --- a/arroyo/backends/kafka/configuration.py +++ b/arroyo/backends/kafka/configuration.py @@ -44,8 +44,10 @@ def build_kafka_configuration( def stats_callback(stats_json: str) -> None: - # We can keep this for the future - pass + stats = json.loads(stats_json) + get_metrics().gauge( + "arroyo.consumer.librdkafka.total_queue_size", stats.get("replyq", 0) + ) def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> None: diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs index e1db4bff..75183984 100644 --- a/rust-arroyo/src/backends/kafka/mod.rs +++ b/rust-arroyo/src/backends/kafka/mod.rs @@ -168,8 +168,21 @@ impl ClientContext for CustomContext { }) } - fn stats(&self, _stats: Statistics) { - // Keep to avoid logging all statistics + fn stats(&self, stats: Statistics) { + gauge!( + "arroyo.consumer.librdkafka.total_queue_size", + stats.replyq as u64, + ); + for (topic_name, topic) in stats.topics.iter() { + for (partition_num, partition) in topic.partitions.iter() { + gauge!( + "arroyo.consumer.librdkafka.fetch_queue_count", + partition.fetchq_cnt as u64, + "topic" => topic_name, + "partition" => partition_num.to_string() + ); + } + } } } From 7ea8a8c58bf2ed360adda2f4cc42d14943272481 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 30 Mar 2026 13:31:10 -0700 Subject: [PATCH 5/6] Trigger Cursor Bugbot --- rust-arroyo/src/backends/kafka/producer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-arroyo/src/backends/kafka/producer.rs b/rust-arroyo/src/backends/kafka/producer.rs index e3e80185..a704c99d 100644 --- a/rust-arroyo/src/backends/kafka/producer.rs +++ b/rust-arroyo/src/backends/kafka/producer.rs @@ -109,7 +109,7 @@ impl ClientContext for ProducerContext { "producer_name" => producer_name ); - // Record broker connection metrics (if available) + // Record broker connection metrics if available if let Some(connects) = broker_stats.connects { gauge!( "arroyo.producer.librdkafka.broker_connects", From 161728d21abe6f1d031342c2c94173671c422dee Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 30 Mar 2026 14:12:07 -0700 Subject: [PATCH 6/6] Remove Unused Metric Definitions --- arroyo/utils/metric_defs.py | 18 ------------------ rust-arroyo/src/backends/kafka/producer.rs | 2 +- 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index c9508cd6..827265b9 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -137,33 +137,15 @@ # Gauge: Maximum producer message count from librdkafka statistics # Tagged by producer_name "arroyo.producer.librdkafka.message_count_max", - # Gauge: Producer message size from librdkafka statistics - # Tagged by producer_name - "arroyo.producer.librdkafka.message_size", - # Gauge: Maximum producer message size from librdkafka statistics - # Tagged by producer_name - "arroyo.producer.librdkafka.message_size_max", - # Gauge: Total number of messages transmitted from librdkafka statistics - # Tagged by producer_name - "arroyo.producer.librdkafka.txmsgs", # Gauge: Total number of transmission requests from librdkafka statistics # Tagged by broker_id, producer_name "arroyo.producer.librdkafka.broker_tx", # Gauge: Total number of bytes transmitted from librdkafka statistics # Tagged by broker_id, producer_name "arroyo.producer.librdkafka.broker_txbytes", - # Gauge: Number of requests awaiting transmission to broker from librdkafka statistics - # Tagged by broker_id, producer_name - "arroyo.producer.librdkafka.broker_outbuf_requests", - # Gauge: Number of messages awaiting transmission to broker from librdkafka statistics - # Tagged by broker_id, producer_name - "arroyo.producer.librdkafka.broker_outbuf_messages", # Gauge: Number of connection attempts to broker from librdkafka statistics # Tagged by broker_id, producer_name "arroyo.producer.librdkafka.broker_connects", - # Gauge: Number of disconnections from broker from librdkafka statistics - # Tagged by broker_id, producer_name - "arroyo.producer.librdkafka.broker_disconnects", # Gauge: Total number of transmission errors from librdkafka statistics # Tagged by broker_id, producer_name "arroyo.producer.librdkafka.broker_txerrs", diff --git a/rust-arroyo/src/backends/kafka/producer.rs b/rust-arroyo/src/backends/kafka/producer.rs index a704c99d..e3e80185 100644 --- a/rust-arroyo/src/backends/kafka/producer.rs +++ b/rust-arroyo/src/backends/kafka/producer.rs @@ -109,7 +109,7 @@ impl ClientContext for ProducerContext { "producer_name" => producer_name ); - // Record broker connection metrics if available + // Record broker connection metrics (if available) if let Some(connects) = broker_stats.connects { gauge!( "arroyo.producer.librdkafka.broker_connects",