From d58191a21e242eeb951c914c36645c1a08f3ea92 Mon Sep 17 00:00:00 2001 From: JrTimha Date: Sat, 21 Feb 2026 23:18:48 +0100 Subject: [PATCH] add WebSocket support for server events, including `/api/wss` route and connection handling logic --- Cargo.lock | 44 ++++++++++++++++++++ Cargo.toml | 2 +- default.config.toml | 2 +- src/messaging/notifications.rs | 73 +++++++++++++++++++++++++++++++++- src/messaging/routes.rs | 5 ++- 5 files changed, 120 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8136b2e..f2b75fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -273,6 +273,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" dependencies = [ "axum-core", + "base64 0.22.1", "bytes", "form_urlencoded", "futures-util", @@ -292,8 +293,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -897,6 +900,12 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "der" version = "0.7.9" @@ -4560,6 +4569,18 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.15" @@ -4741,6 +4762,23 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.0", + "sha1", + "thiserror 2.0.18", + "utf-8", +] + [[package]] name = "twox-hash" version = "1.6.3" @@ -4852,6 +4890,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf16_iter" version = "1.0.5" diff --git a/Cargo.toml b/Cargo.toml index 6b756ea..6535018 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [dependencies] log = "0.4.29" -axum = { version = "0.8.8", features = ["multipart"] } +axum = { version = "0.8.8", features = ["multipart", "ws"] } tokio = {version = "1.49.0", features = ["full"]} tower = "0.5.3" config = "0.15.18" diff --git a/default.config.toml b/default.config.toml index 8586eaf..c6a76c0 100644 --- a/default.config.toml +++ b/default.config.toml @@ -1,6 +1,6 @@ ism_url = "localhost" ism_port= 5403 -log_level = "info" +log_level = "debug" cors_origin = "http://localhost:4200" use_kafka = true push_notification_access_token="oiqhfriuhf" diff --git a/src/messaging/notifications.rs b/src/messaging/notifications.rs index d6188c8..0bb64f8 100644 --- a/src/messaging/notifications.rs +++ b/src/messaging/notifications.rs @@ -2,19 +2,25 @@ use std::sync::Arc; use std::time::Duration; use axum::{Extension, Json}; use axum::extract::{Query, State}; -use axum::response::{Sse}; +use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; +use axum::response::{IntoResponse, Sse}; use axum::response::sse::Event; +use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::Stream; -use log::error; +use tokio::time; +use log::{debug, error}; use serde::Deserialize; +use tokio::sync::broadcast::error::RecvError; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; +use tracing::warn; use uuid::Uuid; use crate::broadcast::{BroadcastChannel, Notification}; use crate::core::AppState; use crate::errors::{AppError, AppResponse}; use crate::keycloak::decode::KeycloakToken; +use crate::keycloak::layer::KeycloakAuthLayer; struct ConnectionGuard { user_id: Uuid, @@ -64,6 +70,69 @@ pub async fn stream_server_events( ) } + +pub async fn websocket_server_events( + websocket: WebSocketUpgrade, + Extension(token): Extension> +) -> impl IntoResponse { + + websocket + .on_failed_upgrade(|error| warn!("Error upgrading websocket: {}", error)) + .on_upgrade(move |socket| handle_socket(socket, token.subject.clone())) +} + +async fn handle_socket(mut socket: WebSocket, user_id: Uuid) { + + let mut broadcast_events = BroadcastChannel::get().subscribe_to_user_events(user_id.clone()).await; + let _guard = ConnectionGuard { user_id }; + let mut ping_interval = time::interval(Duration::from_secs(30)); + + loop { + tokio::select! { + // 1. Handle new broadcasting event: + notification_result = broadcast_events.recv() => { + match notification_result { + Ok(event) => { + let json_msg = serde_json::to_string(&event).unwrap(); + let ws_message = Message::text(json_msg); + + if socket.send(ws_message).await.is_err() { + error!("Failed to send message to client"); + } + } + Err(RecvError::Closed) => { + debug!("Client disconnected or channel closed"); + break; + } + Err(RecvError::Lagged(_)) => { + debug!("Client is too slow!") + } + } + } + + // 2. Regular ping from ism: + _ = ping_interval.tick() => { + if socket.send(Message::Ping(Bytes::new())).await.is_err() { // connection is dead when we can't send ping + break; + } + } + + // 3. Receive messages from the client: + client_msg = socket.recv() => { + match client_msg { + Some(Ok(Message::Close(_))) | None => break, //client is closing connection + Some(Err(_)) => break, //client error + Some(Ok(Message::Pong(_))) => { + debug!("Client has sent Pong"); + } + _ => {} //for the future + } + } + } + } +} + + #[derive(Deserialize)] pub struct NotificationQueryParam { timestamp: DateTime diff --git a/src/messaging/routes.rs b/src/messaging/routes.rs index 1ef4eeb..769a532 100644 --- a/src/messaging/routes.rs +++ b/src/messaging/routes.rs @@ -1,13 +1,14 @@ use std::sync::Arc; use axum::Router; -use axum::routing::{get, post}; +use axum::routing::{any, get, post}; use crate::core::AppState; use crate::messaging::handler::handle_send_message; -use crate::messaging::notifications::{get_latest_notification_events, stream_server_events}; +use crate::messaging::notifications::{get_latest_notification_events, stream_server_events, websocket_server_events}; pub fn create_messaging_routes() -> Router> { Router::new() //add new routes here .route("/api/notifications", get(get_latest_notification_events)) .route("/api/sse", get(stream_server_events)) + .route("/api/wss", any(websocket_server_events)) .route("/api/send-msg", post(handle_send_message)) } \ No newline at end of file