Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions crates/hyperqueue/src/server/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::{Notify, mpsc};
use crate::client::status::{Status, job_status};
use crate::common::serverdir::ServerDir;
use crate::server::event::Event;
use crate::server::job::JobTaskState;
use crate::server::job::{Job, JobTaskState};
use crate::server::state::{State, StateRef};
use crate::transfer::connection::accept_client;
use crate::transfer::messages::{
Expand Down Expand Up @@ -278,7 +278,18 @@ pub async fn client_rpc_loop<
response
}
FromClientMessage::ForgetJob(msg) => {
handle_job_forget(&state_ref, senders, &msg.selector, msg.filter)
let (msg, forgotten_jobs) =
handle_job_forget(&state_ref, senders, &msg.selector, msg.filter);
// Dropping the forgotten jobs can take a long time (even for a single job,
// because it can have thousands of tasks) and stall the event loop,
// so we perform the drop on a blocking thread instead.
if !forgotten_jobs.is_empty() {
let _ = tokio::task::spawn_blocking(move || {
drop(forgotten_jobs);
})
.await;
}
msg
}
FromClientMessage::JobDetail(msg) => {
compute_job_detail(&state_ref, msg.job_id_selector, msg.task_selector)
Expand Down Expand Up @@ -742,16 +753,22 @@ async fn cancel_job(state_ref: &StateRef, senders: &Senders, job_id: JobId) -> C
}
}

/// Forgetting jobs can release a lot of memory and run a lot of destructors.
/// Since this happens in synchronous code, it can stall the event loop, which can lead to
/// problems such as missed worker heartbeats.
///
/// We thus return the forgotten jobs, so that the caller can drop them later in a separate thread.
fn handle_job_forget(
state_ref: &StateRef,
senders: &Senders,
selector: &IdSelector,
allowed_statuses: Vec<Status>,
) -> ToClientMessage {
) -> (ToClientMessage, Vec<Job>) {
let mut state = state_ref.get_mut();
let job_ids: Vec<JobId> = get_job_ids(&state, selector);
let mut forgotten: usize = 0;

let mut forgotten_jobs = Vec::with_capacity(job_ids.len());
for &job_id in &job_ids {
let can_be_forgotten = state
.get_job(job_id)
Expand All @@ -760,7 +777,9 @@ fn handle_job_forget(
})
.unwrap_or(false);
if can_be_forgotten {
state.forget_job(job_id);
if let Some(job) = state.forget_job(job_id) {
forgotten_jobs.push(job);
}
forgotten += 1;
}
}
Expand All @@ -769,7 +788,10 @@ fn handle_job_forget(

let ignored = job_ids.len() - forgotten;

ToClientMessage::ForgetJobResponse(ForgetJobResponse { forgotten, ignored })
(
ToClientMessage::ForgetJobResponse(ForgetJobResponse { forgotten, ignored }),
forgotten_jobs,
)
}

fn handle_get_list(state_ref: &StateRef, workers: bool) -> ToClientMessage {
Expand Down
Loading