Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2024"

[dependencies]
log = "0.4.29"
axum = { version = "0.8.8", features = ["multipart"] }
axum = { version = "0.8.8", features = ["multipart", "ws"] }
tokio = {version = "1.49.0", features = ["full"]}
tower = "0.5.3"
config = "0.15.18"
Expand Down
2 changes: 1 addition & 1 deletion default.config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ism_url = "localhost"
ism_port= 5403
log_level = "info"
log_level = "debug"
cors_origin = "http://localhost:4200"
use_kafka = true
push_notification_access_token="oiqhfriuhf"
Expand Down
73 changes: 71 additions & 2 deletions src/messaging/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@ use std::sync::Arc;
use std::time::Duration;
use axum::{Extension, Json};
use axum::extract::{Query, State};
use axum::response::{Sse};
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::response::{IntoResponse, Sse};
use axum::response::sse::Event;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::Stream;
use log::error;
use tokio::time;
use log::{debug, error};
use serde::Deserialize;
use tokio::sync::broadcast::error::RecvError;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::warn;
use uuid::Uuid;
use crate::broadcast::{BroadcastChannel, Notification};
use crate::core::AppState;
use crate::errors::{AppError, AppResponse};
use crate::keycloak::decode::KeycloakToken;
use crate::keycloak::layer::KeycloakAuthLayer;

struct ConnectionGuard {
user_id: Uuid,
Expand Down Expand Up @@ -64,6 +70,69 @@ pub async fn stream_server_events(
)
}


pub async fn websocket_server_events(
websocket: WebSocketUpgrade,
Extension(token): Extension<KeycloakToken<String>>
) -> impl IntoResponse {

websocket
.on_failed_upgrade(|error| warn!("Error upgrading websocket: {}", error))
.on_upgrade(move |socket| handle_socket(socket, token.subject.clone()))
}

async fn handle_socket(mut socket: WebSocket, user_id: Uuid) {

let mut broadcast_events = BroadcastChannel::get().subscribe_to_user_events(user_id.clone()).await;
let _guard = ConnectionGuard { user_id };
let mut ping_interval = time::interval(Duration::from_secs(30));

loop {
tokio::select! {
// 1. Handle new broadcasting event:
notification_result = broadcast_events.recv() => {
match notification_result {
Ok(event) => {
let json_msg = serde_json::to_string(&event).unwrap();
let ws_message = Message::text(json_msg);

if socket.send(ws_message).await.is_err() {
error!("Failed to send message to client");
}
}
Err(RecvError::Closed) => {
debug!("Client disconnected or channel closed");
break;
}
Err(RecvError::Lagged(_)) => {
debug!("Client is too slow!")
}
}
}

// 2. Regular ping from ism:
_ = ping_interval.tick() => {
if socket.send(Message::Ping(Bytes::new())).await.is_err() { // connection is dead when we can't send ping
break;
}
}

// 3. Receive messages from the client:
client_msg = socket.recv() => {
match client_msg {
Some(Ok(Message::Close(_))) | None => break, //client is closing connection
Some(Err(_)) => break, //client error
Some(Ok(Message::Pong(_))) => {
debug!("Client has sent Pong");
}
_ => {} //for the future
}
}
}
}
}


#[derive(Deserialize)]
pub struct NotificationQueryParam {
timestamp: DateTime<Utc>
Expand Down
5 changes: 3 additions & 2 deletions src/messaging/routes.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::sync::Arc;
use axum::Router;
use axum::routing::{get, post};
use axum::routing::{any, get, post};
use crate::core::AppState;
use crate::messaging::handler::handle_send_message;
use crate::messaging::notifications::{get_latest_notification_events, stream_server_events};
use crate::messaging::notifications::{get_latest_notification_events, stream_server_events, websocket_server_events};

pub fn create_messaging_routes() -> Router<Arc<AppState>> {
Router::new() //add new routes here
.route("/api/notifications", get(get_latest_notification_events))
.route("/api/sse", get(stream_server_events))
.route("/api/wss", any(websocket_server_events))
.route("/api/send-msg", post(handle_send_message))
}
Loading