diff --git a/src/broadcast/event_broadcast.rs b/src/broadcast/event_broadcast.rs index 0efae62..3cb0b1a 100644 --- a/src/broadcast/event_broadcast.rs +++ b/src/broadcast/event_broadcast.rs @@ -77,7 +77,7 @@ impl BroadcastChannel { pub async fn send_event(&self, notification: Notification, to_user: &Uuid) { let lock = self.channel.read().await; if let Some(sender) = lock.get(to_user) { - match sender.send(notification) { + match sender.send(notification.clone()) { Ok(sc) => { info!("Successfully sent {:?} broadcast event.", sc); } @@ -86,11 +86,11 @@ impl BroadcastChannel { } } } else { - if let Err(error) = self.cache.add_notification_for_user(to_user, ¬ification).await { - error!("Failed to cache notification: {}", error); - }; - self.send_undeliverable_notifications(notification, vec![to_user.clone()]).await; + self.send_undeliverable_notifications(notification.clone(), vec![to_user.clone()]).await; } + if let Err(error) = self.cache.add_notification_for_user(to_user, ¬ification).await { + error!("Failed to cache notification: {}", error); + }; } pub async fn send_event_to_all(&self, user_ids: Vec, notification: Notification) { @@ -107,11 +107,11 @@ impl BroadcastChannel { } } } else { - if let Err(error) = self.cache.add_notification_for_user(&user_id, ¬ification).await { - error!("Failed to cache notification: {}", error); - }; not_deliverable.push(user_id); } + if let Err(error) = self.cache.add_notification_for_user(&user_id, ¬ification).await { + error!("Failed to cache notification: {}", error); + }; } if not_deliverable.len() > 0 { self.send_undeliverable_notifications(notification, not_deliverable).await; @@ -134,6 +134,7 @@ impl BroadcastChannel { } pub async fn unsubscribe(&self, user_id: Uuid) { + debug!("Unsubscribing user {:?} from broadcasting events.", user_id); let mut lock = self.channel.write().await; if let Some(sender) = lock.get(&user_id) { if sender.receiver_count() > 0 { diff --git a/src/messaging/notifications.rs b/src/messaging/notifications.rs index 0bb64f8..8b66419 100644 --- a/src/messaging/notifications.rs +++ b/src/messaging/notifications.rs @@ -20,7 +20,6 @@ use crate::broadcast::{BroadcastChannel, Notification}; use crate::core::AppState; use crate::errors::{AppError, AppResponse}; use crate::keycloak::decode::KeycloakToken; -use crate::keycloak::layer::KeycloakAuthLayer; struct ConnectionGuard { user_id: Uuid, @@ -85,7 +84,8 @@ async fn handle_socket(mut socket: WebSocket, user_id: Uuid) { let mut broadcast_events = BroadcastChannel::get().subscribe_to_user_events(user_id.clone()).await; let _guard = ConnectionGuard { user_id }; - let mut ping_interval = time::interval(Duration::from_secs(30)); + let mut ping_interval = time::interval(Duration::from_secs(15)); + let mut last_pong_received = time::Instant::now(); loop { tokio::select! { @@ -112,6 +112,12 @@ async fn handle_socket(mut socket: WebSocket, user_id: Uuid) { // 2. Regular ping from ism: _ = ping_interval.tick() => { + + if last_pong_received.elapsed() > Duration::from_secs(30) { + debug!("Client did not respond to ping in time, closing websocket connection"); + break; + } + if socket.send(Message::Ping(Bytes::new())).await.is_err() { // connection is dead when we can't send ping break; } @@ -120,12 +126,21 @@ async fn handle_socket(mut socket: WebSocket, user_id: Uuid) { // 3. Receive messages from the client: client_msg = socket.recv() => { match client_msg { - Some(Ok(Message::Close(_))) | None => break, //client is closing connection - Some(Err(_)) => break, //client error + Some(Ok(Message::Close(_))) | None => { + debug!("Client has closed the websocket connection, closing."); + break; + }, //client is closing connection + Some(Err(_)) => { + debug!("Client has an error with the websocket connection, closing."); + break; + }, //client error Some(Ok(Message::Pong(_))) => { - debug!("Client has sent Pong"); + debug!("Client has sent Websocket-Pong"); + last_pong_received = time::Instant::now(); + } + Some(Ok(_)) => { + last_pong_received = time::Instant::now(); } - _ => {} //for the future } } }