Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 64 additions & 12 deletions src/cli/cmd/job/logs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::Context;
use crate::{cli::sink::Error as SinkError, data::simple_message::SimpleMessage, httpclient};
use crate::{cli::sink::Error as SinkError, httpclient};
use std::time::Duration;
use tokio::signal;
use tokio::time::sleep;

use clap::{Parser, ValueHint};

Expand All @@ -10,8 +13,17 @@ use snafu::{ResultExt, Snafu};
/// List the logs of a job.
#[derive(Parser, Debug)]
pub struct Input {
/// The job name/id to get logs for.
#[arg(value_hint=ValueHint::Other)]
pub job_id: String,

/// Periodically retrieves logs, it will stop when the job finished.
#[arg(long, short, default_value_t = false)]
pub follow: bool,

/// The interval in seconds to wait between calls for logs.
#[arg(long, default_value_t = 2)]
pub follow_interval: u8,
}

#[derive(Debug, Snafu)]
Expand All @@ -25,24 +37,64 @@ pub enum Error {

impl Input {
pub async fn exec(&self, ctx: Context) -> Result<(), Error> {
if self.follow {
self.follow_logs(ctx).await
} else {
self.show_logs(&ctx, 0).await?;
Ok(())
}
}

async fn show_logs(&self, ctx: &Context, seen: usize) -> Result<usize, Error> {
let result = ctx
.client
.session_logs(&self.job_id)
.await
.context(HttpClientSnafu)?;
if let Some(lines_blob) = result.0.get("amalthea-session") {
let lines: Vec<&str> = lines_blob.lines().collect();
if lines.len() > seen {
for line in &lines[seen..] {
println!("{}", line);
}
return Ok(lines.len());
}
}
Ok(seen)
}

if let Some(lines) = result.0.get("amalthea-session") {
ctx.write_result(&SimpleMessage {
message: lines.to_string(),
})
.await
.context(WriteResultSnafu)
} else {
ctx.write_result(&SimpleMessage {
message: "No logs available.".to_string(),
})
async fn is_session_finished(&self, ctx: &Context) -> Result<bool, Error> {
let details = ctx
.client
.get_session(&self.job_id)
.await
.context(WriteResultSnafu)
.context(HttpClientSnafu)?;

match &details {
None => Ok(true),
Some(d) => Ok(!d.status.state.is_running()),
}
}

async fn follow_logs(&self, ctx: Context) -> Result<(), Error> {
let mut seen: usize = self.show_logs(&ctx, 0).await?;
if self.is_session_finished(&ctx).await? {
return Ok(());
}

loop {
tokio::select! {
_ = signal::ctrl_c() => {
eprintln!("Interrupted, exiting.");
break Ok(());
}
_ = sleep(Duration::from_secs(self.follow_interval as u64)) => {
seen = self.show_logs(&ctx, seen).await?;
if self.is_session_finished(&ctx).await? {
break Ok(());
}
}
}
}
}
}
9 changes: 9 additions & 0 deletions src/httpclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,15 @@ impl Client {
Ok(())
}

pub async fn get_session(
&self,
session_id: &str,
) -> Result<Option<SessionStartResponse>, Error> {
let path = format!("/api/data/sessions/{}", session_id);
log::debug!("Get session: {}", session_id);
self.json_get_option::<SessionStartResponse>(&path).await
}

pub async fn list_sessions(&self, mode: Option<SessionMode>) -> Result<SessionList, Error> {
let url = self.make_url("/api/data/sessions")?;
log::debug!(
Expand Down
59 changes: 49 additions & 10 deletions src/httpclient/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ where
for r in data {
let sub_id = r.submission_id.as_deref().unwrap_or("-");
let started = r.started.format();
let data = vec![&r.name, sub_id, &r.project_id, &r.status.state, &started];
let status = r.status.state.to_string();
let data = vec![&r.name, sub_id, &r.project_id, &status, &started];
builder.push_record(data);
}
builder.insert_record(
Expand All @@ -119,21 +120,59 @@ impl fmt::Display for SessionList {
}
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SessionState {
Running,
Starting,
Stopping,
Failed,
Hibernated,
Succeeded,
}

impl fmt::Display for SessionState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let name = match self {
SessionState::Running => "Running",
SessionState::Starting => "Starting",
SessionState::Stopping => "Stopping",
SessionState::Failed => "Failed",
SessionState::Hibernated => "Hibernated",
SessionState::Succeeded => "Succeeded",
};
f.write_str(name)
}
}

impl SessionState {
pub fn is_running(&self) -> bool {
match self {
SessionState::Running => true,
SessionState::Starting => true,
SessionState::Stopping => true,
SessionState::Failed => false,
SessionState::Hibernated => false,
SessionState::Succeeded => false,
}
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct SessionStatus {
message: Option<String>,
state: String,
pub message: Option<String>,
pub state: SessionState,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct SessionStartResponse {
image: String,
name: String,
project_id: String,
launcher_id: String,
submission_id: Option<String>,
status: SessionStatus,
started: Timestamp,
pub image: String,
pub name: String,
pub project_id: String,
pub launcher_id: String,
pub submission_id: Option<String>,
pub status: SessionStatus,
pub started: Timestamp,
}

impl fmt::Display for SessionStartResponse {
Expand Down
Loading