Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions iroh-relay/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) mod server {
use tracing::{Instrument, debug, info, info_span};

use super::*;
use crate::server::Metrics;
pub use crate::server::QuicConfig;

pub struct QuicServer {
Expand Down Expand Up @@ -92,7 +93,10 @@ pub(crate) mod server {
/// If there is a panic during a connection, it will be propagated
/// up here. Any other errors in a connection are only logged, at
/// debug level.
pub(crate) fn spawn(mut quic_config: QuicConfig) -> Result<Self, QuicSpawnError> {
pub(crate) fn spawn(
mut quic_config: QuicConfig,
metrics: Arc<Metrics>,
) -> Result<Self, QuicSpawnError> {
quic_config.server_config.alpn_protocols =
vec![crate::quic::ALPN_QUIC_ADDR_DISC.to_vec()];
let server_config = QuicServerConfig::try_from(quic_config.server_config)?;
Expand Down Expand Up @@ -129,18 +133,17 @@ pub(crate) mod server {
Some(res) = set.join_next() => {
if let Err(err) = res {
if err.is_panic() {
panic!("task panicked: {err:#?}");
panic!("quic task panicked: {err:#?}");
} else {
debug!("error accepting incoming connection: {err:#?}");
debug!("quic task cancelled: {err:#?}");
}
}
}
res = endpoint.accept() => match res {
Some(conn) => {
debug!("accepting connection");
let remote_addr = conn.remote_address();
Some(incoming) => {
let remote_addr = incoming.remote_address();
set.spawn(
handle_connection(conn).instrument(info_span!("qad-conn", %remote_addr))
handle_connection(incoming, metrics.clone()).instrument(info_span!("qad-conn", %remote_addr))
); }
None => {
debug!("endpoint closed");
Expand Down Expand Up @@ -200,23 +203,37 @@ pub(crate) mod server {
}

/// Handle the connection from the client.
async fn handle_connection(incoming: noq::Incoming) -> Result<(), ConnectionError> {
async fn handle_connection(
incoming: noq::Incoming,
metrics: Arc<Metrics>,
) -> Result<(), ConnectionError> {
metrics.qad_incoming.inc();
debug!("incoming");
let connection = match incoming.await {
Ok(conn) => conn,
Err(e) => {
debug!("establishing failed: {e:#}");
metrics.qad_incoming_error.inc();
return Err(e);
}
};
metrics.qad_connections.inc();
debug!("established");
// wait for the client to close the connection
let connection_err = connection.closed().await;
metrics.qad_connections_closed.inc();
match connection_err {
noq::ConnectionError::ApplicationClosed(ApplicationClose { error_code, .. })
if error_code == QUIC_ADDR_DISC_CLOSE_CODE =>
{
debug!("peer disconnected");
Ok(())
}
_ => Err(connection_err),
_ => {
debug!("peer disconnected with {connection_err:#}");
metrics.qad_connections_errored.inc();
Err(connection_err)
}
}
}
}
Expand Down Expand Up @@ -371,10 +388,13 @@ mod tests {
// create a server config with self signed certificates
let (_, server_config) = super::super::server::testing::self_signed_tls_certs_and_config();
let bind_addr = SocketAddr::new(host.into(), 0);
let quic_server = QuicServer::spawn(QuicConfig {
server_config,
bind_addr,
})?;
let quic_server = QuicServer::spawn(
QuicConfig {
server_config,
bind_addr,
},
Default::default(),
)?;

// create a client-side endpoint
let client_endpoint =
Expand Down
5 changes: 4 additions & 1 deletion iroh-relay/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,10 @@ impl Server {
let quic_server = match config.quic {
Some(quic_config) => {
debug!("Starting QUIC server {}", quic_config.bind_addr);
Some(QuicServer::spawn(quic_config).map_err(|err| e!(SpawnError::QuicSpawn, err))?)
Some(
QuicServer::spawn(quic_config, metrics.server.clone())
.map_err(|err| e!(SpawnError::QuicSpawn, err))?,
)
}
None => None,
};
Expand Down
2 changes: 1 addition & 1 deletion iroh-relay/src/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ where
}

self.clients
.unregister(self.connection_id, self.endpoint_id);
.unregister(self.connection_id, self.endpoint_id, &self.metrics);
self.metrics.disconnects.inc();
}

Expand Down
12 changes: 10 additions & 2 deletions iroh-relay/src/server/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Clients {
let connection_id = self.get_connection_id();
trace!(remote_endpoint = %endpoint_id.fmt_short(), "registering client");

let client = Client::new(client_config, connection_id, self, metrics);
let client = Client::new(client_config, connection_id, self, metrics.clone());
match self.0.clients.entry(endpoint_id) {
dashmap::Entry::Occupied(mut entry) => {
let state = entry.get_mut();
Expand All @@ -91,6 +91,7 @@ impl Clients {
.try_send_health("Another endpoint connected with the same endpoint id. No more messages will be received".to_string())
.ok();
state.inactive.push(old_client);
metrics.clients_inactive_added.inc();
}
dashmap::Entry::Vacant(entry) => {
entry.insert(ClientState {
Expand All @@ -110,7 +111,12 @@ impl Clients {
/// peer is gone from the network.
///
/// Must be passed a matching connection_id.
pub(super) fn unregister(&self, connection_id: u64, endpoint_id: EndpointId) {
pub(super) fn unregister(
&self,
connection_id: u64,
endpoint_id: EndpointId,
metrics: &Metrics,
) {
trace!(
endpoint_id = %endpoint_id.fmt_short(),
connection_id, "unregistering client"
Expand All @@ -122,6 +128,7 @@ impl Clients {
if state.active.connection_id() == connection_id {
// The unregistering client is the currently active client
if let Some(last_inactive_client) = state.inactive.pop() {
metrics.clients_inactive_removed.inc();
// There is an inactive client, promote to active again.
state.active = last_inactive_client;
// Don't remove the entry from client map.
Expand All @@ -137,6 +144,7 @@ impl Clients {
state
.inactive
.retain(|client| client.connection_id() != connection_id);
metrics.clients_inactive_removed.inc();
// Active client is unmodified: keep entry in map.
false
}
Expand Down
5 changes: 5 additions & 0 deletions iroh-relay/src/server/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,8 @@ impl RelayService {
tls_config: Option<TlsConfig>,
establish_timeout: Duration,
) {
let metrics = self.0.metrics.clone();
metrics.http_connections.inc();
// We create a notification token to be triggered once the connection is fully established
// and passed to the relay server.
let on_establish = Arc::new(Notify::new());
Expand Down Expand Up @@ -974,6 +976,8 @@ impl RelayService {
.map_err(|_elapsed| e!(ServeConnectionError::EstablishTimeout))
.flatten();

metrics.http_connections_closed.inc();

if let Err(error) = res {
match error {
ServeConnectionError::ManualAccept { source, .. }
Expand All @@ -991,6 +995,7 @@ impl RelayService {
debug!(reason=?source, "peer disconnected");
}
_ => {
metrics.http_connections_errored.inc();
error!(?error, "failed to handle connection");
}
}
Expand Down
58 changes: 55 additions & 3 deletions iroh-relay/src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,63 @@ pub struct Metrics {

/// Number of unique client keys per day
pub unique_client_keys: Counter,
// TODO: enable when we can have multiple connections for one endpoint id
// pub duplicate_client_keys: Counter,
// pub duplicate_client_conns: Counter,

/// Number of times a client was moved into the inactive state.
///
/// A client becomes inactive when a new client connects with the same endpoint id. An inactive
/// client can still send messages, but won't receive anything. If the currently-active client
/// disconnects, and if there are inactive clients, the most-recent inactive client becomes
/// active again.
///
/// The number of inactive clients at any time is `clients_inactive_added` - `clients_inactive_removed`.
pub clients_inactive_added: Counter,

/// Number of times a client was removed from the inactive state.
///
/// This is incremented whenever a client disconnects while inactive, or when an inactive client is promoted
/// to active again (which happens only when the currently-active client for that endpoint id disconnects).
///
/// See [`Self::clients_inactive_added`] for details on when a client becomes inactive.
pub clients_inactive_removed: Counter,

// TODO: only important stat that we cannot track right now
// pub average_queue_duration:
//
/// Number of incoming QAD connections.
///
/// Once its handshake has either completed or failed, each incoming connection is counted in exactly one of
/// `qad_connections` or `qad_incoming_error`.
///
/// Thus the number of in-flight incoming connections is `qad_incoming` - `qad_incoming_error` - `qad_connections`.
pub qad_incoming: Counter,

/// Number of QAD QUIC connections that aborted before completing the handshake.
pub qad_incoming_error: Counter,

/// Number of accepted QAD QUIC connections.
///
/// The number of active connections is `qad_connections` - `qad_connections_closed`.
pub qad_connections: Counter,

/// Number of QAD QUIC connections that disconnected after being accepted.
pub qad_connections_closed: Counter,

/// Number of QAD QUIC connections that disconnected after being accepted, with an error.
///
/// This count is *included* in `qad_connections_closed`; it is not in addition to it.
pub qad_connections_errored: Counter,

/// Number of accepted HTTP(S) connections.
///
/// The number of active connections at any time is `http_connections` - `http_connections_closed`.
pub http_connections: Counter,

/// Number of terminated HTTP(S) connections.
pub http_connections_closed: Counter,

/// Number of HTTP(S) connections that terminated with an error.
///
/// This count is *included* in `http_connections_closed`; it is not in addition to it.
pub http_connections_errored: Counter,
}

/// All metrics tracked in the relay server.
Expand Down
Loading