From 38f41b277c3bf708a2d293d1cf138c6690e2e8ca Mon Sep 17 00:00:00 2001 From: bjay kamwa watanabe Date: Wed, 18 Feb 2026 20:23:24 +0100 Subject: [PATCH 1/2] bump openapi --- README.md | 2 + src/commands/asset/preprocess.rs | 8 +- src/commands/entity/create.rs | 87 ++--- src/commands/upload/main.rs | 332 +++++++++++++++--- src/commands/upload/mod.rs | 2 +- .../openapi.tellers_public_api.yaml | 50 +++ 6 files changed, 360 insertions(+), 121 deletions(-) diff --git a/README.md b/README.md index dc599e2..61e0e92 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,8 @@ tellers upload /path/to/media_folder - `--ext ` — Filter files by extension (e.g., `--ext mp4 --ext mov`) - `--in-app-path ` — Set the in-app path for uploaded files +Files ≥ 10 MiB use **multipart S3 upload** (presigned part URLs, then complete); smaller files use a single presigned PUT. + ## Implementation Notes - Argument parsing via Clap 4.x. See `clap` docs: [docs.rs/clap](https://docs.rs/clap/latest/clap/) diff --git a/src/commands/asset/preprocess.rs b/src/commands/asset/preprocess.rs index 4f55745..541ddc4 100644 --- a/src/commands/asset/preprocess.rs +++ b/src/commands/asset/preprocess.rs @@ -30,13 +30,7 @@ pub fn run(args: PreprocessArgs) -> Result<(), String> { let assets: Vec = args .ids .into_iter() - .map(|asset_id| { - AssetUploadResponse::new( - String::new(), - String::new(), - asset_id, - ) - }) + .map(|asset_id| AssetUploadResponse::new(String::new(), asset_id)) .collect(); let mut preproc_req = ProcessAssetsRequest::new( diff --git a/src/commands/entity/create.rs b/src/commands/entity/create.rs index 839eb46..7b56a2a 100644 --- a/src/commands/entity/create.rs +++ b/src/commands/entity/create.rs @@ -1,6 +1,4 @@ use clap::Args; -use std::fs::File; -use std::io::Read; use std::path::PathBuf; use tellers_api_client::apis::accepts_api_key_api as api; use tellers_api_client::models::{ @@ -111,13 +109,12 @@ pub fn run(args: CreateArgs) -> Result<(), String> { if let Some(asset_id) = asset_id { output::info(format!("Associating asset {} with entity {}", asset_id, entity_id)); - let asset = AssetUploadResponse::new( - "".to_string(), - "".to_string(), - asset_id.clone(), - ); + let asset = AssetUploadResponse::new(String::new(), asset_id.clone()); - let process_req = ProcessEntityRequest::new(entity_id.clone(), vec![asset]); + let process_req = ProcessEntityRequest { + entity_id: entity_id.clone(), + assets: vec![asset], + }; let process_resp = api::process_entity_users_entity_preprocess_post( &cfg, @@ -209,11 +206,25 @@ fn upload_file_and_get_asset_id( } } - let upload_req = AssetUploadRequest::new( - i32::try_from(content_length).unwrap_or(i32::MAX), + const MULTIPART_THRESHOLD_BYTES: u64 = 10 * 1024 * 1024; + const MULTIPART_MIN_PART_SIZE_BYTES: i32 = 5 * 1024 * 1024; + const MULTIPART_MAX_PARTS: u64 = 1000; + + fn multipart_part_size_for(size: u64) -> i32 { + let min_size = MULTIPART_MIN_PART_SIZE_BYTES as u64; + let size_for_1000 = (size + MULTIPART_MAX_PARTS - 1) / MULTIPART_MAX_PARTS; + (min_size.max(size_for_1000) as i32).max(MULTIPART_MIN_PART_SIZE_BYTES) + } + + let mut upload_req = AssetUploadRequest::new( + i64::try_from(content_length).unwrap_or(i64::MAX), upload_id.clone(), source_info, ); + if content_length >= MULTIPART_THRESHOLD_BYTES { + upload_req.multipart = Some(true); + upload_req.multipart_part_size = Some(multipart_part_size_for(content_length)); + } output::info(format!("Requesting presigned URL for {}", file_path.display())); @@ -248,64 +259,28 @@ fn upload_file_and_get_asset_id( } let upload_resp = responses.remove(0); - let upload_url = upload_resp.presigned_put_url.clone(); let asset_id = upload_resp.asset_id.clone(); output::info(format!("Uploading file to presigned URL...")); - let mut f = File::open(file_path) - .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; - let mut buf = Vec::with_capacity(content_length as usize); - - const CHUNK_SIZE: usize = 1024 * 1024; - let mut chunk = vec![0u8; CHUNK_SIZE.min(content_length as usize)]; - - loop { - let n = f - .read(&mut chunk) - .map_err(|e| format!("failed to read {}: {}", file_path.display(), e))?; - if n == 0 { - break; - } - buf.extend_from_slice(&chunk[..n]); - } - - let content_type = mime_guess::from_path(file_path) - .first_or_text_plain() - .essence_str() - .to_string(); - let http = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(60)) .build() .map_err(|e| format!("failed to build http client: {}", e))?; - let put_res = http - .put(upload_url) - .header(reqwest::header::CONTENT_LENGTH, content_length) - .header(reqwest::header::CONTENT_TYPE, &content_type) - .body(buf) - .send() - .await - .map_err(|e| format!("upload failed for {}: {}", file_path.display(), e))?; - - if !put_res.status().is_success() { - let status = put_res.status(); - let body = put_res - .text() - .await - .unwrap_or_else(|_| "".to_string()); - return Err(format!( - "Upload failed for {}: HTTP {} - {}", - file_path.display(), - status, - body - )); - } + crate::commands::upload::upload_file_to_presigned( + file_path, + &upload_resp, + &http, + cfg, + api_key, + bearer_header, + ) + .await?; if let Err(e) = uploads_tracking::record_upload( user_id, - file_path, + file_path.as_path(), &in_app_path, &asset_id, &upload_request_id, diff --git a/src/commands/upload/main.rs b/src/commands/upload/main.rs index 0db5af8..4da3786 100644 --- a/src/commands/upload/main.rs +++ b/src/commands/upload/main.rs @@ -29,7 +29,8 @@ use tokio::sync::mpsc as tokio_mpsc; use tellers_api_client::apis::accepts_api_key_api as api; use tellers_api_client::apis::configuration::Configuration; use tellers_api_client::models::{ - AssetUploadRequest, AssetUploadResponse, ProcessAssetsRequest, SourceFileInfo, + AssetUploadRequest, AssetUploadResponse, MultipartAbortRequest, MultipartCompleteRequest, + MultipartPart, ProcessAssetsRequest, SourceFileInfo, }; #[derive(Args, Debug)] @@ -70,6 +71,20 @@ pub struct UploadArgs { pub disable_description_generation: bool, } +/// Use multipart upload for files at least this size (10 MiB). +const MULTIPART_THRESHOLD_BYTES: u64 = 10 * 1024 * 1024; +/// S3 minimum part size (5 MiB); last part can be smaller. +const MULTIPART_MIN_PART_SIZE_BYTES: i32 = 5 * 1024 * 1024; +/// S3 maximum number of parts per multipart upload. +const MULTIPART_MAX_PARTS: u64 = 1000; + +/// Part size so that part_count = ceil(content_length / part_size) <= MULTIPART_MAX_PARTS. +fn multipart_part_size_for(content_length: u64) -> i32 { + let min_size = MULTIPART_MIN_PART_SIZE_BYTES as u64; + let size_for_1000 = (content_length + MULTIPART_MAX_PARTS - 1) / MULTIPART_MAX_PARTS; + (min_size.max(size_for_1000) as i32).max(MULTIPART_MIN_PART_SIZE_BYTES) +} + struct FileToUpload { upload_path: PathBuf, original_path: PathBuf, @@ -325,11 +340,15 @@ pub fn run(args: UploadArgs) -> Result<(), String> { } } - let req = AssetUploadRequest::new( - i32::try_from(content_length).unwrap_or(i32::MAX), + let mut req = AssetUploadRequest::new( + i64::try_from(content_length).unwrap_or(i64::MAX), upload_id, source_info, ); + if content_length >= MULTIPART_THRESHOLD_BYTES { + req.multipart = Some(true); + req.multipart_part_size = Some(multipart_part_size_for(content_length)); + } requests.push(req); } @@ -366,6 +385,9 @@ pub fn run(args: UploadArgs) -> Result<(), String> { &user_id, args.parallel_uploads, &progress_handle, + &cfg, + &api_key, + bearer_header.as_deref(), ) .await; @@ -627,6 +649,9 @@ fn run_two_queue_pipeline( &file_info.upload_path, &upload_resp, http.as_ref(), + &cfg, + &api_key, + bearer_header, ) .await { @@ -637,7 +662,7 @@ fn run_two_queue_pipeline( if let Err(e) = uploads_tracking::record_upload( &user_id, - &file_info.upload_path, + file_info.upload_path.as_path(), &in_app_path_str, &upload_resp.asset_id, &upload_request_id, @@ -731,11 +756,15 @@ fn build_single_upload_request( source_info.umid = Some(Some(first_umid.clone())); } } - let req = AssetUploadRequest::new( - i32::try_from(content_length).unwrap_or(i32::MAX), + let mut req = AssetUploadRequest::new( + i64::try_from(content_length).unwrap_or(i64::MAX), upload_id.clone(), source_info, ); + if content_length >= MULTIPART_THRESHOLD_BYTES { + req.multipart = Some(true); + req.multipart_part_size = Some(multipart_part_size_for(content_length)); + } Ok((req, upload_id, file_in_app_path)) } @@ -818,6 +847,9 @@ async fn upload_to_presigned_urls( user_id: &str, max_concurrent: usize, progress_handle: &ProgressHandle, + cfg: &Configuration, + api_key: &str, + bearer_opt: Option<&str>, ) -> Result<(), String> { let http = Arc::new( reqwest::Client::builder() @@ -828,6 +860,9 @@ async fn upload_to_presigned_urls( let semaphore = Arc::new(Semaphore::new(max_concurrent)); let mut upload_tasks = Vec::new(); + let cfg = cfg.clone(); + let api_key = api_key.to_string(); + let bearer_opt = bearer_opt.map(String::from); for (i, file_path) in files.iter().enumerate() { let file_path = file_path.clone(); @@ -843,6 +878,9 @@ async fn upload_to_presigned_urls( let user_id = user_id.to_string(); let upload_request_id = upload_request_id.to_string(); let task_id = i; + let cfg_clone = cfg.clone(); + let api_key_clone = api_key.clone(); + let bearer_clone = bearer_opt.clone(); let file_name = file_path .file_name() @@ -872,6 +910,9 @@ async fn upload_to_presigned_urls( &http_clone, &progress_handle_clone, file_size, + &cfg_clone, + &api_key_clone, + bearer_clone.as_deref(), ) .await; @@ -892,15 +933,171 @@ async fn upload_to_presigned_urls( Ok(()) } -async fn upload_file_to_presigned( +fn single_put_url(resp: &AssetUploadResponse) -> Option { + resp.presigned_put_url + .as_ref() + .and_then(|o| o.as_ref()) + .cloned() +} + +fn is_multipart_response(resp: &AssetUploadResponse) -> bool { + resp.presigned_put_urls + .as_ref() + .and_then(|o| o.as_ref()) + .map(|u| !u.is_empty()) + == Some(true) +} + +async fn upload_multipart_then_complete( file_path: &PathBuf, upload_resp: &AssetUploadResponse, http: &reqwest::Client, + cfg: &Configuration, + api_key: &str, + bearer_opt: Option<&str>, ) -> Result<(), String> { + let urls = upload_resp + .presigned_put_urls + .as_ref() + .and_then(|o| o.as_ref()) + .ok_or_else(|| "multipart response missing presigned_put_urls".to_string())?; + let multipart_upload_id = upload_resp + .multipart_upload_id + .as_ref() + .and_then(|o| o.as_ref()) + .cloned() + .ok_or_else(|| "multipart response missing multipart_upload_id".to_string())?; + let part_size = upload_resp + .multipart_part_size + .as_ref() + .and_then(|o| o.as_ref()) + .copied() + .ok_or_else(|| "multipart response missing multipart_part_size".to_string())? + as u64; + let part_count = urls.len(); + + let mut f = File::open(file_path) + .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; + let content_type = mime_guess::from_path(file_path) + .first_or_text_plain() + .essence_str() + .to_string(); + + let mut parts: Vec = Vec::with_capacity(part_count); + for (i, url) in urls.iter().enumerate() { + let part_num = (i + 1) as i32; + let read_size = if i + 1 < part_count { + part_size as usize + } else { + // Last part: read remainder + let total = std::fs::metadata(file_path) + .map_err(|e| format!("failed to stat {}: {}", file_path.display(), e))? + .len(); + let offset = (part_count - 1) as u64 * part_size; + (total.saturating_sub(offset)) as usize + }; + let mut buf = vec![0u8; read_size]; + let n = f + .read(&mut buf) + .map_err(|e| format!("failed to read {}: {}", file_path.display(), e))?; + buf.truncate(n); + + let put_res = http + .put(url.as_str()) + .header(reqwest::header::CONTENT_LENGTH, buf.len()) + .header(reqwest::header::CONTENT_TYPE, &content_type) + .body(buf) + .send() + .await + .map_err(|e| format!("multipart part {} upload failed: {}", part_num, e))?; + + if !put_res.status().is_success() { + let status = put_res.status(); + let body = put_res + .text() + .await + .unwrap_or_else(|_| "".to_string()); + let _ = api::abort_multipart_asset_upload_users_assets_multipart_abort_post( + cfg, + MultipartAbortRequest::new( + upload_resp.asset_id.clone(), + multipart_upload_id.clone(), + ), + Some(api_key), + bearer_opt, + ) + .await; + return Err(format!( + "Multipart part {} failed for {}: HTTP {} - {}", + part_num, + file_path.display(), + status, + body + )); + } + + let etag = put_res + .headers() + .get(reqwest::header::ETAG) + .and_then(|v| v.to_str().ok()) + .map(|s| s.trim_matches('"').to_string()) + .ok_or_else(|| { + format!( + "multipart part {} for {}: response missing ETag", + part_num, + file_path.display() + ) + })?; + parts.push(MultipartPart::new(part_num, etag)); + } + + let complete_req = MultipartCompleteRequest::new( + upload_resp.asset_id.clone(), + multipart_upload_id, + parts, + ); + api::complete_multipart_asset_upload_users_assets_multipart_complete_post( + cfg, + complete_req, + Some(api_key), + bearer_opt, + ) + .await + .map_err(|e| { + let mut m = format!("multipart complete failed for {}: {}", file_path.display(), e); + if let tellers_api_client::apis::Error::ResponseError(ref r) = e { + m.push_str(&format!("; body: {}", r.content)); + } + m + })?; + Ok(()) +} + +pub async fn upload_file_to_presigned( + file_path: &PathBuf, + upload_resp: &AssetUploadResponse, + http: &reqwest::Client, + cfg: &Configuration, + api_key: &str, + bearer_opt: Option<&str>, +) -> Result<(), String> { + if is_multipart_response(upload_resp) { + return upload_multipart_then_complete( + file_path, + upload_resp, + http, + cfg, + api_key, + bearer_opt, + ) + .await; + } + let total_bytes = std::fs::metadata(file_path) .map_err(|e| format!("failed to stat {}: {}", file_path.display(), e))? .len(); - let upload_url = upload_resp.presigned_put_url.clone(); + let upload_url: String = single_put_url(upload_resp) + .ok_or_else(|| "response missing presigned_put_url".to_string())?; let mut f = File::open(file_path) .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; @@ -922,7 +1119,7 @@ async fn upload_file_to_presigned( .to_string(); let put_res = http - .put(upload_url) + .put(upload_url.as_str()) .header(reqwest::header::CONTENT_LENGTH, total_bytes) .header(reqwest::header::CONTENT_TYPE, &content_type) .body(buf) @@ -957,64 +1154,85 @@ async fn upload_single_file( http: &reqwest::Client, progress_handle: &ProgressHandle, total_bytes: u64, + cfg: &Configuration, + api_key: &str, + bearer_opt: Option<&str>, ) -> Result<(), String> { - let upload_url = upload_resp.presigned_put_url.clone(); - - let mut f = File::open(file_path) - .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; - let mut buf = Vec::with_capacity(total_bytes as usize); - - const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks - let mut uploaded = 0u64; - let mut chunk = vec![0u8; CHUNK_SIZE.min(total_bytes as usize)]; - - loop { - let n = f - .read(&mut chunk) - .map_err(|e| format!("failed to read {}: {}", file_path.display(), e))?; - if n == 0 { - break; + if is_multipart_response(upload_resp) { + let result = upload_multipart_then_complete( + file_path, + upload_resp, + http, + cfg, + api_key, + bearer_opt, + ) + .await; + let _ = progress_handle.update_task(task_id, total_bytes); + if let Err(ref e) = result { + let _ = progress_handle.add_error(e.clone()); + return result; + } + } else { + let upload_url: String = single_put_url(upload_resp) + .ok_or_else(|| "response missing presigned_put_url".to_string())?; + + let mut f = File::open(file_path) + .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; + let mut buf = Vec::with_capacity(total_bytes as usize); + + const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks + let mut uploaded = 0u64; + let mut chunk = vec![0u8; CHUNK_SIZE.min(total_bytes as usize)]; + + loop { + let n = f + .read(&mut chunk) + .map_err(|e| format!("failed to read {}: {}", file_path.display(), e))?; + if n == 0 { + break; + } + buf.extend_from_slice(&chunk[..n]); + uploaded += n as u64; + let _ = progress_handle.update_task(task_id, uploaded); } - buf.extend_from_slice(&chunk[..n]); - uploaded += n as u64; - let _ = progress_handle.update_task(task_id, uploaded); - } - let content_type = mime_guess::from_path(file_path) - .first_or_text_plain() - .essence_str() - .to_string(); + let content_type = mime_guess::from_path(file_path) + .first_or_text_plain() + .essence_str() + .to_string(); - let put_res = http - .put(upload_url) - .header(reqwest::header::CONTENT_LENGTH, total_bytes) - .header(reqwest::header::CONTENT_TYPE, &content_type) - .body(buf) - .send() - .await - .map_err(|e| format!("upload failed for {}: {}", file_path.display(), e))?; + let put_res = http + .put(upload_url.as_str()) + .header(reqwest::header::CONTENT_LENGTH, total_bytes) + .header(reqwest::header::CONTENT_TYPE, &content_type) + .body(buf) + .send() + .await + .map_err(|e| format!("upload failed for {}: {}", file_path.display(), e))?; - let _ = progress_handle.update_task(task_id, total_bytes); + let _ = progress_handle.update_task(task_id, total_bytes); - if !put_res.status().is_success() { - let status = put_res.status(); - let body = put_res - .text() - .await - .unwrap_or_else(|_| "".to_string()); - let error_msg = format!( - "Upload failed for {}: HTTP {} - {}", - file_path.display(), - status, - body - ); - let _ = progress_handle.add_error(error_msg.clone()); - return Err(error_msg); + if !put_res.status().is_success() { + let status = put_res.status(); + let body = put_res + .text() + .await + .unwrap_or_else(|_| "".to_string()); + let error_msg = format!( + "Upload failed for {}: HTTP {} - {}", + file_path.display(), + status, + body + ); + let _ = progress_handle.add_error(error_msg.clone()); + return Err(error_msg); + } } if let Err(e) = uploads_tracking::record_upload( user_id, - file_path, + file_path.as_path(), in_app_path, &upload_resp.asset_id, upload_request_id, diff --git a/src/commands/upload/mod.rs b/src/commands/upload/mod.rs index 74363fe..ebf8683 100644 --- a/src/commands/upload/mod.rs +++ b/src/commands/upload/mod.rs @@ -2,6 +2,6 @@ mod dry_run; mod main; mod utils; -pub use main::{run, UploadArgs}; +pub use main::{run, upload_file_to_presigned, UploadArgs}; diff --git a/src/tellers_api/openapi.tellers_public_api.yaml b/src/tellers_api/openapi.tellers_public_api.yaml index b2dec89..56eea06 100644 --- a/src/tellers_api/openapi.tellers_public_api.yaml +++ b/src/tellers_api/openapi.tellers_public_api.yaml @@ -174,6 +174,56 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /users/assets/generate_description: + post: + tags: + - accepts-api-key + summary: Generate Description + operationId: generate_description_users_assets_generate_description_post + parameters: + - name: asset_id + in: query + required: true + schema: + type: string + title: Asset Id + - name: x-api-key + in: header + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: X-Api-Key + - name: authorization + in: header + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Authorization + requestBody: + required: true + content: + application/json: + schema: + type: array + items: + type: string + title: Entity Ids + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /users/entity/list: get: tags: From 38e60fefeaa68888dc99551d2fbc3d8aba0699e3 Mon Sep 17 00:00:00 2001 From: bjay kamwa watanabe Date: Wed, 18 Feb 2026 20:32:24 +0100 Subject: [PATCH 2/2] Fix arguments --- src/commands/asset/preprocess.rs | 2 +- src/commands/entity/create.rs | 29 ++-- src/commands/upload/main.rs | 224 +++---------------------------- 3 files changed, 26 insertions(+), 229 deletions(-) diff --git a/src/commands/asset/preprocess.rs b/src/commands/asset/preprocess.rs index 541ddc4..7729508 100644 --- a/src/commands/asset/preprocess.rs +++ b/src/commands/asset/preprocess.rs @@ -30,7 +30,7 @@ pub fn run(args: PreprocessArgs) -> Result<(), String> { let assets: Vec = args .ids .into_iter() - .map(|asset_id| AssetUploadResponse::new(String::new(), asset_id)) + .map(|asset_id| AssetUploadResponse::new(String::new(), String::new(), asset_id)) .collect(); let mut preproc_req = ProcessAssetsRequest::new( diff --git a/src/commands/entity/create.rs b/src/commands/entity/create.rs index 7b56a2a..8b866a8 100644 --- a/src/commands/entity/create.rs +++ b/src/commands/entity/create.rs @@ -109,12 +109,13 @@ pub fn run(args: CreateArgs) -> Result<(), String> { if let Some(asset_id) = asset_id { output::info(format!("Associating asset {} with entity {}", asset_id, entity_id)); - let asset = AssetUploadResponse::new(String::new(), asset_id.clone()); + let asset = AssetUploadResponse::new( + String::new(), + String::new(), + asset_id.clone(), + ); - let process_req = ProcessEntityRequest { - entity_id: entity_id.clone(), - assets: vec![asset], - }; + let process_req = ProcessEntityRequest::new(entity_id.clone(), vec![asset]); let process_resp = api::process_entity_users_entity_preprocess_post( &cfg, @@ -206,25 +207,11 @@ fn upload_file_and_get_asset_id( } } - const MULTIPART_THRESHOLD_BYTES: u64 = 10 * 1024 * 1024; - const MULTIPART_MIN_PART_SIZE_BYTES: i32 = 5 * 1024 * 1024; - const MULTIPART_MAX_PARTS: u64 = 1000; - - fn multipart_part_size_for(size: u64) -> i32 { - let min_size = MULTIPART_MIN_PART_SIZE_BYTES as u64; - let size_for_1000 = (size + MULTIPART_MAX_PARTS - 1) / MULTIPART_MAX_PARTS; - (min_size.max(size_for_1000) as i32).max(MULTIPART_MIN_PART_SIZE_BYTES) - } - - let mut upload_req = AssetUploadRequest::new( - i64::try_from(content_length).unwrap_or(i64::MAX), + let upload_req = AssetUploadRequest::new( + i32::try_from(content_length).unwrap_or(i32::MAX), upload_id.clone(), source_info, ); - if content_length >= MULTIPART_THRESHOLD_BYTES { - upload_req.multipart = Some(true); - upload_req.multipart_part_size = Some(multipart_part_size_for(content_length)); - } output::info(format!("Requesting presigned URL for {}", file_path.display())); diff --git a/src/commands/upload/main.rs b/src/commands/upload/main.rs index 4da3786..e85eaa6 100644 --- a/src/commands/upload/main.rs +++ b/src/commands/upload/main.rs @@ -29,8 +29,7 @@ use tokio::sync::mpsc as tokio_mpsc; use tellers_api_client::apis::accepts_api_key_api as api; use tellers_api_client::apis::configuration::Configuration; use tellers_api_client::models::{ - AssetUploadRequest, AssetUploadResponse, MultipartAbortRequest, MultipartCompleteRequest, - MultipartPart, ProcessAssetsRequest, SourceFileInfo, + AssetUploadRequest, AssetUploadResponse, ProcessAssetsRequest, SourceFileInfo, }; #[derive(Args, Debug)] @@ -71,20 +70,6 @@ pub struct UploadArgs { pub disable_description_generation: bool, } -/// Use multipart upload for files at least this size (10 MiB). -const MULTIPART_THRESHOLD_BYTES: u64 = 10 * 1024 * 1024; -/// S3 minimum part size (5 MiB); last part can be smaller. -const MULTIPART_MIN_PART_SIZE_BYTES: i32 = 5 * 1024 * 1024; -/// S3 maximum number of parts per multipart upload. -const MULTIPART_MAX_PARTS: u64 = 1000; - -/// Part size so that part_count = ceil(content_length / part_size) <= MULTIPART_MAX_PARTS. -fn multipart_part_size_for(content_length: u64) -> i32 { - let min_size = MULTIPART_MIN_PART_SIZE_BYTES as u64; - let size_for_1000 = (content_length + MULTIPART_MAX_PARTS - 1) / MULTIPART_MAX_PARTS; - (min_size.max(size_for_1000) as i32).max(MULTIPART_MIN_PART_SIZE_BYTES) -} - struct FileToUpload { upload_path: PathBuf, original_path: PathBuf, @@ -340,15 +325,11 @@ pub fn run(args: UploadArgs) -> Result<(), String> { } } - let mut req = AssetUploadRequest::new( - i64::try_from(content_length).unwrap_or(i64::MAX), + let req = AssetUploadRequest::new( + i32::try_from(content_length).unwrap_or(i32::MAX), upload_id, source_info, ); - if content_length >= MULTIPART_THRESHOLD_BYTES { - req.multipart = Some(true); - req.multipart_part_size = Some(multipart_part_size_for(content_length)); - } requests.push(req); } @@ -756,15 +737,11 @@ fn build_single_upload_request( source_info.umid = Some(Some(first_umid.clone())); } } - let mut req = AssetUploadRequest::new( - i64::try_from(content_length).unwrap_or(i64::MAX), + let req = AssetUploadRequest::new( + i32::try_from(content_length).unwrap_or(i32::MAX), upload_id.clone(), source_info, ); - if content_length >= MULTIPART_THRESHOLD_BYTES { - req.multipart = Some(true); - req.multipart_part_size = Some(multipart_part_size_for(content_length)); - } Ok((req, upload_id, file_in_app_path)) } @@ -933,171 +910,22 @@ async fn upload_to_presigned_urls( Ok(()) } -fn single_put_url(resp: &AssetUploadResponse) -> Option { - resp.presigned_put_url - .as_ref() - .and_then(|o| o.as_ref()) - .cloned() -} - -fn is_multipart_response(resp: &AssetUploadResponse) -> bool { - resp.presigned_put_urls - .as_ref() - .and_then(|o| o.as_ref()) - .map(|u| !u.is_empty()) - == Some(true) -} - -async fn upload_multipart_then_complete( - file_path: &PathBuf, - upload_resp: &AssetUploadResponse, - http: &reqwest::Client, - cfg: &Configuration, - api_key: &str, - bearer_opt: Option<&str>, -) -> Result<(), String> { - let urls = upload_resp - .presigned_put_urls - .as_ref() - .and_then(|o| o.as_ref()) - .ok_or_else(|| "multipart response missing presigned_put_urls".to_string())?; - let multipart_upload_id = upload_resp - .multipart_upload_id - .as_ref() - .and_then(|o| o.as_ref()) - .cloned() - .ok_or_else(|| "multipart response missing multipart_upload_id".to_string())?; - let part_size = upload_resp - .multipart_part_size - .as_ref() - .and_then(|o| o.as_ref()) - .copied() - .ok_or_else(|| "multipart response missing multipart_part_size".to_string())? - as u64; - let part_count = urls.len(); - - let mut f = File::open(file_path) - .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; - let content_type = mime_guess::from_path(file_path) - .first_or_text_plain() - .essence_str() - .to_string(); - - let mut parts: Vec = Vec::with_capacity(part_count); - for (i, url) in urls.iter().enumerate() { - let part_num = (i + 1) as i32; - let read_size = if i + 1 < part_count { - part_size as usize - } else { - // Last part: read remainder - let total = std::fs::metadata(file_path) - .map_err(|e| format!("failed to stat {}: {}", file_path.display(), e))? - .len(); - let offset = (part_count - 1) as u64 * part_size; - (total.saturating_sub(offset)) as usize - }; - let mut buf = vec![0u8; read_size]; - let n = f - .read(&mut buf) - .map_err(|e| format!("failed to read {}: {}", file_path.display(), e))?; - buf.truncate(n); - - let put_res = http - .put(url.as_str()) - .header(reqwest::header::CONTENT_LENGTH, buf.len()) - .header(reqwest::header::CONTENT_TYPE, &content_type) - .body(buf) - .send() - .await - .map_err(|e| format!("multipart part {} upload failed: {}", part_num, e))?; - - if !put_res.status().is_success() { - let status = put_res.status(); - let body = put_res - .text() - .await - .unwrap_or_else(|_| "".to_string()); - let _ = api::abort_multipart_asset_upload_users_assets_multipart_abort_post( - cfg, - MultipartAbortRequest::new( - upload_resp.asset_id.clone(), - multipart_upload_id.clone(), - ), - Some(api_key), - bearer_opt, - ) - .await; - return Err(format!( - "Multipart part {} failed for {}: HTTP {} - {}", - part_num, - file_path.display(), - status, - body - )); - } - - let etag = put_res - .headers() - .get(reqwest::header::ETAG) - .and_then(|v| v.to_str().ok()) - .map(|s| s.trim_matches('"').to_string()) - .ok_or_else(|| { - format!( - "multipart part {} for {}: response missing ETag", - part_num, - file_path.display() - ) - })?; - parts.push(MultipartPart::new(part_num, etag)); - } - - let complete_req = MultipartCompleteRequest::new( - upload_resp.asset_id.clone(), - multipart_upload_id, - parts, - ); - api::complete_multipart_asset_upload_users_assets_multipart_complete_post( - cfg, - complete_req, - Some(api_key), - bearer_opt, - ) - .await - .map_err(|e| { - let mut m = format!("multipart complete failed for {}: {}", file_path.display(), e); - if let tellers_api_client::apis::Error::ResponseError(ref r) = e { - m.push_str(&format!("; body: {}", r.content)); - } - m - })?; - Ok(()) +fn single_put_url(resp: &AssetUploadResponse) -> String { + resp.presigned_put_url.clone() } pub async fn upload_file_to_presigned( file_path: &PathBuf, upload_resp: &AssetUploadResponse, http: &reqwest::Client, - cfg: &Configuration, - api_key: &str, - bearer_opt: Option<&str>, + _cfg: &Configuration, + _api_key: &str, + _bearer_opt: Option<&str>, ) -> Result<(), String> { - if is_multipart_response(upload_resp) { - return upload_multipart_then_complete( - file_path, - upload_resp, - http, - cfg, - api_key, - bearer_opt, - ) - .await; - } - let total_bytes = std::fs::metadata(file_path) .map_err(|e| format!("failed to stat {}: {}", file_path.display(), e))? .len(); - let upload_url: String = single_put_url(upload_resp) - .ok_or_else(|| "response missing presigned_put_url".to_string())?; + let upload_url = single_put_url(upload_resp); let mut f = File::open(file_path) .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; @@ -1154,31 +982,14 @@ async fn upload_single_file( http: &reqwest::Client, progress_handle: &ProgressHandle, total_bytes: u64, - cfg: &Configuration, - api_key: &str, - bearer_opt: Option<&str>, + _cfg: &Configuration, + _api_key: &str, + _bearer_opt: Option<&str>, ) -> Result<(), String> { - if is_multipart_response(upload_resp) { - let result = upload_multipart_then_complete( - file_path, - upload_resp, - http, - cfg, - api_key, - bearer_opt, - ) - .await; - let _ = progress_handle.update_task(task_id, total_bytes); - if let Err(ref e) = result { - let _ = progress_handle.add_error(e.clone()); - return result; - } - } else { - let upload_url: String = single_put_url(upload_resp) - .ok_or_else(|| "response missing presigned_put_url".to_string())?; + let upload_url = single_put_url(upload_resp); - let mut f = File::open(file_path) - .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; + let mut f = File::open(file_path) + .map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?; let mut buf = Vec::with_capacity(total_bytes as usize); const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks @@ -1228,7 +1039,6 @@ async fn upload_single_file( let _ = progress_handle.add_error(error_msg.clone()); return Err(error_msg); } - } if let Err(e) = uploads_tracking::record_upload( user_id,