Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions src/broadcast/event_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl BroadcastChannel {
pub async fn send_event(&self, notification: Notification, to_user: &Uuid) {
let lock = self.channel.read().await;
if let Some(sender) = lock.get(to_user) {
match sender.send(notification) {
match sender.send(notification.clone()) {
Ok(sc) => {
info!("Successfully sent {:?} broadcast event.", sc);
}
Expand All @@ -86,11 +86,11 @@ impl BroadcastChannel {
}
}
} else {
if let Err(error) = self.cache.add_notification_for_user(to_user, &notification).await {
error!("Failed to cache notification: {}", error);
};
self.send_undeliverable_notifications(notification, vec![to_user.clone()]).await;
self.send_undeliverable_notifications(notification.clone(), vec![to_user.clone()]).await;
}
if let Err(error) = self.cache.add_notification_for_user(to_user, &notification).await {
error!("Failed to cache notification: {}", error);
};
}

pub async fn send_event_to_all(&self, user_ids: Vec<Uuid>, notification: Notification) {
Expand All @@ -107,11 +107,11 @@ impl BroadcastChannel {
}
}
} else {
if let Err(error) = self.cache.add_notification_for_user(&user_id, &notification).await {
error!("Failed to cache notification: {}", error);
};
not_deliverable.push(user_id);
}
if let Err(error) = self.cache.add_notification_for_user(&user_id, &notification).await {
error!("Failed to cache notification: {}", error);
};
}
if not_deliverable.len() > 0 {
self.send_undeliverable_notifications(notification, not_deliverable).await;
Expand All @@ -134,6 +134,7 @@ impl BroadcastChannel {
}

pub async fn unsubscribe(&self, user_id: Uuid) {
debug!("Unsubscribing user {:?} from broadcasting events.", user_id);
let mut lock = self.channel.write().await;
if let Some(sender) = lock.get(&user_id) {
if sender.receiver_count() > 0 {
Expand Down
27 changes: 21 additions & 6 deletions src/messaging/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::broadcast::{BroadcastChannel, Notification};
use crate::core::AppState;
use crate::errors::{AppError, AppResponse};
use crate::keycloak::decode::KeycloakToken;
use crate::keycloak::layer::KeycloakAuthLayer;

struct ConnectionGuard {
user_id: Uuid,
Expand Down Expand Up @@ -85,7 +84,8 @@ async fn handle_socket(mut socket: WebSocket, user_id: Uuid) {

let mut broadcast_events = BroadcastChannel::get().subscribe_to_user_events(user_id.clone()).await;
let _guard = ConnectionGuard { user_id };
let mut ping_interval = time::interval(Duration::from_secs(30));
let mut ping_interval = time::interval(Duration::from_secs(15));
let mut last_pong_received = time::Instant::now();

loop {
tokio::select! {
Expand All @@ -112,6 +112,12 @@ async fn handle_socket(mut socket: WebSocket, user_id: Uuid) {

// 2. Regular ping from ism:
_ = ping_interval.tick() => {

if last_pong_received.elapsed() > Duration::from_secs(30) {
debug!("Client did not respond to ping in time, closing websocket connection");
break;
}

if socket.send(Message::Ping(Bytes::new())).await.is_err() { // connection is dead when we can't send ping
break;
}
Expand All @@ -120,12 +126,21 @@ async fn handle_socket(mut socket: WebSocket, user_id: Uuid) {
// 3. Receive messages from the client:
client_msg = socket.recv() => {
match client_msg {
Some(Ok(Message::Close(_))) | None => break, //client is closing connection
Some(Err(_)) => break, //client error
Some(Ok(Message::Close(_))) | None => {
debug!("Client has closed the websocket connection, closing.");
break;
}, //client is closing connection
Some(Err(_)) => {
debug!("Client has an error with the websocket connection, closing.");
break;
}, //client error
Some(Ok(Message::Pong(_))) => {
debug!("Client has sent Pong");
debug!("Client has sent Websocket-Pong");
last_pong_received = time::Instant::now();
}
Some(Ok(_)) => {
last_pong_received = time::Instant::now();
}
_ => {} //for the future
}
}
}
Expand Down
Loading