diff --git a/arroyo/backends/kafka/configuration.py b/arroyo/backends/kafka/configuration.py index 20cfb9e6..abdcdd8f 100644 --- a/arroyo/backends/kafka/configuration.py +++ b/arroyo/backends/kafka/configuration.py @@ -123,21 +123,6 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No tags=broker_tags, ) - # Record broker buffer metrics - if broker_stats.get("outbuf_cnt"): - metrics.gauge( - "arroyo.producer.librdkafka.broker_outbuf_requests", - broker_stats["outbuf_cnt"], - tags=broker_tags, - ) - - if broker_stats.get("outbuf_msg_cnt"): - metrics.gauge( - "arroyo.producer.librdkafka.broker_outbuf_messages", - broker_stats["outbuf_msg_cnt"], - tags=broker_tags, - ) - # Record broker connection metrics if broker_stats.get("connects"): metrics.gauge( @@ -146,13 +131,6 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No tags=broker_tags, ) - if broker_stats.get("disconnects"): - metrics.gauge( - "arroyo.producer.librdkafka.broker_disconnects", - broker_stats["disconnects"], - tags=broker_tags, - ) - # Record broker transmission error metrics if broker_stats.get("txerrs"): metrics.gauge( @@ -183,27 +161,6 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No tags={"producer_name": producer_name_tag}, ) - if stats.get("msg_size"): - metrics.gauge( - "arroyo.producer.librdkafka.message_size", - stats["msg_size"], - tags={"producer_name": producer_name_tag}, - ) - - if stats.get("msg_size_max"): - metrics.gauge( - "arroyo.producer.librdkafka.message_size_max", - stats["msg_size_max"], - tags={"producer_name": producer_name_tag}, - ) - - if stats.get("txmsgs"): - metrics.gauge( - "arroyo.producer.librdkafka.txmsgs", - stats["txmsgs"], - tags={"producer_name": producer_name_tag}, - ) - def build_kafka_producer_configuration( default_config: Mapping[str, Any], diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index c9508cd6..827265b9 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -137,33 +137,15 @@ # Gauge: Maximum producer message count from librdkafka statistics # Tagged by producer_name "arroyo.producer.librdkafka.message_count_max", - # Gauge: Producer message size from librdkafka statistics - # Tagged by producer_name - "arroyo.producer.librdkafka.message_size", - # Gauge: Maximum producer message size from librdkafka statistics - # Tagged by producer_name - "arroyo.producer.librdkafka.message_size_max", - # Gauge: Total number of messages transmitted from librdkafka statistics - # Tagged by producer_name - "arroyo.producer.librdkafka.txmsgs", # Gauge: Total number of transmission requests from librdkafka statistics # Tagged by broker_id, producer_name "arroyo.producer.librdkafka.broker_tx", # Gauge: Total number of bytes transmitted from librdkafka statistics # Tagged by broker_id, producer_name "arroyo.producer.librdkafka.broker_txbytes", - # Gauge: Number of requests awaiting transmission to broker from librdkafka statistics - # Tagged by broker_id, producer_name - "arroyo.producer.librdkafka.broker_outbuf_requests", - # Gauge: Number of messages awaiting transmission to broker from librdkafka statistics - # Tagged by broker_id, producer_name - "arroyo.producer.librdkafka.broker_outbuf_messages", # Gauge: Number of connection attempts to broker from librdkafka statistics # Tagged by broker_id, producer_name "arroyo.producer.librdkafka.broker_connects", - # Gauge: Number of disconnections from broker from librdkafka statistics - # Tagged by broker_id, producer_name - "arroyo.producer.librdkafka.broker_disconnects", # Gauge: Total number of transmission errors from librdkafka statistics # Tagged by broker_id, producer_name "arroyo.producer.librdkafka.broker_txerrs", diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs index c14af64e..75183984 100644 --- a/rust-arroyo/src/backends/kafka/mod.rs +++ b/rust-arroyo/src/backends/kafka/mod.rs @@ -181,12 +181,6 @@ impl ClientContext for CustomContext { "topic" => topic_name, "partition" => partition_num.to_string() ); - gauge!( - "arroyo.consumer.librdkafka.fetch_queue_size", - partition.fetchq_size as u64, - "topic" => topic_name, - "partition" => partition_num.to_string() - ); } } } diff --git a/rust-arroyo/src/backends/kafka/producer.rs b/rust-arroyo/src/backends/kafka/producer.rs index 4dae0fbd..e3e80185 100644 --- a/rust-arroyo/src/backends/kafka/producer.rs +++ b/rust-arroyo/src/backends/kafka/producer.rs @@ -109,21 +109,6 @@ impl ClientContext for ProducerContext { "producer_name" => producer_name ); - // Record broker buffer metrics - gauge!( - "arroyo.producer.librdkafka.broker_outbuf_requests", - broker_stats.outbuf_cnt as i64, - "broker_id" => broker_id_str.clone(), - "producer_name" => producer_name - ); - - gauge!( - "arroyo.producer.librdkafka.broker_outbuf_messages", - broker_stats.outbuf_msg_cnt as i64, - "broker_id" => broker_id_str.clone(), - "producer_name" => producer_name - ); - // Record broker connection metrics (if available) if let Some(connects) = broker_stats.connects { gauge!( @@ -134,15 +119,6 @@ impl ClientContext for ProducerContext { ); } - if let Some(disconnects) = broker_stats.disconnects { - gauge!( - "arroyo.producer.librdkafka.broker_disconnects", - disconnects as i64, - "broker_id" => broker_id_str.clone(), - "producer_name" => producer_name - ); - } - // Record broker transmission error metrics gauge!( "arroyo.producer.librdkafka.broker_txerrs", @@ -171,24 +147,6 @@ impl ClientContext for ProducerContext { stats.msg_max as i64, "producer_name" => producer_name ); - - gauge!( - "arroyo.producer.librdkafka.message_size", - stats.msg_size as i64, - "producer_name" => producer_name - ); - - gauge!( - "arroyo.producer.librdkafka.message_size_max", - stats.msg_size_max as i64, - "producer_name" => producer_name - ); - - gauge!( - "arroyo.producer.librdkafka.txmsgs", - stats.txmsgs as i64, - "producer_name" => producer_name - ); } }