Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ tellers upload /path/to/media_folder
- `--ext <EXT>` — Filter files by extension (e.g., `--ext mp4 --ext mov`)
- `--in-app-path <PATH>` — Set the in-app path for uploaded files

Files ≥ 10 MiB use **multipart S3 upload** (presigned part URLs, then complete); smaller files use a single presigned PUT.

## Implementation Notes

- Argument parsing via Clap 4.x. See `clap` docs: [docs.rs/clap](https://docs.rs/clap/latest/clap/)
Expand Down
8 changes: 1 addition & 7 deletions src/commands/asset/preprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,7 @@ pub fn run(args: PreprocessArgs) -> Result<(), String> {
let assets: Vec<AssetUploadResponse> = args
.ids
.into_iter()
.map(|asset_id| {
AssetUploadResponse::new(
String::new(),
String::new(),
asset_id,
)
})
.map(|asset_id| AssetUploadResponse::new(String::new(), String::new(), asset_id))
.collect();

let mut preproc_req = ProcessAssetsRequest::new(
Expand Down
62 changes: 12 additions & 50 deletions src/commands/entity/create.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use clap::Args;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use tellers_api_client::apis::accepts_api_key_api as api;
use tellers_api_client::models::{
Expand Down Expand Up @@ -112,8 +110,8 @@ pub fn run(args: CreateArgs) -> Result<(), String> {
output::info(format!("Associating asset {} with entity {}", asset_id, entity_id));

let asset = AssetUploadResponse::new(
"".to_string(),
"".to_string(),
String::new(),
String::new(),
asset_id.clone(),
);

Expand Down Expand Up @@ -248,64 +246,28 @@ fn upload_file_and_get_asset_id(
}

let upload_resp = responses.remove(0);
let upload_url = upload_resp.presigned_put_url.clone();
let asset_id = upload_resp.asset_id.clone();

output::info(format!("Uploading file to presigned URL..."));

let mut f = File::open(file_path)
.map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?;
let mut buf = Vec::with_capacity(content_length as usize);

const CHUNK_SIZE: usize = 1024 * 1024;
let mut chunk = vec![0u8; CHUNK_SIZE.min(content_length as usize)];

loop {
let n = f
.read(&mut chunk)
.map_err(|e| format!("failed to read {}: {}", file_path.display(), e))?;
if n == 0 {
break;
}
buf.extend_from_slice(&chunk[..n]);
}

let content_type = mime_guess::from_path(file_path)
.first_or_text_plain()
.essence_str()
.to_string();

let http = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(60))
.build()
.map_err(|e| format!("failed to build http client: {}", e))?;

let put_res = http
.put(upload_url)
.header(reqwest::header::CONTENT_LENGTH, content_length)
.header(reqwest::header::CONTENT_TYPE, &content_type)
.body(buf)
.send()
.await
.map_err(|e| format!("upload failed for {}: {}", file_path.display(), e))?;

if !put_res.status().is_success() {
let status = put_res.status();
let body = put_res
.text()
.await
.unwrap_or_else(|_| "<failed to read error body>".to_string());
return Err(format!(
"Upload failed for {}: HTTP {} - {}",
file_path.display(),
status,
body
));
}
crate::commands::upload::upload_file_to_presigned(
file_path,
&upload_resp,
&http,
cfg,
api_key,
bearer_header,
)
.await?;

if let Err(e) = uploads_tracking::record_upload(
user_id,
file_path,
file_path.as_path(),
&in_app_path,
&asset_id,
&upload_request_id,
Expand Down
128 changes: 78 additions & 50 deletions src/commands/upload/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ pub fn run(args: UploadArgs) -> Result<(), String> {
&user_id,
args.parallel_uploads,
&progress_handle,
&cfg,
&api_key,
bearer_header.as_deref(),
)
.await;

Expand Down Expand Up @@ -627,6 +630,9 @@ fn run_two_queue_pipeline(
&file_info.upload_path,
&upload_resp,
http.as_ref(),
&cfg,
&api_key,
bearer_header,
)
.await
{
Expand All @@ -637,7 +643,7 @@ fn run_two_queue_pipeline(

if let Err(e) = uploads_tracking::record_upload(
&user_id,
&file_info.upload_path,
file_info.upload_path.as_path(),
&in_app_path_str,
&upload_resp.asset_id,
&upload_request_id,
Expand Down Expand Up @@ -818,6 +824,9 @@ async fn upload_to_presigned_urls(
user_id: &str,
max_concurrent: usize,
progress_handle: &ProgressHandle,
cfg: &Configuration,
api_key: &str,
bearer_opt: Option<&str>,
) -> Result<(), String> {
let http = Arc::new(
reqwest::Client::builder()
Expand All @@ -828,6 +837,9 @@ async fn upload_to_presigned_urls(

let semaphore = Arc::new(Semaphore::new(max_concurrent));
let mut upload_tasks = Vec::new();
let cfg = cfg.clone();
let api_key = api_key.to_string();
let bearer_opt = bearer_opt.map(String::from);

for (i, file_path) in files.iter().enumerate() {
let file_path = file_path.clone();
Expand All @@ -843,6 +855,9 @@ async fn upload_to_presigned_urls(
let user_id = user_id.to_string();
let upload_request_id = upload_request_id.to_string();
let task_id = i;
let cfg_clone = cfg.clone();
let api_key_clone = api_key.clone();
let bearer_clone = bearer_opt.clone();

let file_name = file_path
.file_name()
Expand Down Expand Up @@ -872,6 +887,9 @@ async fn upload_to_presigned_urls(
&http_clone,
&progress_handle_clone,
file_size,
&cfg_clone,
&api_key_clone,
bearer_clone.as_deref(),
)
.await;

Expand All @@ -892,15 +910,22 @@ async fn upload_to_presigned_urls(
Ok(())
}

async fn upload_file_to_presigned(
fn single_put_url(resp: &AssetUploadResponse) -> String {
resp.presigned_put_url.clone()
}

pub async fn upload_file_to_presigned(
file_path: &PathBuf,
upload_resp: &AssetUploadResponse,
http: &reqwest::Client,
_cfg: &Configuration,
_api_key: &str,
_bearer_opt: Option<&str>,
) -> Result<(), String> {
let total_bytes = std::fs::metadata(file_path)
.map_err(|e| format!("failed to stat {}: {}", file_path.display(), e))?
.len();
let upload_url = upload_resp.presigned_put_url.clone();
let upload_url = single_put_url(upload_resp);

let mut f = File::open(file_path)
.map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?;
Expand All @@ -922,7 +947,7 @@ async fn upload_file_to_presigned(
.to_string();

let put_res = http
.put(upload_url)
.put(upload_url.as_str())
.header(reqwest::header::CONTENT_LENGTH, total_bytes)
.header(reqwest::header::CONTENT_TYPE, &content_type)
.body(buf)
Expand Down Expand Up @@ -957,64 +982,67 @@ async fn upload_single_file(
http: &reqwest::Client,
progress_handle: &ProgressHandle,
total_bytes: u64,
_cfg: &Configuration,
_api_key: &str,
_bearer_opt: Option<&str>,
) -> Result<(), String> {
let upload_url = upload_resp.presigned_put_url.clone();
let upload_url = single_put_url(upload_resp);

let mut f = File::open(file_path)
.map_err(|e| format!("failed to open {}: {}", file_path.display(), e))?;
let mut buf = Vec::with_capacity(total_bytes as usize);

const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
let mut uploaded = 0u64;
let mut chunk = vec![0u8; CHUNK_SIZE.min(total_bytes as usize)];

loop {
let n = f
.read(&mut chunk)
.map_err(|e| format!("failed to read {}: {}", file_path.display(), e))?;
if n == 0 {
break;
let mut buf = Vec::with_capacity(total_bytes as usize);

const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
let mut uploaded = 0u64;
let mut chunk = vec![0u8; CHUNK_SIZE.min(total_bytes as usize)];

loop {
let n = f
.read(&mut chunk)
.map_err(|e| format!("failed to read {}: {}", file_path.display(), e))?;
if n == 0 {
break;
}
buf.extend_from_slice(&chunk[..n]);
uploaded += n as u64;
let _ = progress_handle.update_task(task_id, uploaded);
}
buf.extend_from_slice(&chunk[..n]);
uploaded += n as u64;
let _ = progress_handle.update_task(task_id, uploaded);
}

let content_type = mime_guess::from_path(file_path)
.first_or_text_plain()
.essence_str()
.to_string();
let content_type = mime_guess::from_path(file_path)
.first_or_text_plain()
.essence_str()
.to_string();

let put_res = http
.put(upload_url)
.header(reqwest::header::CONTENT_LENGTH, total_bytes)
.header(reqwest::header::CONTENT_TYPE, &content_type)
.body(buf)
.send()
.await
.map_err(|e| format!("upload failed for {}: {}", file_path.display(), e))?;
let put_res = http
.put(upload_url.as_str())
.header(reqwest::header::CONTENT_LENGTH, total_bytes)
.header(reqwest::header::CONTENT_TYPE, &content_type)
.body(buf)
.send()
.await
.map_err(|e| format!("upload failed for {}: {}", file_path.display(), e))?;

let _ = progress_handle.update_task(task_id, total_bytes);
let _ = progress_handle.update_task(task_id, total_bytes);

if !put_res.status().is_success() {
let status = put_res.status();
let body = put_res
.text()
.await
.unwrap_or_else(|_| "<failed to read error body>".to_string());
let error_msg = format!(
"Upload failed for {}: HTTP {} - {}",
file_path.display(),
status,
body
);
let _ = progress_handle.add_error(error_msg.clone());
return Err(error_msg);
}
if !put_res.status().is_success() {
let status = put_res.status();
let body = put_res
.text()
.await
.unwrap_or_else(|_| "<failed to read error body>".to_string());
let error_msg = format!(
"Upload failed for {}: HTTP {} - {}",
file_path.display(),
status,
body
);
let _ = progress_handle.add_error(error_msg.clone());
return Err(error_msg);
}

if let Err(e) = uploads_tracking::record_upload(
user_id,
file_path,
file_path.as_path(),
in_app_path,
&upload_resp.asset_id,
upload_request_id,
Expand Down
2 changes: 1 addition & 1 deletion src/commands/upload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ mod dry_run;
mod main;
mod utils;

pub use main::{run, UploadArgs};
pub use main::{run, upload_file_to_presigned, UploadArgs};


50 changes: 50 additions & 0 deletions src/tellers_api/openapi.tellers_public_api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,56 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/users/assets/generate_description:
post:
tags:
- accepts-api-key
summary: Generate Description
operationId: generate_description_users_assets_generate_description_post
parameters:
- name: asset_id
in: query
required: true
schema:
type: string
title: Asset Id
- name: x-api-key
in: header
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: X-Api-Key
- name: authorization
in: header
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Authorization
requestBody:
required: true
content:
application/json:
schema:
type: array
items:
type: string
title: Entity Ids
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/users/entity/list:
get:
tags:
Expand Down