Skip to content

Commit 5fddd3a

Browse files
committed
feat: add stream_user_stage_status handler and service
1 parent 214b408 commit 5fddd3a

8 files changed

Lines changed: 162 additions & 3 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ clap = { version = "4.5.41", features = ["derive", "env"] }
2929
dotenv = "0.15.0"
3030
flate2 = "1.1.2"
3131
fs_extra = "1.3.0"
32+
futures = "0.3.31"
3233
ghrepo = "0.7.1"
3334
http-body-util = "0.1.3"
3435
indexmap = {version = "2.9.0", features = ["serde"] }
@@ -42,6 +43,7 @@ tar = "0.4.44"
4243
tempfile = "3.20.0"
4344
thiserror = "2.0.12"
4445
tokio = { version = "1.46.1", features = ["full"] }
46+
tokio-stream = "0.1.17"
4547
tower-http = { version = "0.6.6", features = ["cors"] }
4648
tracing = "0.1.41"
4749
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

openapi.json

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,52 @@
696696
}
697697
]
698698
}
699+
},
700+
"/v1/user/courses/{slug}/stages/{stage_slug}/status": {
701+
"get": {
702+
"tags": [
703+
"User",
704+
"Stage"
705+
],
706+
"summary": "Stream the status of a specific stage for the current user.",
707+
"operationId": "stream_user_stage_status",
708+
"parameters": [
709+
{
710+
"name": "slug",
711+
"in": "path",
712+
"description": "The slug of course",
713+
"required": true,
714+
"schema": {
715+
"type": "string"
716+
}
717+
},
718+
{
719+
"name": "stage_slug",
720+
"in": "path",
721+
"description": "The slug of stage",
722+
"required": true,
723+
"schema": {
724+
"type": "string"
725+
}
726+
}
727+
],
728+
"responses": {
729+
"200": {
730+
"description": "Successfully started streaming stage status updates"
731+
},
732+
"404": {
733+
"description": "Course or stage not found"
734+
},
735+
"500": {
736+
"description": "Failed to stream stage status"
737+
}
738+
},
739+
"security": [
740+
{
741+
"JWTBearerAuth": []
742+
}
743+
]
744+
}
699745
}
700746
},
701747
"components": {
@@ -1107,6 +1153,23 @@
11071153
"description": "Current progress status (in_progress, completed)"
11081154
}
11091155
}
1156+
},
1157+
"UserStageStatusResponse": {
1158+
"type": "object",
1159+
"required": [
1160+
"status",
1161+
"test"
1162+
],
1163+
"properties": {
1164+
"status": {
1165+
"type": "string",
1166+
"description": "Current progress status (in_progress, completed)"
1167+
},
1168+
"test": {
1169+
"type": "string",
1170+
"description": "Test result status (passed, failed)"
1171+
}
1172+
}
11101173
}
11111174
},
11121175
"securitySchemes": {

src/handler/stage.rs

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,15 @@ use axum::{
1616
Json,
1717
extract::{Path, State},
1818
http::StatusCode,
19-
response::IntoResponse,
19+
response::{
20+
IntoResponse, Sse,
21+
sse::{Event, KeepAlive},
22+
},
2023
};
21-
use std::sync::Arc;
24+
use futures::{Stream, StreamExt};
25+
use std::{convert::Infallible, sync::Arc};
26+
use tokio_stream::wrappers::ReceiverStream;
27+
use tracing::{error, info};
2228

2329
use crate::{
2430
context::Context,
@@ -193,3 +199,59 @@ pub async fn complete_stage(
193199
let res = StageService::complete(ctx, &claims.id, &slug, &req.slug).await?;
194200
Ok((StatusCode::OK, Json(res)))
195201
}
202+
203+
/// Stream the status of a specific stage for the current user.
204+
#[utoipa::path(
205+
operation_id = "stream_user_stage_status",
206+
get, path = "/v1/user/courses/{slug}/stages/{stage_slug}/status",
207+
params(
208+
("slug" = String, description = "The slug of course"),
209+
("stage_slug" = String, description = "The slug of stage"),
210+
),
211+
responses(
212+
(status = 200, description = "Successfully started streaming stage status updates"),
213+
(status = 404, description = "Course or stage not found"),
214+
(status = 500, description = "Failed to stream stage status")
215+
),
216+
security(("JWTBearerAuth" = [])),
217+
tags = ["User", "Stage"]
218+
)]
219+
pub async fn stream_user_stage_status(
220+
claims: Claims,
221+
State(ctx): State<Arc<Context>>,
222+
Path((slug, stage_slug)): Path<(String, String)>,
223+
) -> Sse<impl Stream<Item = axum::response::Result<Event, Infallible>>> {
224+
info!(
225+
"Starting to stream status updates for stage {} in course {} for user {}...",
226+
stage_slug, slug, claims.id
227+
);
228+
229+
// Create a channel for sending status updates.
230+
let (sender, receiver) = tokio::sync::mpsc::channel(100);
231+
232+
// Spawn a background task to fetch and send status updates.
233+
tokio::spawn(async move {
234+
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
235+
loop {
236+
interval.tick().await;
237+
let status =
238+
StageService::get_user_stage_status(&ctx, &claims.id, &slug, &stage_slug).await;
239+
if let Ok(status) = status {
240+
let event = Event::default().json_data(status).unwrap_or_else(|e| {
241+
error!("Failed to serialize status update: {}", e);
242+
Event::default().data("status update error")
243+
});
244+
if sender.send(event).await.is_err() {
245+
break;
246+
}
247+
}
248+
}
249+
});
250+
251+
// Convert the receiver into a stream.
252+
let stream = ReceiverStream::new(receiver);
253+
let stream = stream.map(Ok);
254+
255+
// Return the SSE stream with keep-alive.
256+
Sse::new(stream).keep_alive(KeepAlive::default())
257+
}

src/response/stage.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,12 @@ impl From<UserStageModel> for UserStageResponse {
134134
}
135135
}
136136
}
137+
138+
#[derive(Debug, Serialize, Deserialize, ToSchema)]
139+
pub struct UserStageStatusResponse {
140+
/// Current progress status (in_progress, completed)
141+
pub status: String,
142+
143+
/// Test result status (passed, failed)
144+
pub test: String,
145+
}

src/routes.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,8 @@ pub fn build() -> Router<Arc<Context>> {
5656
.route("/v1/user/courses/{slug}/stages", get(stage::find_user_stages))
5757
.route("/v1/user/courses/{slug}/stages", post(stage::complete_stage))
5858
.route("/v1/user/courses/{slug}/stages/{stage_slug}", get(stage::get_user_stage))
59+
.route(
60+
"/v1/user/courses/{slug}/stages/{stage_slug}/status",
61+
get(stage::stream_user_stage_status),
62+
)
5963
}

src/service/stage.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{
2020
errors::{ApiError, Result},
2121
model::{UserCourseModel, UserStageModel},
2222
repository::{CourseRepository, StageRepository},
23-
response::{StageDetailResponse, StageResponse, UserStageResponse},
23+
response::{StageDetailResponse, StageResponse, UserStageResponse, UserStageStatusResponse},
2424
};
2525

2626
/// Service for managing stages
@@ -153,4 +153,19 @@ impl StageService {
153153

154154
Ok(())
155155
}
156+
157+
/// Get the current status of a stage for the user.
158+
pub async fn get_user_stage_status(
159+
ctx: &Arc<Context>,
160+
user_id: &str,
161+
course_slug: &str,
162+
stage_slug: &str,
163+
) -> Result<UserStageStatusResponse> {
164+
// Fetch the user's stage record from the database.
165+
let user_stage =
166+
StageRepository::get_user_stage(&ctx.database, user_id, course_slug, stage_slug)
167+
.await?;
168+
169+
Ok(UserStageStatusResponse { status: user_stage.status, test: "failed".into() })
170+
}
156171
}

src/swagger.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use crate::{handler, request, response};
4747
handler::stage::find_user_stages,
4848
handler::stage::complete_stage,
4949
handler::stage::get_user_stage,
50+
handler::stage::stream_user_stage_status
5051
),
5152
components(
5253
schemas(
@@ -63,6 +64,7 @@ use crate::{handler, request, response};
6364
request::UpdateUserCourseRequest,
6465
response::UserCourseResponse,
6566
response::UserStageResponse,
67+
response::UserStageStatusResponse,
6668
)
6769
),
6870
tags(

0 commit comments

Comments
 (0)