Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions src/sync/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ fn convex_site_url() -> String {
.unwrap_or_else(|_| "https://ceaseless-shepherd-756.convex.site".to_string())
}

/// Optional staging Convex URL for dual-upload mirroring.
fn staging_convex_site_url() -> Option<String> {
std::env::var("THREADER_STAGING_CONVEX_SITE_URL").ok()
}

/// Background worker that processes the upload queue.
pub struct BackgroundUploader {
queue: UploadQueue,
Expand Down Expand Up @@ -80,12 +85,22 @@ impl BackgroundUploader {
continue;
}

match self.upload(&entry, &token).await {
let prod_url = convex_site_url();
match self.upload(&entry, &token, &prod_url).await {
Ok(()) => {
debug!(
"Uploaded {:?} for session {}",
entry.action, entry.session_id
);
// Best-effort staging mirror
if let Some(ref staging_url) = staging_convex_site_url() {
if let Err(e) = self.upload(&entry, &token, staging_url).await {
warn!(
"Staging mirror failed for session {} (non-blocking): {}",
entry.session_id, e
);
}
}
self.queue.remove(&path)?;
}
Err(e) => {
Expand Down Expand Up @@ -124,23 +139,23 @@ impl BackgroundUploader {
}

/// Upload a single queue entry to the Convex HTTP endpoint.
async fn upload(&self, entry: &QueueEntry, token: &str) -> Result<()> {
async fn upload(&self, entry: &QueueEntry, token: &str, base_url: &str) -> Result<()> {
match entry.action {
QueueAction::Create => self.upload_create(entry, token).await,
QueueAction::Create => self.upload_create(entry, token, base_url).await,
QueueAction::Append => {
// Ensure session exists on server (idempotent create)
self.upload_create(entry, token).await?;
self.upload_append(entry, token).await
self.upload_create(entry, token, base_url).await?;
self.upload_append(entry, token, base_url).await
}
QueueAction::Finalize => {
// Ensure session exists on server (idempotent create)
self.upload_create(entry, token).await?;
self.upload_finalize(entry, token).await
self.upload_create(entry, token, base_url).await?;
self.upload_finalize(entry, token, base_url).await
}
}
}

async fn upload_create(&self, entry: &QueueEntry, token: &str) -> Result<()> {
async fn upload_create(&self, entry: &QueueEntry, token: &str, base_url: &str) -> Result<()> {
let meta = self.storage.read_meta(&entry.session_id)?;

// Build body, omitting None fields (Convex rejects null for optional string fields)
Expand All @@ -161,7 +176,7 @@ impl BackgroundUploader {
body["repo"] = serde_json::json!(repo);
}

let url = format!("{}/api/sessions", convex_site_url());
let url = format!("{}/api/sessions", base_url);
info!("Creating session {} at {}", entry.session_id, url);

let resp = self
Expand All @@ -182,7 +197,7 @@ impl BackgroundUploader {
Ok(())
}

async fn upload_append(&self, entry: &QueueEntry, token: &str) -> Result<()> {
async fn upload_append(&self, entry: &QueueEntry, token: &str, base_url: &str) -> Result<()> {
// Read from the source transcript (Claude Code's file), not the local copy.
// The local copy can diverge due to concurrent poller + hook appends.
let meta = self.storage.read_meta(&entry.session_id)?;
Expand All @@ -204,7 +219,7 @@ impl BackgroundUploader {

// Process lines through image processor — rewrite image blocks with URLs
let image_processor =
ImageProcessor::new(self.client.clone(), convex_site_url());
ImageProcessor::new(self.client.clone(), base_url.to_string());
let mut processed_lines: Vec<String> = Vec::with_capacity(lines_to_send.len());
for line in &lines_to_send {
if line_has_images(line) {
Expand Down Expand Up @@ -244,7 +259,7 @@ impl BackgroundUploader {
.client
.post(format!(
"{}/api/sessions/{}/append",
convex_site_url(), entry.session_id
base_url, entry.session_id
))
.bearer_auth(token)
.json(&body)
Expand All @@ -264,7 +279,7 @@ impl BackgroundUploader {
Ok(())
}

async fn upload_finalize(&self, entry: &QueueEntry, token: &str) -> Result<()> {
async fn upload_finalize(&self, entry: &QueueEntry, token: &str, base_url: &str) -> Result<()> {
let meta = self.storage.read_meta(&entry.session_id)?;

let mut body = serde_json::json!({});
Expand Down Expand Up @@ -294,7 +309,7 @@ impl BackgroundUploader {
.client
.post(format!(
"{}/api/sessions/{}/finalize",
convex_site_url(), entry.session_id
base_url, entry.session_id
))
.bearer_auth(token)
.json(&body)
Expand Down