From 5138589b8a88e2671cc86dd13cc3a84918ebabd5 Mon Sep 17 00:00:00 2001 From: bjay kamwa watanabe Date: Tue, 17 Feb 2026 10:41:43 +0100 Subject: [PATCH 1/4] better ui --- src/commands/upload/main.rs | 576 +++++++++++++++++++++++++++--------- src/tui/inline_progress.rs | 4 +- src/tui/mod.rs | 2 + 3 files changed, 435 insertions(+), 147 deletions(-) diff --git a/src/commands/upload/main.rs b/src/commands/upload/main.rs index 4a381c9..296162a 100644 --- a/src/commands/upload/main.rs +++ b/src/commands/upload/main.rs @@ -20,8 +20,9 @@ use crate::media::video_file_ext::has_video_ext; use crate::media::video_quality::parse_quality; use crate::media::video_quality::VideoQuality; use crate::output; -use crate::tui::ProgressHandle; +use crate::tui::{ProgressHandle, TwoQueueProgress, TwoQueueProgressHandle}; use crate::uploads_tracking; +use tokio::sync::mpsc as tokio_mpsc; use tellers_api_client::apis::accepts_api_key_api as api; use tellers_api_client::apis::configuration::Configuration; @@ -72,6 +73,14 @@ struct FileToUpload { original_path: PathBuf, } +/// Work item for the downscale queue (one per file, no batching). +enum DownscaleWork { + MxfVideo(PathBuf), + MxfAudio(PathBuf), + Video(PathBuf), + Passthrough(PathBuf), +} + fn has_extension(file_path: &PathBuf, extensions: &[String]) -> bool { if extensions.is_empty() { return true; @@ -183,138 +192,6 @@ pub fn run(args: UploadArgs) -> Result<(), String> { ); } - let mut files_to_upload: Vec = Vec::new(); - if args.local_encoding { - ensure_ffmpeg_available()?; - output::info(format!( - "Local encoding enabled; generating renditions in temp dir: {}", - args.qualities - .iter() - .map(|q| q.as_label()) - .collect::>() - .join(", ") - )); - - for original_file in &original_files { - if is_mxf_file(original_file) { - match has_video_streams(original_file) { - Ok(true) => { - if args.qualities.len() > 1 { - return Err("Only supporting single quality for now".to_string()); - } - output::info(format!( - "MXF file {} contains video, converting to MP4", - original_file.display() - )); - let encoded_file = create_rendition( - original_file, - RenditionDefinition { - quality: Some(args.qualities[0]), - preset: args.preset, - crf: None, - audio_bitrate: None, - }, - ) - .map_err(|e| { - format!( - "failed to encode MXF rendition for {}: {}", - original_file.display(), - e - ) - })?; - files_to_upload.push(FileToUpload { - upload_path: encoded_file, - original_path: original_file.clone(), - }); - } - Ok(false) => { - output::info(format!( - "MXF file {} is audio-only, converting to MP3", - original_file.display() - )); - let encoded_file = convert_to_mp3(original_file, None).map_err(|e| { - format!( - "failed to convert MXF to MP3 for {}: {}", - original_file.display(), - e - ) - })?; - files_to_upload.push(FileToUpload { - upload_path: encoded_file, - original_path: original_file.clone(), - }); - } - Err(e) => { - return Err(format!( - "failed to check video streams in MXF file {}: {}", - original_file.display(), - e - )); - } - } - } else if has_video_ext(original_file) { - if args.qualities.len() > 1 { - return Err("Only supporting single quality for now".to_string()); - } - let encoded_file = create_rendition( - original_file, - RenditionDefinition { - quality: Some(args.qualities[0]), - preset: args.preset, - crf: None, - audio_bitrate: None, - }, - ) - .map_err(|e| { - format!( - "failed to encode rendition for {}: {}", - original_file.display(), - e - ) - })?; - files_to_upload.push(FileToUpload { - upload_path: encoded_file, - original_path: original_file.clone(), - }); - } else { - files_to_upload.push(FileToUpload { - upload_path: original_file.clone(), - original_path: original_file.clone(), - }); - } - } - - output::info(format!( - "Prepared {} file(s) for upload (including renditions)", - files_to_upload.len() - )); - for f in &files_to_upload { - if let Ok(md) = std::fs::metadata(&f.upload_path) { - output::item(format!("{} ({} bytes)", f.upload_path.display(), md.len())); - } else { - output::item(format!("{}", f.upload_path.display())); - } - } - } else { - output::info(format!( - "Discovered {} file(s) to upload", - original_files.len() - )); - for f in &original_files { - if let Ok(md) = std::fs::metadata(f) { - output::item(format!("{} ({} bytes)", f.display(), md.len())); - } else { - output::item(format!("{}", f.display())); - } - } - for original_file in original_files { - files_to_upload.push(FileToUpload { - upload_path: original_file.clone(), - original_path: original_file, - }); - } - } - let cfg = api_config::create_config(); let api_key = api_config::get_api_key(None)?; output::info(format!("API base: {}", cfg.base_path)); @@ -323,28 +200,67 @@ pub fn run(args: UploadArgs) -> Result<(), String> { let user_id = auth::get_user_id_from_bearer(bearer_header_for_auth.as_deref()); if !args.force_upload { - let original_count = files_to_upload.len(); - files_to_upload.retain(|file_info| { - !super::utils::is_already_uploaded( - &file_info.original_path, - &user_id, - &base_dir, - &args.in_app_path, - ) + let before = original_files.len(); + original_files.retain(|path| { + !super::utils::is_already_uploaded(path, &user_id, &base_dir, &args.in_app_path) }); - - let skipped = original_count - files_to_upload.len(); + let skipped = before - original_files.len(); if skipped > 0 { output::info(format!("Skipped {} already uploaded file(s)", skipped)); } - - if files_to_upload.is_empty() { + if original_files.is_empty() { return Err("no files to upload (all files were already uploaded)".to_string()); } } let upload_request_id = Uuid::new_v4().to_string(); + if args.local_encoding { + ensure_ffmpeg_available()?; + output::info(format!( + "Local encoding: downscale + upload queues ({} quality)", + args.qualities + .iter() + .map(|q| q.as_label()) + .collect::>() + .join(", ") + )); + if args.qualities.len() > 1 { + return Err("Only supporting single quality for now".to_string()); + } + let work_items: Vec = build_downscale_work(&original_files)?; + output::info(format!("{} file(s) in downscale queue", work_items.len())); + return run_two_queue_pipeline( + work_items, + &base_dir, + &args, + &cfg, + &api_key, + bearer_header_for_auth.as_deref(), + &user_id, + &upload_request_id, + ); + } + + let mut files_to_upload: Vec = Vec::new(); + output::info(format!( + "Discovered {} file(s) to upload", + original_files.len() + )); + for f in &original_files { + if let Ok(md) = std::fs::metadata(f) { + output::item(format!("{} ({} bytes)", f.display(), md.len())); + } else { + output::item(format!("{}", f.display())); + } + } + for original_file in original_files { + files_to_upload.push(FileToUpload { + upload_path: original_file.clone(), + original_path: original_file, + }); + } + let mut requests: Vec = Vec::with_capacity(files_to_upload.len()); let mut file_upload_ids: Vec = Vec::with_capacity(files_to_upload.len()); let mut file_in_app_paths: Vec = Vec::with_capacity(files_to_upload.len()); @@ -491,6 +407,321 @@ pub fn run(args: UploadArgs) -> Result<(), String> { }) } +fn work_item_file_name(w: &DownscaleWork) -> String { + let p = match w { + DownscaleWork::MxfVideo(p) | DownscaleWork::MxfAudio(p) | DownscaleWork::Video(p) | DownscaleWork::Passthrough(p) => p, + }; + p.file_name().unwrap_or_default().to_string_lossy().to_string() +} + +/// Runs in spawn_blocking. Returns Ok(Some(file)) on success, Ok(None) on skip/error (reported via handle). +fn do_one_downscale( + work: DownscaleWork, + progress_handle: &TwoQueueProgressHandle, + qualities: &[VideoQuality], + preset: Option, +) -> Result, String> { + let file_to_upload = match work { + DownscaleWork::MxfVideo(original_path) => { + let def = RenditionDefinition { + quality: Some(qualities[0]), + preset, + crf: None, + audio_bitrate: None, + }; + match create_rendition(&original_path, def) { + Ok(upload_path) => FileToUpload { upload_path, original_path }, + Err(e) => { + let _ = progress_handle.add_error(format!( + "Downscale failed for {}: {}", + original_path.display(), + e + )); + return Ok(None); + } + } + } + DownscaleWork::MxfAudio(original_path) => match convert_to_mp3(&original_path, None) { + Ok(upload_path) => FileToUpload { upload_path, original_path }, + Err(e) => { + let _ = progress_handle.add_error(format!( + "MXF to MP3 failed for {}: {}", + original_path.display(), + e + )); + return Ok(None); + } + }, + DownscaleWork::Video(original_path) => { + let def = RenditionDefinition { + quality: Some(qualities[0]), + preset, + crf: None, + audio_bitrate: None, + }; + match create_rendition(&original_path, def) { + Ok(upload_path) => FileToUpload { upload_path, original_path }, + Err(e) => { + let _ = progress_handle.add_error(format!( + "Downscale failed for {}: {}", + original_path.display(), + e + )); + return Ok(None); + } + } + } + DownscaleWork::Passthrough(original_path) => FileToUpload { + upload_path: original_path.clone(), + original_path, + }, + }; + Ok(Some(file_to_upload)) +} + +fn build_downscale_work(original_files: &[PathBuf]) -> Result, String> { + let mut work = Vec::with_capacity(original_files.len()); + for path in original_files { + if is_mxf_file(path) { + match has_video_streams(path) { + Ok(true) => work.push(DownscaleWork::MxfVideo(path.to_path_buf())), + Ok(false) => work.push(DownscaleWork::MxfAudio(path.to_path_buf())), + Err(e) => { + return Err(format!( + "failed to check video streams in MXF {}: {}", + path.display(), + e + )); + } + } + } else if has_video_ext(path) { + work.push(DownscaleWork::Video(path.to_path_buf())); + } else { + work.push(DownscaleWork::Passthrough(path.to_path_buf())); + } + } + Ok(work) +} + +fn run_two_queue_pipeline( + work_items: Vec, + base_dir: &PathBuf, + args: &UploadArgs, + cfg: &Configuration, + api_key: &str, + bearer_opt: Option<&str>, + user_id: &str, + upload_request_id: &str, +) -> Result<(), String> { + let (upload_tx, mut upload_rx) = tokio_mpsc::channel::(64); + + let mut progress = TwoQueueProgress::new()?; + let progress_handle = progress.clone_handle(); + progress_handle.set_downscale_queued(work_items.len()); + let downscale_pending_names: Vec = + work_items.iter().map(work_item_file_name).collect(); + progress_handle.set_downscale_pending(downscale_pending_names); + + let rt = tokio::runtime::Runtime::new() + .map_err(|e| format!("failed to start runtime: {}", e))?; + + let base_dir = base_dir.clone(); + let qualities = args.qualities.clone(); + let preset = args.preset; + + let cfg = cfg.clone(); + let api_key = api_key.to_string(); + let bearer = bearer_opt.map(String::from); + let base_dir_async = base_dir.clone(); + let in_app_path = args.in_app_path.clone(); + let user_id = user_id.to_string(); + let upload_request_id = upload_request_id.to_string(); + let disable_description_generation = args.disable_description_generation; + + let block_result = rt.block_on(async move { + // Start render loop inside runtime so tokio::spawn has a current runtime + let render_handle = progress.start_render_loop(progress_handle.clone()); + + let http = Arc::new( + reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(60)) + .build() + .map_err(|e| format!("failed to build http client: {}", e))?, + ); + let bearer_header = bearer.as_deref(); + + let progress_producer = progress_handle.clone(); + let upload_tx_producer = upload_tx.clone(); + let producer = async move { + for w in work_items { + progress_producer.decrement_downscale_queued(); + progress_producer.pop_downscale_pending(); + let name = work_item_file_name(&w); + progress_producer.set_downscale_current(Some(name)); + let ph = progress_producer.clone(); + let qual = qualities.clone(); + let file = tokio::task::spawn_blocking(move || do_one_downscale(w, &ph, &qual, preset)) + .await + .map_err(|e| format!("downscale task join: {}", e))??; + progress_producer.set_downscale_current(None::<&str>); + if let Some(f) = file { + progress_producer.increment_upload_queued(); + let upload_name = f + .original_path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .to_string(); + progress_producer.push_upload_pending(upload_name); + upload_tx_producer.send(f).await.map_err(|_| "upload channel closed".to_string())?; + } + } + drop(upload_tx_producer); + Ok::<(), String>(()) + }; + drop(upload_tx); + + let consumer = async move { + let mut completed_responses: Vec = Vec::new(); + while let Some(file_info) = upload_rx.recv().await { + progress_handle.decrement_upload_queued(); + progress_handle.pop_upload_pending(); + let file_name = file_info + .original_path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .to_string(); + progress_handle.set_upload_current(Some(file_name.clone())); + + let (req, _upload_id, in_app_path_str) = build_single_upload_request( + &file_info, + &base_dir_async, + &in_app_path, + )?; + let responses = + request_presigned_urls(&cfg, &vec![req], &api_key, bearer_header).await?; + let upload_resp = responses + .into_iter() + .next() + .ok_or_else(|| "missing presigned response".to_string())?; + + if let Err(e) = upload_file_to_presigned( + &file_info.upload_path, + &upload_resp, + http.as_ref(), + ) + .await + { + let _ = progress_handle.add_error(e.clone()); + progress_handle.set_upload_current(None::<&str>); + return Err(e); + } + + if let Err(e) = uploads_tracking::record_upload( + &user_id, + &file_info.upload_path, + &in_app_path_str, + &upload_resp.asset_id, + &upload_request_id, + ) { + let _ = progress_handle.add_warning(format!( + "Failed to record upload in tracking file: {}", + e + )); + } + completed_responses.push(upload_resp); + progress_handle.set_upload_current(None::<&str>); + } + + if !completed_responses.is_empty() { + let mut preproc_req = ProcessAssetsRequest::new( + completed_responses.clone(), + None::, + ); + preproc_req.generate_time_based_media_description = + Some(!disable_description_generation); + let _ = progress_handle.add_info("Triggering preprocessing..."); + let preproc_tasks = api::process_assets_users_assets_preprocess_post( + &cfg, + preproc_req, + None, + Some(&api_key), + bearer_header, + ) + .await + .map_err(|e| format!("failed to trigger preprocess: {}", e))?; + let _ = progress_handle.add_success(format!( + "Preprocess tasks queued: {}", + preproc_tasks.len() + )); + } + + Ok(()) + }; + + let ((), ()) = tokio::try_join!(producer, consumer)?; + Ok::<_, String>((render_handle, progress)) + }); + + let (render_handle, mut progress) = block_result?; + rt.block_on(TwoQueueProgress::stop_render_loop(render_handle)); + progress.finish()?; + println!(); + + Ok(()) +} + +fn build_single_upload_request( + file_info: &FileToUpload, + base_dir: &PathBuf, + in_app_path: &Option, +) -> Result<(AssetUploadRequest, String, String), String> { + let content_length = std::fs::metadata(&file_info.upload_path) + .map_err(|e| format!("failed to stat {}: {}", file_info.upload_path.display(), e))? + .len(); + let upload_id = Uuid::new_v4().to_string(); + let file_in_app_path = + super::utils::compute_in_app_path(&file_info.original_path, base_dir, in_app_path); + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i32; + let file_name_str = file_info + .original_path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .to_string(); + let umid = extract_media_metadata(&file_info.original_path).ok(); + let mut source_info = SourceFileInfo::new( + "__user_upload__".to_string(), + None, + None, + vec!["__current_user__".to_string()], + Some(now_secs), + now_secs, + vec![file_in_app_path.clone()], + Some(file_name_str), + None, + vec![], + ); + if let Some(metadata) = umid { + if let Some(umid_value) = metadata.material_package_umid { + source_info.capture_device_umid = Some(Some(umid_value)); + } + if let Some(first_umid) = metadata.file_package_umids.first() { + source_info.umid = Some(Some(first_umid.clone())); + } + } + let req = AssetUploadRequest::new( + i32::try_from(content_length).unwrap_or(i32::MAX), + upload_id.clone(), + source_info, + ); + Ok((req, upload_id, file_in_app_path)) +} + async fn request_presigned_urls( cfg: &Configuration, requests: &Vec, @@ -644,6 +875,61 @@ async fn upload_to_presigned_urls( Ok(()) } +/// Upload file to presigned URL (no progress UI, no tracking). Used by the two-queue pipeline. +async fn upload_file_to_presigned( + file_path: &PathBuf, + upload_resp: &AssetUploadResponse, + http: &reqwest::Client, +) -> Result<(), String> { + let total_bytes = std::fs::metadata(file_path) + .map_err(|e| format!("failed to stat {}: {}", file_path.display(), e))? + .len(); + let upload_url = upload_resp.presigned_put_url.clone(); + + let mut f = File::open(file_path) + .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; + let mut buf = Vec::with_capacity(total_bytes as usize); + let mut chunk = vec![0u8; (1024 * 1024).min(total_bytes as usize)]; + loop { + let n = f + .read(&mut chunk) + .map_err(|e| format!("failed to read {}: {}", file_path.display(), e))?; + if n == 0 { + break; + } + buf.extend_from_slice(&chunk[..n]); + } + + let content_type = mime_guess::from_path(file_path) + .first_or_text_plain() + .essence_str() + .to_string(); + + let put_res = http + .put(upload_url) + .header(reqwest::header::CONTENT_LENGTH, total_bytes) + .header(reqwest::header::CONTENT_TYPE, &content_type) + .body(buf) + .send() + .await + .map_err(|e| format!("upload failed for {}: {}", file_path.display(), e))?; + + if !put_res.status().is_success() { + let status = put_res.status(); + let body = put_res + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!( + "Upload failed for {}: HTTP {} - {}", + file_path.display(), + status, + body + )); + } + Ok(()) +} + async fn upload_single_file( file_path: &PathBuf, _upload_id: &str, diff --git a/src/tui/inline_progress.rs b/src/tui/inline_progress.rs index 4a91893..dbc4e9e 100644 --- a/src/tui/inline_progress.rs +++ b/src/tui/inline_progress.rs @@ -41,8 +41,8 @@ pub(crate) enum MessageType { #[derive(Clone)] pub(crate) struct Message { - text: String, - msg_type: MessageType, + pub(crate) text: String, + pub(crate) msg_type: MessageType, } pub(crate) struct ProgressState { diff --git a/src/tui/mod.rs b/src/tui/mod.rs index 9352dfb..1b53090 100644 --- a/src/tui/mod.rs +++ b/src/tui/mod.rs @@ -1,3 +1,5 @@ mod inline_progress; +mod two_queue_progress; pub use inline_progress::{InlineProgress, ProgressHandle}; +pub use two_queue_progress::{TwoQueueProgress, TwoQueueProgressHandle}; From cae24adb58bcc6a5575d7d4f157621cabce9ce9a Mon Sep 17 00:00:00 2001 From: bjay kamwa watanabe Date: Tue, 17 Feb 2026 10:50:37 +0100 Subject: [PATCH 2/4] add normalization for audio Add missing file Remove unused function --- src/commands/upload/main.rs | 62 ++++-- src/media/transcode.rs | 64 ++++++ src/tui/README.md | 19 +- src/tui/inline_progress.rs | 106 ---------- src/tui/two_queue_progress.rs | 375 ++++++++++++++++++++++++++++++++++ 5 files changed, 490 insertions(+), 136 deletions(-) create mode 100644 src/tui/two_queue_progress.rs diff --git a/src/commands/upload/main.rs b/src/commands/upload/main.rs index 296162a..b75911c 100644 --- a/src/commands/upload/main.rs +++ b/src/commands/upload/main.rs @@ -13,8 +13,10 @@ use crate::auth; use crate::commands::api_config; use crate::media::ffmpeg::ensure_ffmpeg_available; use crate::media::metadata::extract_media_metadata; +use crate::media::media_file_type::is_audio_file; use crate::media::transcode::{ - convert_to_mp3, create_rendition, has_video_streams, is_mxf_file, Preset, RenditionDefinition, + convert_to_mp3, create_rendition, has_video_streams, is_mxf_file, normalize_audio_to_mp3, + Preset, RenditionDefinition, }; use crate::media::video_file_ext::has_video_ext; use crate::media::video_quality::parse_quality; @@ -78,6 +80,8 @@ enum DownscaleWork { MxfVideo(PathBuf), MxfAudio(PathBuf), Video(PathBuf), + /// Any other audio (MP3, WAV, FLAC, etc.) → normalize to MP3 for streaming. + Audio(PathBuf), Passthrough(PathBuf), } @@ -409,7 +413,7 @@ pub fn run(args: UploadArgs) -> Result<(), String> { fn work_item_file_name(w: &DownscaleWork) -> String { let p = match w { - DownscaleWork::MxfVideo(p) | DownscaleWork::MxfAudio(p) | DownscaleWork::Video(p) | DownscaleWork::Passthrough(p) => p, + DownscaleWork::MxfVideo(p) | DownscaleWork::MxfAudio(p) | DownscaleWork::Video(p) | DownscaleWork::Audio(p) | DownscaleWork::Passthrough(p) => p, }; p.file_name().unwrap_or_default().to_string_lossy().to_string() } @@ -452,26 +456,40 @@ fn do_one_downscale( return Ok(None); } }, - DownscaleWork::Video(original_path) => { - let def = RenditionDefinition { - quality: Some(qualities[0]), - preset, - crf: None, - audio_bitrate: None, - }; - match create_rendition(&original_path, def) { - Ok(upload_path) => FileToUpload { upload_path, original_path }, - Err(e) => { - let _ = progress_handle.add_error(format!( - "Downscale failed for {}: {}", - original_path.display(), - e - )); - return Ok(None); + DownscaleWork::Video(original_path) => { + let def = RenditionDefinition { + quality: Some(qualities[0]), + preset, + crf: None, + audio_bitrate: None, + }; + match create_rendition(&original_path, def) { + Ok(upload_path) => FileToUpload { upload_path, original_path }, + Err(e) => { + let _ = progress_handle.add_error(format!( + "Downscale failed for {}: {}", + original_path.display(), + e + )); + return Ok(None); + } + } } - } - } - DownscaleWork::Passthrough(original_path) => FileToUpload { + DownscaleWork::Audio(original_path) => match normalize_audio_to_mp3(&original_path, Some(192)) { + Ok(upload_path) => FileToUpload { + upload_path, + original_path, + }, + Err(e) => { + let _ = progress_handle.add_error(format!( + "Audio normalization failed for {}: {}", + original_path.display(), + e + )); + return Ok(None); + } + }, + DownscaleWork::Passthrough(original_path) => FileToUpload { upload_path: original_path.clone(), original_path, }, @@ -496,6 +514,8 @@ fn build_downscale_work(original_files: &[PathBuf]) -> Result } } else if has_video_ext(path) { work.push(DownscaleWork::Video(path.to_path_buf())); + } else if is_audio_file(path) { + work.push(DownscaleWork::Audio(path.to_path_buf())); } else { work.push(DownscaleWork::Passthrough(path.to_path_buf())); } diff --git a/src/media/transcode.rs b/src/media/transcode.rs index 07540d5..839b730 100644 --- a/src/media/transcode.rs +++ b/src/media/transcode.rs @@ -153,6 +153,70 @@ fn compute_audio_output_path(input: &PathBuf, out_base: &PathBuf) -> PathBuf { )) } +/// Output path for normalized audio (so we don't overwrite convert_to_mp3 cache). +fn compute_normalized_audio_output_path(input: &PathBuf, out_base: &PathBuf) -> PathBuf { + out_base.join(format!( + "{}_norm.mp3", + input.file_stem().unwrap().to_string_lossy() + )) +} + +/// Normalize any audio to MP3 with loudness normalization (-14 LUFS) and good bitrate for streaming. +/// Uses FFmpeg loudnorm filter; output is 192k MP3 by default. +pub fn normalize_audio_to_mp3( + input: &PathBuf, + audio_bitrate: Option, +) -> Result { + let temp_base = get_temp_rendition_dir()?; + let output = compute_normalized_audio_output_path(input, &temp_base); + + if let (Ok(in_md), Ok(out_md)) = (std::fs::metadata(input), std::fs::metadata(&output)) { + if let (Ok(in_time), Ok(out_time)) = (in_md.modified(), out_md.modified()) { + if out_time >= in_time { + crate::output::info(format!( + "Reusing existing normalized MP3 for {} at {}", + input.display(), + output.display() + )); + return Ok(output); + } + } + } + + let bitrate_k = audio_bitrate.unwrap_or(192); + // Loudness normalization for streaming: -14 LUFS, true peak -1.5 dBTP, LRA 11 + let loudnorm = "loudnorm=I=-14:LRA=11:TP=-1.5"; + + let mut cmd = FfmpegCommand::new(); + cmd.overwrite() + .input(input.to_string_lossy()) + .args(["-af", loudnorm]) + .codec_audio("libmp3lame") + .args(["-b:a", &format!("{}k", bitrate_k)]); + + if std::env::var("TELLERS_DEBUG_FFMPEG").ok().as_deref() == Some("1") { + cmd.print_command(); + } + + cmd.output(output.to_string_lossy()); + + let mut child = cmd + .spawn() + .map_err(|e| format!("failed to start ffmpeg: {}", e))?; + let status = child + .wait() + .map_err(|e| format!("failed to wait for ffmpeg: {}", e))?; + if !status.success() { + return Err(format!( + "ffmpeg failed normalizing audio to MP3: {} -> {}", + input.display(), + output.display() + )); + } + + Ok(output) +} + pub fn convert_to_mp3(input: &PathBuf, audio_bitrate: Option) -> Result { let temp_base = get_temp_rendition_dir()?; let output = compute_audio_output_path(input, &temp_base); diff --git a/src/tui/README.md b/src/tui/README.md index 2c1696a..6156506 100644 --- a/src/tui/README.md +++ b/src/tui/README.md @@ -11,23 +11,26 @@ This module provides a simple, reusable abstraction for displaying progress usin ### Simple Synchronous Usage +Use `clone_handle()` and the handle's methods (render loop is optional for sync use): + ```rust -use crate::tui::InlineProgress; +use crate::tui::{InlineProgress, ProgressHandle}; let mut progress = InlineProgress::new("Uploading Files", total_files)?; +let progress_handle = progress.clone_handle(); for (i, file) in files.iter().enumerate() { let file_size = std::fs::metadata(file)?.len(); - progress.start_task(i, file.display().to_string(), file_size)?; + let _ = progress_handle.start_task(i, file.display().to_string(), file_size); // Simulate upload progress for chunk in 0..100 { let uploaded = (chunk * file_size / 100) as u64; - progress.update_task(i, uploaded)?; + let _ = progress_handle.update_task(i, uploaded); std::thread::sleep(Duration::from_millis(10)); } - progress.finish_task(i, true)?; + let _ = progress_handle.finish_task(i, true); } progress.finish()?; @@ -84,11 +87,9 @@ progress.finish()?; ### `InlineProgress` - `new(title, total_tasks)` - Create a new progress display -- `start_task(task_id, label, total_bytes)` - Start tracking a task -- `update_task(task_id, uploaded_bytes)` - Update task progress -- `finish_task(task_id, success)` - Mark task as complete -- `add_message(msg)` - Add a status message -- `clone_handle()` - Get a thread-safe handle +- `clone_handle()` - Get a thread-safe handle for updates +- `start_render_loop(handle)` - Start the periodic render task (async) +- `stop_render_loop(render_handle)` - Stop the render loop - `finish()` - Finalize and cleanup ### `ProgressHandle` diff --git a/src/tui/inline_progress.rs b/src/tui/inline_progress.rs index dbc4e9e..dfc3791 100644 --- a/src/tui/inline_progress.rs +++ b/src/tui/inline_progress.rs @@ -77,98 +77,6 @@ impl InlineProgress { }) } - pub fn start_task( - &self, - task_id: TaskId, - label: impl Into, - total_bytes: u64, - ) -> Result<(), String> { - let mut state = self.state.lock().unwrap(); - state.in_progress.insert( - task_id, - TaskProgress { - label: label.into(), - started_at: Instant::now(), - progress: 0.0, - total_bytes, - uploaded_bytes: 0, - completed: false, - }, - ); - drop(state); - self.render()?; - Ok(()) - } - - pub fn update_task(&self, task_id: TaskId, uploaded_bytes: u64) -> Result<(), String> { - let mut state = self.state.lock().unwrap(); - if let Some(task) = state.in_progress.get_mut(&task_id) { - task.uploaded_bytes = uploaded_bytes; - if task.total_bytes > 0 { - task.progress = (uploaded_bytes as f64 / task.total_bytes as f64) * 100.0; - } - } - drop(state); - self.render()?; - Ok(()) - } - - pub fn finish_task(&self, task_id: TaskId, success: bool) -> Result<(), String> { - let mut state = self.state.lock().unwrap(); - if let Some(task) = state.in_progress.get_mut(&task_id) { - if success { - task.completed = true; - task.progress = 100.0; - state.completed += 1; - // Don't add to messages - task stays visible in the list with completion status - } - } - drop(state); - self.render()?; - Ok(()) - } - - pub fn add_message(&self, msg: impl Into) -> Result<(), String> { - self.add_typed_message(msg, MessageType::Info) - } - - pub fn add_info(&self, msg: impl Into) -> Result<(), String> { - self.add_typed_message(msg, MessageType::Info) - } - - pub fn add_warning(&self, msg: impl Into) -> Result<(), String> { - self.add_typed_message(msg, MessageType::Warning) - } - - pub fn add_error(&self, msg: impl Into) -> Result<(), String> { - self.add_typed_message(msg, MessageType::Error) - } - - pub fn add_success(&self, msg: impl Into) -> Result<(), String> { - self.add_typed_message(msg, MessageType::Success) - } - - fn add_typed_message( - &self, - msg: impl Into, - msg_type: MessageType, - ) -> Result<(), String> { - let mut state = self.state.lock().unwrap(); - state.recent_messages.insert( - 0, - Message { - text: msg.into(), - msg_type, - }, - ); - if state.recent_messages.len() > state.max_messages { - state.recent_messages.pop(); - } - drop(state); - self.render()?; - Ok(()) - } - pub fn clone_handle(&self) -> ProgressHandle { ProgressHandle { state: Arc::clone(&self.state), @@ -213,16 +121,6 @@ impl InlineProgress { let _ = render_handle.await; } - fn render(&self) -> Result<(), String> { - if let Some(ref mut terminal) = *self.terminal.borrow_mut() { - let state = self.state.lock().unwrap(); - terminal - .draw(|frame| draw_ui_internal(frame, &state)) - .map_err(|e| format!("failed to render: {}", e))?; - } - Ok(()) - } - pub fn finish(&mut self) -> Result<(), String> { if let Some(mut terminal) = self.terminal.get_mut().take() { let state = self.state.lock().unwrap(); @@ -297,10 +195,6 @@ impl ProgressHandle { Ok(()) } - pub fn add_message(&self, msg: impl Into) -> Result<(), String> { - self.add_typed_message(msg, MessageType::Info) - } - pub fn add_info(&self, msg: impl Into) -> Result<(), String> { self.add_typed_message(msg, MessageType::Info) } diff --git a/src/tui/two_queue_progress.rs b/src/tui/two_queue_progress.rs new file mode 100644 index 0000000..67ccd69 --- /dev/null +++ b/src/tui/two_queue_progress.rs @@ -0,0 +1,375 @@ +//! Two-queue progress UI: downscale queue and upload queue, each showing queue size and current file. + +use ratatui::{ + backend::CrosstermBackend, + layout::{Constraint, Layout, Rect}, + style::{Color, Modifier, Style}, + text::{Line, Span}, + widgets::{Block, List, ListItem, Paragraph}, + Frame, Terminal, TerminalOptions, Viewport, +}; +use std::{ + cell::RefCell, + io::stdout, + sync::{Arc, Mutex}, +}; + +use super::inline_progress::{Message, MessageType}; + +const PENDING_DISPLAY: usize = 5; + +#[derive(Clone, Default)] +pub(crate) struct TwoQueueState { + pub downscale_queued: usize, + pub downscale_current: Option, + /// Pending file names (next in line); first is next to process. + pub downscale_pending: Vec, + pub upload_queued: usize, + pub upload_current: Option, + pub upload_pending: Vec, + pub recent_messages: Vec, + pub max_messages: usize, +} + +impl TwoQueueState { + fn downscale_display(&self) -> String { + let current = self + .downscale_current + .as_deref() + .unwrap_or("—") + .to_string(); + truncate_string(¤t, 35) + } + + fn upload_display(&self) -> String { + let current = self + .upload_current + .as_deref() + .unwrap_or("—") + .to_string(); + truncate_string(¤t, 35) + } + + fn downscale_pending_next(&self) -> impl Iterator { + self.downscale_pending.iter().take(PENDING_DISPLAY).map(|s| s.as_str()) + } + + fn upload_pending_next(&self) -> impl Iterator { + self.upload_pending.iter().take(PENDING_DISPLAY).map(|s| s.as_str()) + } +} + +pub struct TwoQueueProgress { + terminal: RefCell>>>, + state: Arc>, +} + +impl TwoQueueProgress { + pub fn new() -> Result { + let terminal = Terminal::with_options( + CrosstermBackend::new(stdout()), + TerminalOptions { + viewport: Viewport::Inline(14), + }, + ) + .map_err(|e| format!("failed to initialize terminal: {}", e))?; + + Ok(Self { + terminal: RefCell::new(Some(terminal)), + state: Arc::new(Mutex::new(TwoQueueState { + max_messages: 5, + ..Default::default() + })), + }) + } + + pub fn clone_handle(&self) -> TwoQueueProgressHandle { + TwoQueueProgressHandle { + state: Arc::clone(&self.state), + } + } + + pub fn start_render_loop( + &mut self, + handle: TwoQueueProgressHandle, + ) -> tokio::task::JoinHandle> { + use tokio::time::{interval, Duration}; + let terminal_opt = self.terminal.get_mut().take(); + let terminal_mutex = Arc::new(Mutex::new(terminal_opt)); + let handle_clone = handle.clone(); + let terminal_clone = Arc::clone(&terminal_mutex); + + tokio::spawn(async move { + let mut interval = interval(Duration::from_millis(100)); + loop { + interval.tick().await; + if let Some(ref mut terminal) = *terminal_clone.lock().unwrap() { + let state = handle_clone.state.lock().unwrap(); + if terminal.draw(|f| draw_two_queue_ui(f, &state)).is_err() { + break; + } + } else { + break; + } + } + Ok(()) + }) + } + + pub async fn stop_render_loop( + render_handle: tokio::task::JoinHandle>, + ) { + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + render_handle.abort(); + let _ = render_handle.await; + } + + pub fn finish(&mut self) -> Result<(), String> { + if let Some(mut terminal) = self.terminal.get_mut().take() { + let state = self.state.lock().unwrap(); + terminal + .draw(|f| draw_two_queue_ui(f, &state)) + .map_err(|e| format!("failed to render: {}", e))?; + drop(terminal); + } + Ok(()) + } +} + +#[derive(Clone)] +pub struct TwoQueueProgressHandle { + pub(crate) state: Arc>, +} + +impl TwoQueueProgressHandle { + pub fn set_downscale_queued(&self, n: usize) { + let mut s = self.state.lock().unwrap(); + s.downscale_queued = n; + } + + pub fn decrement_downscale_queued(&self) { + let mut s = self.state.lock().unwrap(); + s.downscale_queued = s.downscale_queued.saturating_sub(1); + } + + pub fn set_downscale_current(&self, label: Option>) { + let mut s = self.state.lock().unwrap(); + s.downscale_current = label.map(Into::into); + } + + /// Set the list of pending file names for downscale (next in line). Call at start; then pop when starting each. + pub fn set_downscale_pending(&self, names: Vec) { + let mut s = self.state.lock().unwrap(); + s.downscale_pending = names; + } + + /// Remove the first pending downscale (the one now being processed). Call when starting a downscale. + pub fn pop_downscale_pending(&self) { + let mut s = self.state.lock().unwrap(); + if !s.downscale_pending.is_empty() { + s.downscale_pending.remove(0); + } + } + + pub fn increment_upload_queued(&self) { + let mut s = self.state.lock().unwrap(); + s.upload_queued = s.upload_queued.saturating_add(1); + } + + pub fn decrement_upload_queued(&self) { + let mut s = self.state.lock().unwrap(); + s.upload_queued = s.upload_queued.saturating_sub(1); + } + + pub fn set_upload_current(&self, label: Option>) { + let mut s = self.state.lock().unwrap(); + s.upload_current = label.map(Into::into); + } + + /// Add a file name to the upload pending list (when enqueueing an upload). + pub fn push_upload_pending(&self, name: impl Into) { + let mut s = self.state.lock().unwrap(); + s.upload_pending.push(name.into()); + } + + /// Remove the first pending upload (the one now being uploaded). Call when starting an upload. + pub fn pop_upload_pending(&self) { + let mut s = self.state.lock().unwrap(); + if !s.upload_pending.is_empty() { + s.upload_pending.remove(0); + } + } + + pub fn add_info(&self, msg: impl Into) { + self.add_typed_message(msg, MessageType::Info); + } + + pub fn add_warning(&self, msg: impl Into) { + self.add_typed_message(msg, MessageType::Warning); + } + + pub fn add_error(&self, msg: impl Into) { + self.add_typed_message(msg, MessageType::Error); + } + + pub fn add_success(&self, msg: impl Into) { + self.add_typed_message(msg, MessageType::Success); + } + + fn add_typed_message(&self, msg: impl Into, msg_type: MessageType) { + let mut s = self.state.lock().unwrap(); + s.recent_messages.insert( + 0, + Message { + text: msg.into(), + msg_type, + }, + ); + if s.recent_messages.len() > s.max_messages { + s.recent_messages.pop(); + } + } +} + +pub fn draw_two_queue_ui(frame: &mut Frame, state: &TwoQueueState) { + let area = frame.area(); + let block = Block::default().title(" Downscale │ Upload "); + frame.render_widget(block, area); + + let has_messages = !state.recent_messages.is_empty(); + let msg_space = if has_messages { 4 } else { 0 }; + + let vertical = Layout::vertical([ + Constraint::Min(3), + Constraint::Length(msg_space), + ]) + .margin(1); + + let areas = vertical.split(area); + let main_area = areas[0]; + let bottom_area = areas[1]; + + // Two columns: each shows "Queue: N" and "Current: " + let cols = Layout::horizontal([ + Constraint::Percentage(50), + Constraint::Percentage(50), + ]) + .split(main_area); + + let downscale_block = Block::default().title(Span::styled( + " Downscale ", + Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD), + )); + frame.render_widget(downscale_block, cols[0]); + + let downscale_inner = Rect::new( + cols[0].x + 1, + cols[0].y + 1, + cols[0].width.saturating_sub(2), + cols[0].height.saturating_sub(2), + ); + let mut downscale_lines: Vec> = vec![ + Line::from(vec![ + Span::raw("Queue: "), + Span::styled( + state.downscale_queued.to_string(), + Style::default().fg(Color::Yellow), + ), + ]), + Line::from(vec![ + Span::raw("Current: "), + Span::styled( + state.downscale_display(), + Style::default().fg(Color::LightGreen), + ), + ]), + ]; + let pending_down: Vec = state.downscale_pending_next().map(|s| truncate_string(s, 32)).collect(); + if !pending_down.is_empty() { + downscale_lines.push(Line::from(Span::styled("Next:", Style::default().fg(Color::DarkGray)))); + for name in &pending_down { + downscale_lines.push(Line::from(Span::styled( + format!(" {}", name), + Style::default().fg(Color::DarkGray), + ))); + } + } + let downscale_para = Paragraph::new(downscale_lines); + frame.render_widget(downscale_para, downscale_inner); + + let upload_block = Block::default().title(Span::styled( + " Upload ", + Style::default().fg(Color::Blue).add_modifier(Modifier::BOLD), + )); + frame.render_widget(upload_block, cols[1]); + + let upload_inner = Rect::new( + cols[1].x + 1, + cols[1].y + 1, + cols[1].width.saturating_sub(2), + cols[1].height.saturating_sub(2), + ); + let mut upload_lines: Vec> = vec![ + Line::from(vec![ + Span::raw("Queue: "), + Span::styled( + state.upload_queued.to_string(), + Style::default().fg(Color::Yellow), + ), + ]), + Line::from(vec![ + Span::raw("Current: "), + Span::styled( + state.upload_display(), + Style::default().fg(Color::LightGreen), + ), + ]), + ]; + let pending_up: Vec = state.upload_pending_next().map(|s| truncate_string(s, 32)).collect(); + if !pending_up.is_empty() { + upload_lines.push(Line::from(Span::styled("Next:", Style::default().fg(Color::DarkGray)))); + for name in &pending_up { + upload_lines.push(Line::from(Span::styled( + format!(" {}", name), + Style::default().fg(Color::DarkGray), + ))); + } + } + let upload_para = Paragraph::new(upload_lines); + frame.render_widget(upload_para, upload_inner); + + if has_messages && bottom_area.height > 0 { + let messages: Vec = state + .recent_messages + .iter() + .take(bottom_area.height as usize) + .map(|msg| { + let (icon, color) = match msg.msg_type { + MessageType::Info => ("ℹ", Color::Cyan), + MessageType::Warning => ("⚠", Color::Yellow), + MessageType::Error => ("✗", Color::Red), + MessageType::Success => ("✓", Color::Green), + }; + let text = truncate_string(&msg.text, 70); + ListItem::new(Line::from(vec![ + Span::styled( + format!("{} ", icon), + Style::default().fg(color).add_modifier(Modifier::BOLD), + ), + Span::styled(text, Style::default().fg(color)), + ])) + }) + .collect(); + if !messages.is_empty() { + frame.render_widget(List::new(messages), bottom_area); + } + } +} + +fn truncate_string(s: &str, max_len: usize) -> String { + if s.len() <= max_len { + s.to_string() + } else { + format!("{}...", &s[..max_len.saturating_sub(3)]) + } +} From 90493a1af67178be97a96961409e5a576deabad5 Mon Sep 17 00:00:00 2001 From: bjay kamwa watanabe Date: Tue, 17 Feb 2026 11:00:01 +0100 Subject: [PATCH 3/4] clean up code --- src/commands/upload/main.rs | 4 ---- src/media/transcode.rs | 4 ---- src/tui/two_queue_progress.rs | 3 --- 3 files changed, 11 deletions(-) diff --git a/src/commands/upload/main.rs b/src/commands/upload/main.rs index b75911c..0db5af8 100644 --- a/src/commands/upload/main.rs +++ b/src/commands/upload/main.rs @@ -75,12 +75,10 @@ struct FileToUpload { original_path: PathBuf, } -/// Work item for the downscale queue (one per file, no batching). enum DownscaleWork { MxfVideo(PathBuf), MxfAudio(PathBuf), Video(PathBuf), - /// Any other audio (MP3, WAV, FLAC, etc.) → normalize to MP3 for streaming. Audio(PathBuf), Passthrough(PathBuf), } @@ -418,7 +416,6 @@ fn work_item_file_name(w: &DownscaleWork) -> String { p.file_name().unwrap_or_default().to_string_lossy().to_string() } -/// Runs in spawn_blocking. Returns Ok(Some(file)) on success, Ok(None) on skip/error (reported via handle). fn do_one_downscale( work: DownscaleWork, progress_handle: &TwoQueueProgressHandle, @@ -895,7 +892,6 @@ async fn upload_to_presigned_urls( Ok(()) } -/// Upload file to presigned URL (no progress UI, no tracking). Used by the two-queue pipeline. async fn upload_file_to_presigned( file_path: &PathBuf, upload_resp: &AssetUploadResponse, diff --git a/src/media/transcode.rs b/src/media/transcode.rs index 839b730..ad96340 100644 --- a/src/media/transcode.rs +++ b/src/media/transcode.rs @@ -153,7 +153,6 @@ fn compute_audio_output_path(input: &PathBuf, out_base: &PathBuf) -> PathBuf { )) } -/// Output path for normalized audio (so we don't overwrite convert_to_mp3 cache). fn compute_normalized_audio_output_path(input: &PathBuf, out_base: &PathBuf) -> PathBuf { out_base.join(format!( "{}_norm.mp3", @@ -161,8 +160,6 @@ fn compute_normalized_audio_output_path(input: &PathBuf, out_base: &PathBuf) -> )) } -/// Normalize any audio to MP3 with loudness normalization (-14 LUFS) and good bitrate for streaming. -/// Uses FFmpeg loudnorm filter; output is 192k MP3 by default. pub fn normalize_audio_to_mp3( input: &PathBuf, audio_bitrate: Option, @@ -184,7 +181,6 @@ pub fn normalize_audio_to_mp3( } let bitrate_k = audio_bitrate.unwrap_or(192); - // Loudness normalization for streaming: -14 LUFS, true peak -1.5 dBTP, LRA 11 let loudnorm = "loudnorm=I=-14:LRA=11:TP=-1.5"; let mut cmd = FfmpegCommand::new(); diff --git a/src/tui/two_queue_progress.rs b/src/tui/two_queue_progress.rs index 67ccd69..3643a14 100644 --- a/src/tui/two_queue_progress.rs +++ b/src/tui/two_queue_progress.rs @@ -22,7 +22,6 @@ const PENDING_DISPLAY: usize = 5; pub(crate) struct TwoQueueState { pub downscale_queued: usize, pub downscale_current: Option, - /// Pending file names (next in line); first is next to process. pub downscale_pending: Vec, pub upload_queued: usize, pub upload_current: Option, @@ -157,13 +156,11 @@ impl TwoQueueProgressHandle { s.downscale_current = label.map(Into::into); } - /// Set the list of pending file names for downscale (next in line). Call at start; then pop when starting each. pub fn set_downscale_pending(&self, names: Vec) { let mut s = self.state.lock().unwrap(); s.downscale_pending = names; } - /// Remove the first pending downscale (the one now being processed). Call when starting a downscale. pub fn pop_downscale_pending(&self) { let mut s = self.state.lock().unwrap(); if !s.downscale_pending.is_empty() { From f2773a43f92cddbc2b9e551eb9c44f33bf7fa50d Mon Sep 17 00:00:00 2001 From: bjay kamwa watanabe Date: Tue, 17 Feb 2026 11:35:10 +0100 Subject: [PATCH 4/4] set openapi version --- .github/workflows/rust.yml | 2 +- scripts/generate_api.sh | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3a0a6f6..d6b6065 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -20,7 +20,7 @@ jobs: with: generator: rust openapi-file: src/tellers_api/openapi.tellers_public_api.yaml - generator-tag: latest + generator-tag: v7.17.0 command-args: > -o generated/tellers_api_client --additional-properties=packageName=tellers_api_client,packageVersion=0.1.0,library=reqwest,supportAsync=true,reqwestClient=true diff --git a/scripts/generate_api.sh b/scripts/generate_api.sh index 9328a7a..af00d8b 100755 --- a/scripts/generate_api.sh +++ b/scripts/generate_api.sh @@ -15,10 +15,9 @@ elif [ -n "${OPENAPI_GENERATOR_CLI_JAR:-}" ] && [ -f "${OPENAPI_GENERATOR_CLI_JA elif [ -f "${ROOT_DIR}/openapi-generator-cli.jar" ]; then GENERATOR="java -jar ${ROOT_DIR}/openapi-generator-cli.jar" else - echo "OpenAPI Generator not found. Install one of the following:" - echo " - brew install openapi-generator" - echo " - npm i -g @openapitools/openapi-generator-cli" - echo " - download the JAR and set OPENAPI_GENERATOR_CLI_JAR=/path/to/openapi-generator-cli.jar" + echo "OpenAPI Generator not found. We use openapi-generator-cli 7.17.0. Install with version control:" + echo " - brew: brew install openapi-generator@7.17" + echo " - npm: npm i -g @openapitools/openapi-generator-cli" exit 1 fi @@ -31,7 +30,7 @@ ${GENERATOR} generate \ --additional-properties=packageName=tellers_api_client,packageVersion=0.1.0,library=reqwest,supportAsync=true,reqwestClient=true echo "Generated client at ${OUT_DIR}" - + # Inject crate-wide lint allowances for generator naming quirks LIB_RS="${OUT_DIR}/src/lib.rs" if [ -f "${LIB_RS}" ]; then