Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions arroyo/backends/kafka/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,6 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No
tags=broker_tags,
)

# Record broker buffer metrics
Comment thread
cursor[bot] marked this conversation as resolved.
if broker_stats.get("outbuf_cnt"):
metrics.gauge(
"arroyo.producer.librdkafka.broker_outbuf_requests",
broker_stats["outbuf_cnt"],
tags=broker_tags,
)

if broker_stats.get("outbuf_msg_cnt"):
metrics.gauge(
"arroyo.producer.librdkafka.broker_outbuf_messages",
broker_stats["outbuf_msg_cnt"],
tags=broker_tags,
)

# Record broker connection metrics
if broker_stats.get("connects"):
metrics.gauge(
Expand All @@ -146,13 +131,6 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No
tags=broker_tags,
)

if broker_stats.get("disconnects"):
metrics.gauge(
"arroyo.producer.librdkafka.broker_disconnects",
broker_stats["disconnects"],
tags=broker_tags,
)

# Record broker transmission error metrics
if broker_stats.get("txerrs"):
metrics.gauge(
Expand Down Expand Up @@ -183,27 +161,6 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No
tags={"producer_name": producer_name_tag},
)

if stats.get("msg_size"):
metrics.gauge(
"arroyo.producer.librdkafka.message_size",
stats["msg_size"],
tags={"producer_name": producer_name_tag},
)

if stats.get("msg_size_max"):
metrics.gauge(
"arroyo.producer.librdkafka.message_size_max",
stats["msg_size_max"],
tags={"producer_name": producer_name_tag},
)

if stats.get("txmsgs"):
metrics.gauge(
"arroyo.producer.librdkafka.txmsgs",
stats["txmsgs"],
tags={"producer_name": producer_name_tag},
)


def build_kafka_producer_configuration(
default_config: Mapping[str, Any],
Expand Down
18 changes: 0 additions & 18 deletions arroyo/utils/metric_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,33 +137,15 @@
# Gauge: Maximum producer message count from librdkafka statistics
# Tagged by producer_name
"arroyo.producer.librdkafka.message_count_max",
# Gauge: Producer message size from librdkafka statistics
# Tagged by producer_name
"arroyo.producer.librdkafka.message_size",
# Gauge: Maximum producer message size from librdkafka statistics
# Tagged by producer_name
"arroyo.producer.librdkafka.message_size_max",
# Gauge: Total number of messages transmitted from librdkafka statistics
# Tagged by producer_name
"arroyo.producer.librdkafka.txmsgs",
# Gauge: Total number of transmission requests from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_tx",
# Gauge: Total number of bytes transmitted from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_txbytes",
# Gauge: Number of requests awaiting transmission to broker from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_outbuf_requests",
# Gauge: Number of messages awaiting transmission to broker from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_outbuf_messages",
# Gauge: Number of connection attempts to broker from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_connects",
# Gauge: Number of disconnections from broker from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_disconnects",
# Gauge: Total number of transmission errors from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_txerrs",
Expand Down
6 changes: 0 additions & 6 deletions rust-arroyo/src/backends/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,6 @@ impl<C: AssignmentCallbacks + Send + Sync> ClientContext for CustomContext<C> {
"topic" => topic_name,
"partition" => partition_num.to_string()
);
gauge!(
Comment thread
george-sentry marked this conversation as resolved.
"arroyo.consumer.librdkafka.fetch_queue_size",
partition.fetchq_size as u64,
"topic" => topic_name,
"partition" => partition_num.to_string()
);
}
}
}
Expand Down
42 changes: 0 additions & 42 deletions rust-arroyo/src/backends/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,6 @@ impl ClientContext for ProducerContext {
"producer_name" => producer_name
);

// Record broker buffer metrics
gauge!(
"arroyo.producer.librdkafka.broker_outbuf_requests",
broker_stats.outbuf_cnt as i64,
"broker_id" => broker_id_str.clone(),
"producer_name" => producer_name
);

gauge!(
"arroyo.producer.librdkafka.broker_outbuf_messages",
broker_stats.outbuf_msg_cnt as i64,
"broker_id" => broker_id_str.clone(),
"producer_name" => producer_name
);

// Record broker connection metrics (if available)
if let Some(connects) = broker_stats.connects {
gauge!(
Expand All @@ -134,15 +119,6 @@ impl ClientContext for ProducerContext {
);
}

if let Some(disconnects) = broker_stats.disconnects {
gauge!(
"arroyo.producer.librdkafka.broker_disconnects",
disconnects as i64,
"broker_id" => broker_id_str.clone(),
"producer_name" => producer_name
);
}

// Record broker transmission error metrics
gauge!(
"arroyo.producer.librdkafka.broker_txerrs",
Expand Down Expand Up @@ -171,24 +147,6 @@ impl ClientContext for ProducerContext {
stats.msg_max as i64,
"producer_name" => producer_name
);

gauge!(
"arroyo.producer.librdkafka.message_size",
stats.msg_size as i64,
"producer_name" => producer_name
);

gauge!(
"arroyo.producer.librdkafka.message_size_max",
stats.msg_size_max as i64,
"producer_name" => producer_name
);

gauge!(
"arroyo.producer.librdkafka.txmsgs",
stats.txmsgs as i64,
"producer_name" => producer_name
);
}
}

Expand Down
Loading