diff --git a/src/cli/cmd/job/logs.rs b/src/cli/cmd/job/logs.rs index 6cc4bb4..5946f64 100644 --- a/src/cli/cmd/job/logs.rs +++ b/src/cli/cmd/job/logs.rs @@ -1,5 +1,8 @@ use super::Context; -use crate::{cli::sink::Error as SinkError, data::simple_message::SimpleMessage, httpclient}; +use crate::{cli::sink::Error as SinkError, httpclient}; +use std::time::Duration; +use tokio::signal; +use tokio::time::sleep; use clap::{Parser, ValueHint}; @@ -10,8 +13,17 @@ use snafu::{ResultExt, Snafu}; /// List the logs of a job. #[derive(Parser, Debug)] pub struct Input { + /// The job name/id to get logs for. #[arg(value_hint=ValueHint::Other)] pub job_id: String, + + /// Periodically retrieves logs, it will stop when the job finished. + #[arg(long, short, default_value_t = false)] + pub follow: bool, + + /// The interval in seconds to wait between calls for logs. + #[arg(long, default_value_t = 2)] + pub follow_interval: u8, } #[derive(Debug, Snafu)] @@ -25,24 +37,64 @@ pub enum Error { impl Input { pub async fn exec(&self, ctx: Context) -> Result<(), Error> { + if self.follow { + self.follow_logs(ctx).await + } else { + self.show_logs(&ctx, 0).await?; + Ok(()) + } + } + + async fn show_logs(&self, ctx: &Context, seen: usize) -> Result { let result = ctx .client .session_logs(&self.job_id) .await .context(HttpClientSnafu)?; + if let Some(lines_blob) = result.0.get("amalthea-session") { + let lines: Vec<&str> = lines_blob.lines().collect(); + if lines.len() > seen { + for line in &lines[seen..] { + println!("{}", line); + } + return Ok(lines.len()); + } + } + Ok(seen) + } - if let Some(lines) = result.0.get("amalthea-session") { - ctx.write_result(&SimpleMessage { - message: lines.to_string(), - }) - .await - .context(WriteResultSnafu) - } else { - ctx.write_result(&SimpleMessage { - message: "No logs available.".to_string(), - }) + async fn is_session_finished(&self, ctx: &Context) -> Result { + let details = ctx + .client + .get_session(&self.job_id) .await - .context(WriteResultSnafu) + .context(HttpClientSnafu)?; + + match &details { + None => Ok(true), + Some(d) => Ok(!d.status.state.is_running()), + } + } + + async fn follow_logs(&self, ctx: Context) -> Result<(), Error> { + let mut seen: usize = self.show_logs(&ctx, 0).await?; + if self.is_session_finished(&ctx).await? { + return Ok(()); + } + + loop { + tokio::select! { + _ = signal::ctrl_c() => { + eprintln!("Interrupted, exiting."); + break Ok(()); + } + _ = sleep(Duration::from_secs(self.follow_interval as u64)) => { + seen = self.show_logs(&ctx, seen).await?; + if self.is_session_finished(&ctx).await? { + break Ok(()); + } + } + } } } } diff --git a/src/httpclient.rs b/src/httpclient.rs index a0be177..000502d 100644 --- a/src/httpclient.rs +++ b/src/httpclient.rs @@ -398,6 +398,15 @@ impl Client { Ok(()) } + pub async fn get_session( + &self, + session_id: &str, + ) -> Result, Error> { + let path = format!("/api/data/sessions/{}", session_id); + log::debug!("Get session: {}", session_id); + self.json_get_option::(&path).await + } + pub async fn list_sessions(&self, mode: Option) -> Result { let url = self.make_url("/api/data/sessions")?; log::debug!( diff --git a/src/httpclient/data.rs b/src/httpclient/data.rs index 22fdbfa..fd16f47 100644 --- a/src/httpclient/data.rs +++ b/src/httpclient/data.rs @@ -93,7 +93,8 @@ where for r in data { let sub_id = r.submission_id.as_deref().unwrap_or("-"); let started = r.started.format(); - let data = vec![&r.name, sub_id, &r.project_id, &r.status.state, &started]; + let status = r.status.state.to_string(); + let data = vec![&r.name, sub_id, &r.project_id, &status, &started]; builder.push_record(data); } builder.insert_record( @@ -119,21 +120,59 @@ impl fmt::Display for SessionList { } } +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum SessionState { + Running, + Starting, + Stopping, + Failed, + Hibernated, + Succeeded, +} + +impl fmt::Display for SessionState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let name = match self { + SessionState::Running => "Running", + SessionState::Starting => "Starting", + SessionState::Stopping => "Stopping", + SessionState::Failed => "Failed", + SessionState::Hibernated => "Hibernated", + SessionState::Succeeded => "Succeeded", + }; + f.write_str(name) + } +} + +impl SessionState { + pub fn is_running(&self) -> bool { + match self { + SessionState::Running => true, + SessionState::Starting => true, + SessionState::Stopping => true, + SessionState::Failed => false, + SessionState::Hibernated => false, + SessionState::Succeeded => false, + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct SessionStatus { - message: Option, - state: String, + pub message: Option, + pub state: SessionState, } #[derive(Debug, Serialize, Deserialize)] pub struct SessionStartResponse { - image: String, - name: String, - project_id: String, - launcher_id: String, - submission_id: Option, - status: SessionStatus, - started: Timestamp, + pub image: String, + pub name: String, + pub project_id: String, + pub launcher_id: String, + pub submission_id: Option, + pub status: SessionStatus, + pub started: Timestamp, } impl fmt::Display for SessionStartResponse {