Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 58 additions & 28 deletions src/commands/upload/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,9 @@ fn do_one_downscale(
crf: None,
audio_bitrate: None,
};
match create_rendition(&original_path, def) {
let mut progress_cb = |pct: f64| progress_handle.set_downscale_current_pct(Some(pct));
let info_cb = |msg: &str| progress_handle.add_info(msg);
match create_rendition(&original_path, def, Some(&mut progress_cb), Some(&info_cb)) {
Ok(upload_path) => FileToUpload { upload_path, original_path },
Err(e) => {
let _ = progress_handle.add_error(format!(
Expand All @@ -574,25 +576,31 @@ fn do_one_downscale(
}
}
}
DownscaleWork::MxfAudio(original_path) => match convert_to_mp3(&original_path, None) {
Ok(upload_path) => FileToUpload { upload_path, original_path },
Err(e) => {
let _ = progress_handle.add_error(format!(
"MXF to MP3 failed for {}: {}",
original_path.display(),
e
));
return Ok(None);
DownscaleWork::MxfAudio(original_path) => {
let mut progress_cb = |pct: f64| progress_handle.set_downscale_current_pct(Some(pct));
let info_cb = |msg: &str| progress_handle.add_info(msg);
match convert_to_mp3(&original_path, None, Some(&mut progress_cb), Some(&info_cb)) {
Ok(upload_path) => FileToUpload { upload_path, original_path },
Err(e) => {
let _ = progress_handle.add_error(format!(
"MXF to MP3 failed for {}: {}",
original_path.display(),
e
));
return Ok(None);
}
}
},
}
DownscaleWork::Video(original_path) => {
let def = RenditionDefinition {
quality: Some(qualities[0]),
preset,
crf: None,
audio_bitrate: None,
};
match create_rendition(&original_path, def) {
let mut progress_cb = |pct: f64| progress_handle.set_downscale_current_pct(Some(pct));
let info_cb = |msg: &str| progress_handle.add_info(msg);
match create_rendition(&original_path, def, Some(&mut progress_cb), Some(&info_cb)) {
Ok(upload_path) => FileToUpload { upload_path, original_path },
Err(e) => {
let _ = progress_handle.add_error(format!(
Expand All @@ -604,18 +612,22 @@ fn do_one_downscale(
}
}
}
DownscaleWork::Audio(original_path) => match normalize_audio_to_mp3(&original_path, Some(192)) {
Ok(upload_path) => FileToUpload {
upload_path,
original_path,
},
Err(e) => {
let _ = progress_handle.add_error(format!(
"Audio normalization failed for {}: {}",
original_path.display(),
e
));
return Ok(None);
DownscaleWork::Audio(original_path) => {
let mut progress_cb = |pct: f64| progress_handle.set_downscale_current_pct(Some(pct));
let info_cb = |msg: &str| progress_handle.add_info(msg);
match normalize_audio_to_mp3(&original_path, Some(192), Some(&mut progress_cb), Some(&info_cb)) {
Ok(upload_path) => FileToUpload {
upload_path,
original_path,
},
Err(e) => {
let _ = progress_handle.add_error(format!(
"Audio normalization failed for {}: {}",
original_path.display(),
e
));
return Ok(None);
}
}
},
DownscaleWork::Passthrough(original_path) => FileToUpload {
Expand Down Expand Up @@ -755,21 +767,25 @@ fn run_two_queue_pipeline(
.next()
.ok_or_else(|| "missing presigned response".to_string())?;

progress_handle.set_upload_current_pct(Some(0.0));
if let Err(e) = upload_file_to_presigned(
&file_info.upload_path,
&upload_resp,
http.as_ref(),
&cfg,
&api_key,
bearer_header,
Some(&progress_handle),
)
.await
{
let _ = progress_handle.add_error(e.clone());
progress_handle.set_upload_current(None::<&str>);
progress_handle.set_upload_current_pct(None);
return Err(e);
}

progress_handle.set_upload_current_pct(None);
if let Err(e) = uploads_tracking::record_upload(
&user_id,
file_info.upload_path.as_path(),
Expand All @@ -784,6 +800,7 @@ fn run_two_queue_pipeline(
}
completed_responses.push(upload_resp);
progress_handle.set_upload_current(None::<&str>);
progress_handle.set_upload_current_pct(None);
}

if !completed_responses.is_empty() {
Expand Down Expand Up @@ -812,15 +829,17 @@ fn run_two_queue_pipeline(
Ok(())
};

let ((), ()) = tokio::try_join!(producer, consumer)?;
Ok::<_, String>((render_handle, progress))
let join_result = tokio::try_join!(producer, consumer);
Ok::<_, String>((render_handle, progress, join_result))
});

let (render_handle, mut progress) = block_result?;
let (render_handle, mut progress, join_result) = block_result?;
rt.block_on(TwoQueueProgress::stop_render_loop(render_handle));
progress.finish()?;
let _ = progress.finish();
progress.print_messages_to_stderr();
println!();

join_result?;
Ok(())
}

Expand Down Expand Up @@ -1053,6 +1072,7 @@ pub async fn upload_file_to_presigned(
_cfg: &Configuration,
_api_key: &str,
_bearer_opt: Option<&str>,
progress_handle: Option<&TwoQueueProgressHandle>,
) -> Result<(), String> {
let total_bytes = std::fs::metadata(file_path)
.map_err(|e| format!("failed to stat {}: {}", file_path.display(), e))?
Expand All @@ -1063,6 +1083,7 @@ pub async fn upload_file_to_presigned(
.map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?;
let mut buf = Vec::with_capacity(total_bytes as usize);
let mut chunk = vec![0u8; (1024 * 1024).min(total_bytes as usize)];
let mut read_so_far = 0u64;
loop {
let n = f
.read(&mut chunk)
Expand All @@ -1071,6 +1092,15 @@ pub async fn upload_file_to_presigned(
break;
}
buf.extend_from_slice(&chunk[..n]);
read_so_far += n as u64;
if let Some(ph) = progress_handle {
let pct = if total_bytes > 0 {
100.0 * (read_so_far as f64 / total_bytes as f64)
} else {
100.0
};
ph.set_upload_current_pct(Some(pct));
}
}

let content_type = mime_guess::from_path(file_path)
Expand Down
Loading