diff --git a/crates/core/src/commands/common.rs b/crates/core/src/commands/common.rs index 8d17b67..4ed166e 100644 --- a/crates/core/src/commands/common.rs +++ b/crates/core/src/commands/common.rs @@ -54,8 +54,9 @@ pub async fn push( ]); let child = run_command_with_env( - &CommandArguments::new(command_string, context.modifiers) - .mode(crate::commands::ChildOutputMode::Nix), + &CommandArguments::new(command_string, context.modifiers).mode( + crate::commands::ChildOutputMode::Nix(Some(context.name.clone())), + ), HashMap::from([( "NIX_SSHOPTS".into(), target.create_ssh_opts(context.modifiers)?, @@ -152,7 +153,7 @@ pub async fn evaluate_hive_attribute( let child = run_command( &CommandArguments::new(command_string, modifiers) - .mode(crate::commands::ChildOutputMode::Nix), + .mode(crate::commands::ChildOutputMode::Nix(None)), ) .await?; diff --git a/crates/core/src/commands/mod.rs b/crates/core/src/commands/mod.rs index 00d9ebe..9c065a6 100644 --- a/crates/core/src/commands/mod.rs +++ b/crates/core/src/commands/mod.rs @@ -3,7 +3,8 @@ use crate::{ commands::pty::{InteractiveChildChip, interactive_command_with_env}, - hive::node::SharedTarget, + hive::node::{Name, SharedTarget}, + status::UI_SENDER, }; use std::{ collections::HashMap, @@ -26,9 +27,9 @@ pub mod common; pub(crate) mod noninteractive; pub(crate) mod pty; -#[derive(Copy, Clone, Debug)] +#[derive(Clone, Debug)] pub(crate) enum ChildOutputMode { - Nix, + Nix(Option), Generic, Interactive, } @@ -76,7 +77,7 @@ impl> CommandArguments { self } - pub(crate) const fn mode(mut self, mode: ChildOutputMode) -> Self { + pub(crate) fn mode(mut self, mode: ChildOutputMode) -> Self { self.output_mode = mode; self } @@ -156,19 +157,19 @@ impl WireCommandChip for Either { impl ChildOutputMode { /// this function is by far the biggest hotspot in the whole tree /// Returns a string if this log is notable to be stored as an error message - fn trace_slice(self, line: &mut [u8]) -> Option { - let slice = match self { + fn trace_slice(&self, line: &mut [u8]) -> Option { + let (slice, task_name) = match self { Self::Generic | Self::Interactive => { let string = String::from_utf8_lossy(line); let stripped = strip_ansi_escapes::strip_str(&string); warn!("{stripped}"); return Some(string.to_string()); } - Self::Nix => { + Self::Nix(task_name) => { let position = AHO_CORASICK.find(&line).map(|x| &mut line[x.end()..]); if let Some(json_buf) = position { - json_buf + (json_buf, task_name) } else { // usually happens when ssh is outputting something warn!("{}", String::from_utf8_lossy(line)); @@ -184,9 +185,47 @@ impl ChildOutputMode { }; let (msg, level) = match log_message { - LogMessage::Start { text, level, .. } => (text, level), + LogMessage::Start { + text, + level, + id, + r#type, + .. + } => { + if let Some(tx) = UI_SENDER.get() { + let _ = tx.send(crate::status::UiMessage::ActivityBegin( + task_name.clone(), + id, + r#type, + )); + } + + (text, level) + } + LogMessage::Stop { id } => { + if let Some(tx) = UI_SENDER.get() { + let _ = tx.send(crate::status::UiMessage::ActivityEnd( + task_name.clone(), + id, + None, + )); + } + + return None; + } + LogMessage::Result { id, r#type, .. } => { + if let Some(tx) = UI_SENDER.get() { + let _ = tx.send(crate::status::UiMessage::ActivityEnd( + task_name.clone(), + id, + Some(r#type), + )); + } + + return None; + } LogMessage::Msg { msg, level, .. } => (msg, level), - _ => return None, + LogMessage::SetPhase { .. } => return None, }; if msg.is_empty() { diff --git a/crates/core/src/commands/noninteractive.rs b/crates/core/src/commands/noninteractive.rs index 267f2a2..105eace 100644 --- a/crates/core/src/commands/noninteractive.rs +++ b/crates/core/src/commands/noninteractive.rs @@ -51,7 +51,7 @@ pub(crate) async fn non_interactive_command_with_env>( command_string = arguments.command_string.as_ref(), extra = match arguments.output_mode { ChildOutputMode::Generic | ChildOutputMode::Interactive => "", - ChildOutputMode::Nix => " --log-format internal-json", + ChildOutputMode::Nix(..) => " --log-format internal-json", } ); @@ -86,7 +86,7 @@ pub(crate) async fn non_interactive_command_with_env>( .ok_or(HiveLibError::CommandError(CommandError::NoHandle))?; let mut joinset = JoinSet::new(); - let output_mode = Arc::new(arguments.output_mode); + let output_mode = Arc::new(arguments.output_mode.clone()); joinset.spawn( handle_io( diff --git a/crates/core/src/commands/pty/mod.rs b/crates/core/src/commands/pty/mod.rs index 6005692..124d83b 100644 --- a/crates/core/src/commands/pty/mod.rs +++ b/crates/core/src/commands/pty/mod.rs @@ -138,7 +138,7 @@ pub(crate) async fn interactive_command_with_env>( "{starting}{command} {flags} {IO_SUBS} && {ending}", command = arguments.command_string.as_ref(), flags = match arguments.output_mode { - ChildOutputMode::Nix => "--log-format internal-json", + ChildOutputMode::Nix(..) => "--log-format internal-json", ChildOutputMode::Generic | ChildOutputMode::Interactive => "", }, starting = create_starting_segment(arguments, &needles.start), @@ -184,7 +184,7 @@ pub(crate) async fn interactive_command_with_env>( began_tx, reader, needles, - output_mode: arguments.output_mode, + output_mode: arguments.output_mode.clone(), stderr_collection: stderr_collection.clone(), stdout_collection: stdout_collection.clone(), span: Span::current(), diff --git a/crates/core/src/commands/pty/output.rs b/crates/core/src/commands/pty/output.rs index 74407da..06ee264 100644 --- a/crates/core/src/commands/pty/output.rs +++ b/crates/core/src/commands/pty/output.rs @@ -130,7 +130,7 @@ pub(super) fn handle_pty_stdout(arguments: WatchStdoutArguments) -> Result<(), C &stdout_collection, &mut line, log_stdout, - output_mode, + &output_mode, ); } } @@ -189,7 +189,7 @@ fn handle_normal_data( stdout_collection: &Arc>>, line: &mut [u8], log_stdout: bool, - output_mode: ChildOutputMode, + output_mode: &ChildOutputMode, ) { if line.starts_with(b"#") { let stripped = &mut line[1..]; diff --git a/crates/core/src/hive/steps/activate.rs b/crates/core/src/hive/steps/activate.rs index 99fbfb9..8b41b77 100644 --- a/crates/core/src/hive/steps/activate.rs +++ b/crates/core/src/hive/steps/activate.rs @@ -59,7 +59,9 @@ impl SwitchToConfiguration { let child = run_command( &CommandArguments::new(command_string, ctx.modifiers) - .mode(crate::commands::ChildOutputMode::Nix) + .mode(crate::commands::ChildOutputMode::Nix(Some( + ctx.name.clone(), + ))) .execute_on_remote(self.target.clone()) .privileged(&self.privilege_escalation_command), ) diff --git a/crates/core/src/hive/steps/build.rs b/crates/core/src/hive/steps/build.rs index 1068cbf..a8ad34a 100644 --- a/crates/core/src/hive/steps/build.rs +++ b/crates/core/src/hive/steps/build.rs @@ -46,7 +46,9 @@ impl ExecuteStep for Build { &CommandArguments::new(command_string, ctx.modifiers) // build remotely if asked for AND we arent applying locally .execute_on_remote(self.target.clone()) - .mode(crate::commands::ChildOutputMode::Nix) + .mode(crate::commands::ChildOutputMode::Nix(Some( + ctx.name.clone(), + ))) .log_stdout(), std::collections::HashMap::new(), ) diff --git a/crates/core/src/status.rs b/crates/core/src/status.rs index 77fb3fe..6a9b4b8 100644 --- a/crates/core/src/status.rs +++ b/crates/core/src/status.rs @@ -1,11 +1,13 @@ // SPDX-License-Identifier: AGPL-3.0-or-later // Copyright 2024-2025 wire Contributors +use crate::hive::node::Name; +use nix_compat::log::{ActivityType, ResultType}; use owo_colors::OwoColorize; use std::{ collections::VecDeque, fmt::Write, - sync::OnceLock, + sync::{Arc, OnceLock}, time::{Duration, Instant}, }; use termion::{clear, cursor}; @@ -14,8 +16,6 @@ use tokio::sync::{ oneshot, }; -use crate::hive::node::Name; - use std::collections::HashMap; #[derive(Default)] @@ -43,10 +43,82 @@ pub enum UiMessage { Clear, /// Writes above the status line LogLine(Vec), + /// Notes that a nix activity has begun + ActivityBegin(Option, u64, ActivityType), + /// Notes that a nix activity has ended + ActivityEnd(Option, u64, Option), +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ActivityCategory { + Download, + Build, + Other, +} + +impl From for ActivityCategory { + fn from(activity_type: ActivityType) -> Self { + match activity_type { + ActivityType::CopyPath + | ActivityType::FileTransfer + | ActivityType::Substitute + | ActivityType::QueryPathInfo + | ActivityType::FetchTree + | ActivityType::CopyPaths => Self::Download, + + ActivityType::Build | ActivityType::Builds | ActivityType::BuildWaiting => Self::Build, + + _ => Self::Other, + } + } +} + +#[derive(Default)] +pub struct ActivityTracker { + active_by_id: HashMap, + + pub active_downloads: usize, + pub completed_downloads: usize, + pub active_builds: usize, + pub completed_builds: usize, + pub active_other: usize, + pub completed_other: usize, +} + +impl ActivityTracker { + pub fn begin(&mut self, id: u64, category: ActivityCategory) { + self.active_by_id.insert(id, category); + match category { + ActivityCategory::Download => self.active_downloads += 1, + ActivityCategory::Build => self.active_builds += 1, + ActivityCategory::Other => self.active_other += 1, + } + } + + pub fn end(&mut self, id: u64) { + if let Some(category) = self.active_by_id.remove(&id) { + match category { + ActivityCategory::Download => { + self.active_downloads = self.active_downloads.saturating_sub(1); + self.completed_downloads += 1; + } + ActivityCategory::Build => { + self.active_builds = self.active_builds.saturating_sub(1); + self.completed_builds += 1; + } + ActivityCategory::Other => { + self.active_other = self.active_other.saturating_sub(1); + self.completed_other += 1; + } + } + } + } } pub struct Status { - statuses: HashMap, + node_statuses: HashMap, + nix_activities: HashMap, ActivityTracker>, + began: Instant, show_progress: bool, } @@ -56,7 +128,8 @@ pub static UI_SENDER: OnceLock> = OnceLock::new impl Status { fn new() -> Self { Self { - statuses: HashMap::default(), + node_statuses: HashMap::default(), + nix_activities: HashMap::default(), began: Instant::now(), show_progress: false, } @@ -68,7 +141,7 @@ impl Status { #[must_use] fn num_finished(&self) -> usize { - self.statuses + self.node_statuses .iter() .filter(|(_, status)| matches!(status, NodeStatus::Succeeded | NodeStatus::Failed)) .count() @@ -76,7 +149,7 @@ impl Status { #[must_use] fn num_running(&self) -> usize { - self.statuses + self.node_statuses .iter() .filter(|(_, status)| matches!(status, NodeStatus::Running(..))) .count() @@ -84,7 +157,7 @@ impl Status { #[must_use] fn num_failed(&self) -> usize { - self.statuses + self.node_statuses .iter() .filter(|(_, status)| matches!(status, NodeStatus::Failed)) .count() @@ -92,11 +165,20 @@ impl Status { #[must_use] pub fn get_msg(&self) -> String { - if self.statuses.is_empty() { + if self.node_statuses.is_empty() { return String::new(); } - let mut msg = format!("[{} / {}", self.num_finished(), self.statuses.len(),); + let mut msg = String::new(); + + let _ = write!(&mut msg, "{}s Elapsed ", self.began.elapsed().as_secs()); + + let _ = write!( + &mut msg, + "[Nodes {} / {}", + self.num_finished(), + self.node_statuses.len() + ); let num_failed = self.num_failed(); let num_running = self.num_running(); @@ -121,7 +203,69 @@ impl Status { let _ = write!(&mut msg, "]"); - let _ = write!(&mut msg, " {}s", self.began.elapsed().as_secs()); + let (total_active_downloads, total_completed_downloads) = + self.nix_activities + .iter() + .fold((0, 0), |acc, (_, tracker)| { + ( + acc.0 + tracker.active_downloads, + acc.1 + tracker.completed_downloads, + ) + }); + + let (total_active_builds, total_completed_builds) = + self.nix_activities + .iter() + .fold((0, 0), |acc, (_, tracker)| { + ( + acc.0 + tracker.active_builds, + acc.1 + tracker.completed_builds, + ) + }); + + let (total_active_other, total_completed_other) = + self.nix_activities + .iter() + .fold((0, 0), |acc, (_, tracker)| { + ( + acc.0 + tracker.active_other, + acc.1 + tracker.completed_other, + ) + }); + + if total_active_downloads > 0 || total_completed_downloads > 0 { + let _ = write!( + &mut msg, + " [DL Jobs: {total_active_downloads} active, {total_completed_downloads} done]", + ); + } + + if total_active_builds > 0 || total_completed_builds > 0 { + let _ = write!( + &mut msg, + " [Build Jobs: {total_active_builds} active, {total_completed_builds} done]", + ); + } + + if total_active_other > 0 || total_completed_other > 0 { + let _ = write!( + &mut msg, + " [Other Jobs: {total_active_other} active, {total_completed_other} done]" + ); + } + + let wire_tasks = Name(Arc::from("wire tasks")); + + for name in self.nix_activities.keys() { + let _ = write!( + &mut msg, + "\n {}", + match name { + Some(name) => name.on_default_color().into_styled(), + None => wire_tasks.italic().into_styled(), + } + ); + } msg } @@ -178,14 +322,14 @@ pub async fn status_tick_worker(mut rx: UnboundedReceiver, show_progr Some(msg) = rx.recv() => { match msg { UiMessage::AddMany(names) => { - status.statuses.extend( + status.node_statuses.extend( names .iter() .map(|name| (name.0.to_string(), NodeStatus::Pending)), ); }, UiMessage::SetStatus(name, value) => { - status.statuses.insert(name.0.to_string(), value); + status.node_statuses.insert(name.0.to_string(), value); }, UiMessage::Takeover(tx) => { taken_over = true; @@ -214,6 +358,14 @@ pub async fn status_tick_worker(mut rx: UnboundedReceiver, show_progr status.write_status(&mut stderr); } }, + UiMessage::ActivityBegin(name, id, activity_type) => { + let tracker = status.nix_activities.entry(name).or_default(); + tracker.begin(id, activity_type.into()); + } + UiMessage::ActivityEnd(name, id, _activity_type) => { + let tracker = status.nix_activities.entry(name).or_default(); + tracker.end(id); + } } }