diff --git a/src/commands/upload/main.rs b/src/commands/upload/main.rs index 4a370ac..f371ed9 100644 --- a/src/commands/upload/main.rs +++ b/src/commands/upload/main.rs @@ -562,7 +562,9 @@ fn do_one_downscale( crf: None, audio_bitrate: None, }; - match create_rendition(&original_path, def) { + let mut progress_cb = |pct: f64| progress_handle.set_downscale_current_pct(Some(pct)); + let info_cb = |msg: &str| progress_handle.add_info(msg); + match create_rendition(&original_path, def, Some(&mut progress_cb), Some(&info_cb)) { Ok(upload_path) => FileToUpload { upload_path, original_path }, Err(e) => { let _ = progress_handle.add_error(format!( @@ -574,17 +576,21 @@ fn do_one_downscale( } } } - DownscaleWork::MxfAudio(original_path) => match convert_to_mp3(&original_path, None) { - Ok(upload_path) => FileToUpload { upload_path, original_path }, - Err(e) => { - let _ = progress_handle.add_error(format!( - "MXF to MP3 failed for {}: {}", - original_path.display(), - e - )); - return Ok(None); + DownscaleWork::MxfAudio(original_path) => { + let mut progress_cb = |pct: f64| progress_handle.set_downscale_current_pct(Some(pct)); + let info_cb = |msg: &str| progress_handle.add_info(msg); + match convert_to_mp3(&original_path, None, Some(&mut progress_cb), Some(&info_cb)) { + Ok(upload_path) => FileToUpload { upload_path, original_path }, + Err(e) => { + let _ = progress_handle.add_error(format!( + "MXF to MP3 failed for {}: {}", + original_path.display(), + e + )); + return Ok(None); + } } - }, + } DownscaleWork::Video(original_path) => { let def = RenditionDefinition { quality: Some(qualities[0]), @@ -592,7 +598,9 @@ fn do_one_downscale( crf: None, audio_bitrate: None, }; - match create_rendition(&original_path, def) { + let mut progress_cb = |pct: f64| progress_handle.set_downscale_current_pct(Some(pct)); + let info_cb = |msg: &str| progress_handle.add_info(msg); + match create_rendition(&original_path, def, Some(&mut progress_cb), Some(&info_cb)) { Ok(upload_path) => FileToUpload { upload_path, original_path }, Err(e) => { let _ = progress_handle.add_error(format!( @@ -604,18 +612,22 @@ fn do_one_downscale( } } } - DownscaleWork::Audio(original_path) => match normalize_audio_to_mp3(&original_path, Some(192)) { - Ok(upload_path) => FileToUpload { - upload_path, - original_path, - }, - Err(e) => { - let _ = progress_handle.add_error(format!( - "Audio normalization failed for {}: {}", - original_path.display(), - e - )); - return Ok(None); + DownscaleWork::Audio(original_path) => { + let mut progress_cb = |pct: f64| progress_handle.set_downscale_current_pct(Some(pct)); + let info_cb = |msg: &str| progress_handle.add_info(msg); + match normalize_audio_to_mp3(&original_path, Some(192), Some(&mut progress_cb), Some(&info_cb)) { + Ok(upload_path) => FileToUpload { + upload_path, + original_path, + }, + Err(e) => { + let _ = progress_handle.add_error(format!( + "Audio normalization failed for {}: {}", + original_path.display(), + e + )); + return Ok(None); + } } }, DownscaleWork::Passthrough(original_path) => FileToUpload { @@ -755,6 +767,7 @@ fn run_two_queue_pipeline( .next() .ok_or_else(|| "missing presigned response".to_string())?; + progress_handle.set_upload_current_pct(Some(0.0)); if let Err(e) = upload_file_to_presigned( &file_info.upload_path, &upload_resp, @@ -762,14 +775,17 @@ fn run_two_queue_pipeline( &cfg, &api_key, bearer_header, + Some(&progress_handle), ) .await { let _ = progress_handle.add_error(e.clone()); progress_handle.set_upload_current(None::<&str>); + progress_handle.set_upload_current_pct(None); return Err(e); } + progress_handle.set_upload_current_pct(None); if let Err(e) = uploads_tracking::record_upload( &user_id, file_info.upload_path.as_path(), @@ -784,6 +800,7 @@ fn run_two_queue_pipeline( } completed_responses.push(upload_resp); progress_handle.set_upload_current(None::<&str>); + progress_handle.set_upload_current_pct(None); } if !completed_responses.is_empty() { @@ -812,15 +829,17 @@ fn run_two_queue_pipeline( Ok(()) }; - let ((), ()) = tokio::try_join!(producer, consumer)?; - Ok::<_, String>((render_handle, progress)) + let join_result = tokio::try_join!(producer, consumer); + Ok::<_, String>((render_handle, progress, join_result)) }); - let (render_handle, mut progress) = block_result?; + let (render_handle, mut progress, join_result) = block_result?; rt.block_on(TwoQueueProgress::stop_render_loop(render_handle)); - progress.finish()?; + let _ = progress.finish(); + progress.print_messages_to_stderr(); println!(); + join_result?; Ok(()) } @@ -1053,6 +1072,7 @@ pub async fn upload_file_to_presigned( _cfg: &Configuration, _api_key: &str, _bearer_opt: Option<&str>, + progress_handle: Option<&TwoQueueProgressHandle>, ) -> Result<(), String> { let total_bytes = std::fs::metadata(file_path) .map_err(|e| format!("failed to stat {}: {}", file_path.display(), e))? @@ -1063,6 +1083,7 @@ pub async fn upload_file_to_presigned( .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; let mut buf = Vec::with_capacity(total_bytes as usize); let mut chunk = vec![0u8; (1024 * 1024).min(total_bytes as usize)]; + let mut read_so_far = 0u64; loop { let n = f .read(&mut chunk) @@ -1071,6 +1092,15 @@ pub async fn upload_file_to_presigned( break; } buf.extend_from_slice(&chunk[..n]); + read_so_far += n as u64; + if let Some(ph) = progress_handle { + let pct = if total_bytes > 0 { + 100.0 * (read_so_far as f64 / total_bytes as f64) + } else { + 100.0 + }; + ph.set_upload_current_pct(Some(pct)); + } } let content_type = mime_guess::from_path(file_path) diff --git a/src/media/transcode.rs b/src/media/transcode.rs index ad96340..024f0fd 100644 --- a/src/media/transcode.rs +++ b/src/media/transcode.rs @@ -1,9 +1,24 @@ use clap::ValueEnum; use ffmpeg_sidecar::command::FfmpegCommand; +use ffmpeg_sidecar::event::{FfmpegEvent, LogLevel}; use std::path::PathBuf; use crate::media::video_quality::VideoQuality; +/// Parse ffmpeg time string e.g. "00:01:23.45" into seconds. +fn parse_ffmpeg_time(s: &str) -> Option { + let s = s.trim(); + let parts: Vec<&str> = s.splitn(3, ':').collect(); + if parts.len() != 3 { + return None; + } + let hours: f64 = parts[0].trim().parse().ok()?; + let minutes: f64 = parts[1].trim().parse().ok()?; + let secs_str = parts[2].trim(); + let seconds: f64 = secs_str.parse().ok()?; + Some(hours * 3600.0 + minutes * 60.0 + seconds) +} + #[derive(Copy, Clone, Debug, ValueEnum)] #[value(rename_all = "lowercase")] pub enum Preset { @@ -83,6 +98,8 @@ fn compute_rendition_output_path( pub fn create_rendition( input: &PathBuf, definition: RenditionDefinition, + progress_cb: Option<&mut dyn FnMut(f64)>, + info_cb: Option<&dyn Fn(&str)>, ) -> Result { let temp_base = get_temp_rendition_dir()?; let output = compute_rendition_output_path(input, &definition, &temp_base); @@ -90,12 +107,17 @@ pub fn create_rendition( if let (Ok(in_md), Ok(out_md)) = (std::fs::metadata(input), std::fs::metadata(&output)) { if let (Ok(in_time), Ok(out_time)) = (in_md.modified(), out_md.modified()) { if out_time >= in_time { - crate::output::info(format!( + let msg = format!( "Reusing existing rendition for {} at {} ({})", input.display(), output.display(), definition.to_name() - )); + ); + if let Some(f) = info_cb { + f(&msg); + } else if progress_cb.is_none() { + crate::output::info(msg); + } return Ok(output); } } @@ -129,18 +151,69 @@ pub fn create_rendition( cmd.output(output.to_string_lossy()); - let mut child = cmd - .spawn() - .map_err(|e| format!("failed to start ffmpeg: {}", e))?; - let status = child - .wait() - .map_err(|e| format!("failed to wait for ffmpeg: {}", e))?; - if !status.success() { - return Err(format!( - "ffmpeg failed creating rendition: {} -> {}", - input.display(), - output.display() - )); + if let Some(cb) = progress_cb { + let mut child = cmd + .spawn() + .map_err(|e| format!("failed to start ffmpeg: {}", e))?; + let mut iter = child + .iter() + .map_err(|e| format!("failed to create ffmpeg event iterator: {}", e))?; + let mut total_duration: Option = None; + let mut stderr_lines: Vec = Vec::new(); + while let Some(event) = iter.next() { + match &event { + FfmpegEvent::ParsedDuration(d) => { + total_duration = Some(d.duration); + } + FfmpegEvent::Progress(p) => { + if let (Some(total), Some(current_secs)) = + (total_duration, parse_ffmpeg_time(&p.time)) + { + if total > 0.0 { + let pct = (100.0 * current_secs / total).min(100.0); + cb(pct); + } + } + } + FfmpegEvent::Error(e) => { + stderr_lines.push(e.clone()); + } + FfmpegEvent::Log(LogLevel::Error, msg) | FfmpegEvent::Log(LogLevel::Fatal, msg) => { + stderr_lines.push(msg.clone()); + } + _ => {} + } + } + let status = child + .wait() + .map_err(|e| format!("failed to wait for ffmpeg: {}", e))?; + if !status.success() { + let log_suffix = if stderr_lines.is_empty() { + String::new() + } else { + format!("\nffmpeg log:\n{}", stderr_lines.join("\n")) + }; + return Err(format!( + "ffmpeg failed creating rendition: {} -> {}{}", + input.display(), + output.display(), + log_suffix + )); + } + } else { + let mut child = cmd + .spawn() + .map_err(|e| format!("failed to start ffmpeg: {}", e))?; + let status = child + .wait() + .map_err(|e| format!("failed to wait for ffmpeg: {}", e))?; + if !status.success() { + return Err(format!( + "ffmpeg failed creating rendition: {} -> {}", + input.display(), + output.display() + )); + } } Ok(output) @@ -163,6 +236,8 @@ fn compute_normalized_audio_output_path(input: &PathBuf, out_base: &PathBuf) -> pub fn normalize_audio_to_mp3( input: &PathBuf, audio_bitrate: Option, + progress_cb: Option<&mut dyn FnMut(f64)>, + info_cb: Option<&dyn Fn(&str)>, ) -> Result { let temp_base = get_temp_rendition_dir()?; let output = compute_normalized_audio_output_path(input, &temp_base); @@ -170,11 +245,16 @@ pub fn normalize_audio_to_mp3( if let (Ok(in_md), Ok(out_md)) = (std::fs::metadata(input), std::fs::metadata(&output)) { if let (Ok(in_time), Ok(out_time)) = (in_md.modified(), out_md.modified()) { if out_time >= in_time { - crate::output::info(format!( + let msg = format!( "Reusing existing normalized MP3 for {} at {}", input.display(), output.display() - )); + ); + if let Some(f) = info_cb { + f(&msg); + } else if progress_cb.is_none() { + crate::output::info(msg); + } return Ok(output); } } @@ -196,35 +276,114 @@ pub fn normalize_audio_to_mp3( cmd.output(output.to_string_lossy()); - let mut child = cmd - .spawn() - .map_err(|e| format!("failed to start ffmpeg: {}", e))?; - let status = child - .wait() - .map_err(|e| format!("failed to wait for ffmpeg: {}", e))?; - if !status.success() { - return Err(format!( - "ffmpeg failed normalizing audio to MP3: {} -> {}", - input.display(), - output.display() - )); - } + run_ffmpeg_with_progress( + cmd, + input, + &output, + "normalizing audio to MP3", + progress_cb, + ) +} - Ok(output) +fn run_ffmpeg_with_progress( + mut cmd: FfmpegCommand, + input: &PathBuf, + output: &PathBuf, + operation: &str, + progress_cb: Option<&mut dyn FnMut(f64)>, +) -> Result { + if let Some(cb) = progress_cb { + let mut child = cmd + .spawn() + .map_err(|e| format!("failed to start ffmpeg: {}", e))?; + let mut iter = child + .iter() + .map_err(|e| format!("failed to create ffmpeg event iterator: {}", e))?; + let mut total_duration: Option = None; + let mut stderr_lines: Vec = Vec::new(); + while let Some(event) = iter.next() { + match &event { + FfmpegEvent::ParsedDuration(d) => { + total_duration = Some(d.duration); + } + FfmpegEvent::Progress(p) => { + if let (Some(total), Some(current_secs)) = + (total_duration, parse_ffmpeg_time(&p.time)) + { + if total > 0.0 { + let pct = (100.0 * current_secs / total).min(100.0); + cb(pct); + } + } + } + FfmpegEvent::Error(e) => { + stderr_lines.push(e.clone()); + } + FfmpegEvent::Log(LogLevel::Error, msg) | FfmpegEvent::Log(LogLevel::Fatal, msg) => { + stderr_lines.push(msg.clone()); + } + _ => {} + } + } + let status = child + .wait() + .map_err(|e| format!("failed to wait for ffmpeg: {}", e))?; + if !status.success() { + let log_suffix = if stderr_lines.is_empty() { + String::new() + } else { + format!("\nffmpeg log:\n{}", stderr_lines.join("\n")) + }; + return Err(format!( + "ffmpeg failed {}: {} -> {}{}", + operation, + input.display(), + output.display(), + log_suffix + )); + } + Ok(output.clone()) + } else { + let mut child = cmd + .spawn() + .map_err(|e| format!("failed to start ffmpeg: {}", e))?; + let status = child + .wait() + .map_err(|e| format!("failed to wait for ffmpeg: {}", e))?; + if !status.success() { + return Err(format!( + "ffmpeg failed {}: {} -> {}", + operation, + input.display(), + output.display() + )); + } + Ok(output.clone()) + } } -pub fn convert_to_mp3(input: &PathBuf, audio_bitrate: Option) -> Result { +pub fn convert_to_mp3( + input: &PathBuf, + audio_bitrate: Option, + progress_cb: Option<&mut dyn FnMut(f64)>, + info_cb: Option<&dyn Fn(&str)>, +) -> Result { let temp_base = get_temp_rendition_dir()?; let output = compute_audio_output_path(input, &temp_base); if let (Ok(in_md), Ok(out_md)) = (std::fs::metadata(input), std::fs::metadata(&output)) { if let (Ok(in_time), Ok(out_time)) = (in_md.modified(), out_md.modified()) { if out_time >= in_time { - crate::output::info(format!( + let msg = format!( "Reusing existing MP3 conversion for {} at {}", input.display(), output.display() - )); + ); + if let Some(f) = info_cb { + f(&msg); + } else if progress_cb.is_none() { + crate::output::info(msg); + } return Ok(output); } } @@ -247,21 +406,13 @@ pub fn convert_to_mp3(input: &PathBuf, audio_bitrate: Option) -> Result {}", - input.display(), - output.display() - )); - } - - Ok(output) + run_ffmpeg_with_progress( + cmd, + input, + &output, + "converting to MP3", + progress_cb, + ) } pub fn is_mxf_file(path: &PathBuf) -> bool { diff --git a/src/tui/inline_progress.rs b/src/tui/inline_progress.rs index dfc3791..33b1394 100644 --- a/src/tui/inline_progress.rs +++ b/src/tui/inline_progress.rs @@ -349,10 +349,11 @@ pub fn draw_ui_internal(frame: &mut Frame, state: &ProgressState) { } else { Color::Yellow }; + let pct_label = format!(" {:.0}%", task.progress); let gauge = Gauge::default() .gauge_style(Style::default().fg(gauge_color)) .ratio(task.progress / 100.0) - .label(""); + .label(pct_label); let y = list_area.top().saturating_add(i as u16); if y >= list_area.bottom() || y >= gauge_area.bottom() { diff --git a/src/tui/two_queue_progress.rs b/src/tui/two_queue_progress.rs index 3643a14..a7a5903 100644 --- a/src/tui/two_queue_progress.rs +++ b/src/tui/two_queue_progress.rs @@ -22,9 +22,11 @@ const PENDING_DISPLAY: usize = 5; pub(crate) struct TwoQueueState { pub downscale_queued: usize, pub downscale_current: Option, + pub downscale_current_pct: Option, pub downscale_pending: Vec, pub upload_queued: usize, pub upload_current: Option, + pub upload_current_pct: Option, pub upload_pending: Vec, pub recent_messages: Vec, pub max_messages: usize, @@ -37,7 +39,12 @@ impl TwoQueueState { .as_deref() .unwrap_or("—") .to_string(); - truncate_string(¤t, 35) + let s = truncate_string(¤t, 35); + if let Some(pct) = self.downscale_current_pct { + format!("{} ({:.0}%)", s, pct) + } else { + s + } } fn upload_display(&self) -> String { @@ -46,7 +53,12 @@ impl TwoQueueState { .as_deref() .unwrap_or("—") .to_string(); - truncate_string(¤t, 35) + let s = truncate_string(¤t, 35); + if let Some(pct) = self.upload_current_pct { + format!("{} ({:.0}%)", s, pct) + } else { + s + } } fn downscale_pending_next(&self) -> impl Iterator { @@ -76,7 +88,7 @@ impl TwoQueueProgress { Ok(Self { terminal: RefCell::new(Some(terminal)), state: Arc::new(Mutex::new(TwoQueueState { - max_messages: 5, + max_messages: 100, ..Default::default() })), }) @@ -133,6 +145,19 @@ impl TwoQueueProgress { } Ok(()) } + + /// Print all error and warning messages to stderr so the user has a persistent log + /// after the TUI is closed. Call this after `finish()` when the run is done. + pub fn print_messages_to_stderr(&self) { + let state = self.state.lock().unwrap(); + for msg in state.recent_messages.iter().rev() { + match msg.msg_type { + MessageType::Error => eprintln!("Error: {}", msg.text), + MessageType::Warning => eprintln!("Warning: {}", msg.text), + _ => {} + } + } + } } #[derive(Clone)] @@ -154,6 +179,12 @@ impl TwoQueueProgressHandle { pub fn set_downscale_current(&self, label: Option>) { let mut s = self.state.lock().unwrap(); s.downscale_current = label.map(Into::into); + s.downscale_current_pct = None; + } + + pub fn set_downscale_current_pct(&self, pct: Option) { + let mut s = self.state.lock().unwrap(); + s.downscale_current_pct = pct; } pub fn set_downscale_pending(&self, names: Vec) { @@ -181,6 +212,12 @@ impl TwoQueueProgressHandle { pub fn set_upload_current(&self, label: Option>) { let mut s = self.state.lock().unwrap(); s.upload_current = label.map(Into::into); + s.upload_current_pct = None; + } + + pub fn set_upload_current_pct(&self, pct: Option) { + let mut s = self.state.lock().unwrap(); + s.upload_current_pct = pct; } /// Add a file name to the upload pending list (when enqueueing an upload).