diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs index 35eb8422a..7b9745bca 100644 --- a/crates/hyperqueue/src/server/client/mod.rs +++ b/crates/hyperqueue/src/server/client/mod.rs @@ -14,7 +14,7 @@ use tokio::sync::{Notify, mpsc}; use crate::client::status::{Status, job_status}; use crate::common::serverdir::ServerDir; use crate::server::event::Event; -use crate::server::job::JobTaskState; +use crate::server::job::{Job, JobTaskState}; use crate::server::state::{State, StateRef}; use crate::transfer::connection::accept_client; use crate::transfer::messages::{ @@ -278,7 +278,18 @@ pub async fn client_rpc_loop< response } FromClientMessage::ForgetJob(msg) => { - handle_job_forget(&state_ref, senders, &msg.selector, msg.filter) + let (msg, forgotten_jobs) = + handle_job_forget(&state_ref, senders, &msg.selector, msg.filter); + // The Drop implementation (even for a single job! because it can have + // thousands of tasks) can take a long time and stall the event loop. + // So we perform it on another thread. + if !forgotten_jobs.is_empty() { + let _ = tokio::task::spawn_blocking(move || { + drop(forgotten_jobs); + }) + .await; + } + msg } FromClientMessage::JobDetail(msg) => { compute_job_detail(&state_ref, msg.job_id_selector, msg.task_selector) @@ -742,16 +753,22 @@ async fn cancel_job(state_ref: &StateRef, senders: &Senders, job_id: JobId) -> C } } +/// Forgetting jobs can release a lot of memory and perform a lot of destructors. +/// Since this happens in synchronous code, it can stall the event loop, which can lead to e.g. +/// missing worker heartbeats and other problems. +/// +/// We thus return the forgotten jobs, so that we can drop them later in a separate thread. fn handle_job_forget( state_ref: &StateRef, senders: &Senders, selector: &IdSelector, allowed_statuses: Vec, -) -> ToClientMessage { +) -> (ToClientMessage, Vec) { let mut state = state_ref.get_mut(); let job_ids: Vec = get_job_ids(&state, selector); let mut forgotten: usize = 0; + let mut forgotten_jobs = Vec::with_capacity(job_ids.len()); for &job_id in &job_ids { let can_be_forgotten = state .get_job(job_id) @@ -760,7 +777,9 @@ fn handle_job_forget( }) .unwrap_or(false); if can_be_forgotten { - state.forget_job(job_id); + if let Some(job) = state.forget_job(job_id) { + forgotten_jobs.push(job); + } forgotten += 1; } } @@ -769,7 +788,10 @@ fn handle_job_forget( let ignored = job_ids.len() - forgotten; - ToClientMessage::ForgetJobResponse(ForgetJobResponse { forgotten, ignored }) + ( + ToClientMessage::ForgetJobResponse(ForgetJobResponse { forgotten, ignored }), + forgotten_jobs, + ) } fn handle_get_list(state_ref: &StateRef, workers: bool) -> ToClientMessage {