From 6f33b142442a16f9ceca10b1ff57510112096d02 Mon Sep 17 00:00:00 2001 From: TimRl Date: Mon, 1 Jun 2026 12:33:26 -0600 Subject: [PATCH 1/2] Fixed upload to prevent timeout --- src/git/GitService.ts | 441 ++++++++++++++---- src/git/networkRetry.ts | 361 ++++++++++++++ .../unit/git.upload-stall-timeout.test.ts | 180 +++++++ src/test/suite/unit/network-retry.test.ts | 199 ++++++++ src/types/lfs.ts | 38 ++ 5 files changed, 1129 insertions(+), 90 deletions(-) create mode 100644 src/git/networkRetry.ts create mode 100644 src/test/suite/unit/git.upload-stall-timeout.test.ts create mode 100644 src/test/suite/unit/network-retry.test.ts diff --git a/src/git/GitService.ts b/src/git/GitService.ts index 186647a..c67d080 100644 --- a/src/git/GitService.ts +++ b/src/git/GitService.ts @@ -11,7 +11,14 @@ import { LFSBatchRequest, LFSBatchResponse, LfsPointerInfo, + LfsUploadEvents, } from "../types/lfs"; +import { + retryWithBackoff, + errorWithCause, + getNetworkErrorDetails, + parseRetryAfterMs, +} from "./networkRetry"; /** Retry and batching constants for LFS uploads */ const LFS_MAX_RETRIES = 3; @@ -25,6 +32,28 @@ const LFS_FETCH_TIMEOUT_MS = 60_000; /** Timeout for lightweight health-check requests (10 s) */ const HEALTH_CHECK_TIMEOUT_MS = 10_000; +/** + * Inactivity window for streamed LFS uploads. The upload is aborted only when + * NO progress is made for this long — there is deliberately no overall cap, so + * a large file on a very slow connection (e.g. near-dial-up speeds) can upload + * for as long as it needs, as long as bytes keep flowing. + */ +const LFS_UPLOAD_STALL_TIMEOUT_MS = 120_000; +/** + * Soft "no progress" warning threshold. Surfaced to the UI as + * "connection interrupted, waiting to resume…" well before the hard abort, so + * users get quick feedback when the connection drops. Chosen comfortably above + * the worst-case per-chunk time on very slow links to avoid false positives + * (and it self-clears the moment the next chunk lands anyway). + */ +const LFS_UPLOAD_STALL_WARN_MS = 30_000; +/** + * Chunk size used when streaming an upload body (64 KiB). Small enough that even + * a near-dial-up link delivers a chunk every few seconds, keeping the stall + * detector responsive without false alarms. + */ +const LFS_UPLOAD_CHUNK_SIZE = 64 * 1024; + /** * Wrapper around `fetch` that aborts after `timeoutMs`. * If a caller-provided `signal` is already aborted, throws immediately. @@ -52,69 +81,164 @@ function fetchWithTimeout( } /** - * Determine whether an error is retryable (server-side / transient network errors). + * PUT a byte buffer using a streamed request body with a *stall* (inactivity) + * timeout rather than a fixed overall deadline. + * + * Why: a fixed total timeout will kill a perfectly healthy upload on a slow + * link. Instead we reset the timer every time the socket accepts another chunk, + * so an upload can take arbitrarily long as long as it keeps making progress; we + * only abort when nothing has moved for `stallTimeoutMs`. This also fails fast + * on a genuinely dead socket instead of hanging until some huge deadline. + * + * A `Content-Length` header is set explicitly so undici sends a fixed-length + * body instead of `Transfer-Encoding: chunked` — object stores (S3/GCS/Azure) + * presigned PUT endpoints typically reject chunked uploads. */ -function isRetryableError(error: unknown): boolean { - if (error && typeof error === "object" && "status" in error) { - const status = (error as { status: number }).status; - if (status >= 500) { - return true; +export function fetchUploadWithStallTimeout( + url: string, + body: Uint8Array, + init: { + headers: Record; + stallTimeoutMs?: number; + stallWarnMs?: number; + chunkSize?: number; + signal?: AbortSignal; + /** + * Notified when the upload stops making progress (`true`) and again when + * progress resumes (`false`), so callers can surface a "waiting for + * connection" hint long before the hard abort. + */ + onStallStateChange?: (stalled: boolean) => void; + /** + * Notified as bytes are streamed to the socket (`bytesSent` of + * `totalBytes`), enabling a live progress meter. + */ + onProgress?: (bytesSent: number, totalBytes: number) => void; + }, +): Promise { + const { + headers, + stallTimeoutMs = LFS_UPLOAD_STALL_TIMEOUT_MS, + stallWarnMs = LFS_UPLOAD_STALL_WARN_MS, + chunkSize = LFS_UPLOAD_CHUNK_SIZE, + signal: externalSignal, + onStallStateChange, + onProgress, + } = init; + + if (externalSignal?.aborted) { + return Promise.reject(externalSignal.reason ?? new DOMException("Aborted", "AbortError")); + } + + const controller = new AbortController(); + let stallTimer: ReturnType | undefined; + let warnTimer: ReturnType | undefined; + let stalled = false; + + const setStalled = (next: boolean) => { + if (stalled === next) { + return; } - // Known HTTP status below 500 (e.g. 4xx client errors) — not retryable. - // Return early so message-based heuristics below don't produce false positives - // (e.g. "limit: 500MB" matching /5\d{2}/, or "authentication timeout" matching /timeout/). - if (typeof status === "number" && status > 0) { - return false; + stalled = next; + onStallStateChange?.(next); + }; + + const armWarnTimer = () => { + if (warnTimer) { + clearTimeout(warnTimer); } - } - const msg = error instanceof Error ? error.message : String(error); - return ( - /ECONNRESET|ETIMEDOUT|ECONNREFUSED|ENOTFOUND|timeout|abort|socket hang up/i.test(msg) || - /5\d{2}/i.test(msg) - ); -} + if (stallWarnMs > 0 && stallWarnMs < stallTimeoutMs) { + warnTimer = setTimeout(() => setStalled(true), stallWarnMs); + } + }; -/** - * Retry a function with exponential back-off (delay = base * 3^attempt). - * Only retries when `isRetryableError` returns true. - */ -async function retryWithBackoff( - fn: () => Promise, - label: string, - maxRetries: number = LFS_MAX_RETRIES, - baseDelayMs: number = LFS_RETRY_BASE_DELAY_MS, - signal?: AbortSignal, -): Promise { - let hadFailure = false; - for (let attempt = 0; ; attempt++) { - if (signal?.aborted) { - throw signal.reason ?? new DOMException("Aborted", "AbortError"); + const armStallTimer = () => { + if (stallTimer) { + clearTimeout(stallTimer); } - try { - const result = await fn(); - if (hadFailure) { - console.log( - `[LFS Retry] ${label} succeeded on attempt ${attempt + 1} after previous failure(s)` - ); - } - return result; - } catch (error) { - hadFailure = true; - if (signal?.aborted) { - throw signal.reason ?? new DOMException("Aborted", "AbortError"); - } - if (attempt >= maxRetries || !isRetryableError(error)) { - throw error; - } - const delay = baseDelayMs * Math.pow(3, attempt); // 1 s, 3 s, 9 s - console.log( - `[LFS Retry] ${label} failed (attempt ${attempt + 1}/${maxRetries + 1}), retrying in ${delay}ms: ${error instanceof Error ? error.message : error}`, - ); - await new Promise((resolve) => setTimeout(resolve, delay)); + stallTimer = setTimeout( + () => + controller.abort( + new DOMException( + `Upload stalled — no progress for ${Math.round(stallTimeoutMs / 1000)}s`, + "TimeoutError", + ), + ), + stallTimeoutMs, + ); + }; + + const markProgress = () => { + // Real progress → clear any "stalled" state and reset both timers. + setStalled(false); + armStallTimer(); + armWarnTimer(); + }; + + const clearTimers = () => { + if (stallTimer) { + clearTimeout(stallTimer); + stallTimer = undefined; } - } + if (warnTimer) { + clearTimeout(warnTimer); + warnTimer = undefined; + } + }; + + const onExternalAbort = () => controller.abort(externalSignal!.reason); + externalSignal?.addEventListener("abort", onExternalAbort, { once: true }); + + let offset = 0; + const bodyStream = new ReadableStream({ + start() { + // Covers connection setup / time-to-first-chunk. + armStallTimer(); + armWarnTimer(); + }, + pull(streamController) { + if (offset >= body.length) { + streamController.close(); + return; + } + const end = Math.min(offset + chunkSize, body.length); + // subarray is a view (no copy) — keeps memory flat for large files. + streamController.enqueue(body.subarray(offset, end)); + offset = end; + // A pull means undici accepted the previous chunk → real progress. + markProgress(); + onProgress?.(offset, body.length); + }, + cancel() { + clearTimers(); + }, + }); + + const finalHeaders: Record = { + ...headers, + "Content-Length": String(body.length), + }; + delete finalHeaders["Transfer-Encoding"]; + + return fetch(url, { + method: "PUT", + headers: finalHeaders, + body: bodyStream, + signal: controller.signal, + // Required by the fetch spec when streaming a request body. + duplex: "half", + } as RequestInit & { duplex: "half"; }).finally(() => { + clearTimers(); + externalSignal?.removeEventListener("abort", onExternalAbort); + }); } +/** Retry options shared by all LFS network calls. */ +const LFS_RETRY_OPTIONS = { + maxRetries: LFS_MAX_RETRIES, + baseDelayMs: LFS_RETRY_BASE_DELAY_MS, +} as const; + /** * Run an array of async tasks with a concurrency limit. * Tasks are started in order; at most `concurrency` run at the same time. @@ -273,6 +397,7 @@ async function uploadBlobsToLFSBucket( }: UploadBlobsOptions & { recovery?: { dir: string; filepaths: string[]; }; }, contents: Uint8Array[], onFileStatus?: (status: LfsFileStatus) => void, + events?: LfsUploadEvents, ): Promise { debugLog("[LFS Patch] Using patched uploadBlobs function"); debugLog("[LFS Patch] URL:", url); @@ -512,11 +637,15 @@ async function uploadBlobsToLFSBucket( `LFS request failed with status ${lfsInfoRes.status}: ${lfsInfoRes.statusText}\nResponse: ${errorText}` ); (err as any).status = lfsInfoRes.status; + const retryAfterMs = parseRetryAfterMs(lfsInfoRes.headers.get("retry-after")); + if (retryAfterMs !== undefined) { + (err as any).retryAfterMs = retryAfterMs; + } throw err; } return (await lfsInfoRes.json()) as unknown; - }, "LFS batch API"); + }, "LFS batch API", LFS_RETRY_OPTIONS); debugLog("[LFS Patch] Server response:", lfsInfoResponseData); @@ -584,12 +713,24 @@ async function uploadBlobsToLFSBucket( // Upload with retry on transient/server errors await retryWithBackoff(async () => { try { - const resp = await fetchWithTimeout(upload.href, { - method: "PUT", + // Stream the body with a stall timeout (no overall cap) so + // slow-but-progressing uploads aren't killed; see + // fetchUploadWithStallTimeout. + const resp = await fetchUploadWithStallTimeout(upload.href, fileBytes, { headers: uploadHeaders, - body: Buffer.from(fileBytes), - keepalive: false, - timeoutMs: 600_000, + onStallStateChange: (stalled) => + events?.onStallStateChange?.({ + index, + label: effectiveFilepaths[index], + stalled, + }), + onProgress: (bytesSent, totalBytes) => + events?.onBytes?.({ + index, + label: effectiveFilepaths[index], + bytesSent, + totalBytes, + }), }); if (!resp.ok) { @@ -601,6 +742,10 @@ async function uploadBlobsToLFSBucket( `Upload failed for ${fileLabel(index)}, HTTP ${resp.status}: ${resp.statusText}\nResponse: ${errorText}` ); (err as any).status = resp.status; + const retryAfterMs = parseRetryAfterMs(resp.headers.get("retry-after")); + if (retryAfterMs !== undefined) { + (err as any).retryAfterMs = retryAfterMs; + } throw err; } @@ -623,36 +768,63 @@ async function uploadBlobsToLFSBucket( }); } - // Rethrow with descriptive message; retryWithBackoff will decide whether to retry + // HTTP errors already carry a `status` (and any Retry-After) — + // re-throw as-is so the retry classifier can act on the status. + if ((fetchError as any).status) { + throw fetchError; + } + + // Wrap transport errors with a descriptive message while + // ALWAYS preserving the original error as `cause`. This keeps + // the underlying undici reason (e.g. UND_ERR_SOCKET) visible to + // both the retry classifier and the user-facing error report, + // instead of collapsing everything to a bare "fetch failed". + const detail = getNetworkErrorDetails(fetchError); if ( fetchError.message?.includes("certificate") || fetchError.message?.includes("SSL") || fetchError.message?.includes("TLS") ) { - throw new Error( - `SSL/Certificate error uploading ${fileLabel(index)} to LFS storage. Original error: ${fetchError.message}` + throw errorWithCause( + `SSL/Certificate error uploading ${fileLabel(index)} to LFS storage: ${detail}`, + fetchError, ); } else if ( fetchError.message?.includes("ECONNREFUSED") || fetchError.message?.includes("ENOTFOUND") ) { - throw new Error( - `Network connection error uploading ${fileLabel(index)} to LFS storage. Original error: ${fetchError.message}` + throw errorWithCause( + `Network connection error uploading ${fileLabel(index)} to LFS storage: ${detail}`, + fetchError, ); - } else if (fetchError.message?.includes("timeout") || fetchError.name === "AbortError") { - throw new Error( - `Upload timeout for ${fileLabel(index)} to LFS storage. Original error: ${fetchError.message}` + } else if ( + fetchError.message?.includes("timeout") || + fetchError.name === "AbortError" || + fetchError.name === "TimeoutError" + ) { + throw errorWithCause( + `Upload timeout for ${fileLabel(index)} to LFS storage: ${detail}`, + fetchError, ); - } else if ((fetchError as any).status) { - // Already has status from our HTTP check above; re-throw as-is - throw fetchError; } else { - throw new Error( - `Network error uploading ${fileLabel(index)} to LFS storage: ${fetchError.message}` + throw errorWithCause( + `Network error uploading ${fileLabel(index)} to LFS storage: ${detail}`, + fetchError, ); } } - }, `LFS PUT ${fileLabel(index)}`); + }, `LFS PUT ${fileLabel(index)}`, { + ...LFS_RETRY_OPTIONS, + onRetry: ({ attempt, maxRetries, delayMs, error }) => + events?.onRetry?.({ + index, + label: effectiveFilepaths[index], + retry: attempt + 1, + maxRetries, + delayMs, + reason: getNetworkErrorDetails(error), + }), + }); // Handle verification if required (also with retry) if (actions.verify) { @@ -678,9 +850,15 @@ async function uploadBlobsToLFSBucket( `Verification failed for ${fileLabel(index)}, HTTP ${verificationResp.status}: ${verificationResp.statusText}` ); (err as any).status = verificationResp.status; + const retryAfterMs = parseRetryAfterMs( + verificationResp.headers.get("retry-after"), + ); + if (retryAfterMs !== undefined) { + (err as any).retryAfterMs = retryAfterMs; + } throw err; } - }, `LFS verify ${fileLabel(index)}`); + }, `LFS verify ${fileLabel(index)}`, LFS_RETRY_OPTIONS); } }); await runWithConcurrency(uploadTasks, LFS_UPLOAD_CONCURRENCY); @@ -3125,6 +3303,93 @@ export class GitService { let skippedBytes = 0; let skippedCount = 0; const skippedLfsFiles: string[] = []; + // Indices of files currently stalled (no upload progress). Used to + // show a "waiting for connection" hint while ≥1 upload is stuck. + const stalledIndices = new Set(); + // Live bytes sent for files still uploading in the current batch, + // keyed by batch index. Added to `processedBytes` (completed files) + // for a live total; an entry is removed the moment its file completes + // so it is never double-counted against `processedBytes`. + const inFlightBytes = new Map(); + // Throttle: only push a new status when the integer percent changes. + let lastEmittedPct = -1; + + const displayedBytes = (): number => { + let inflight = 0; + for (const sent of inFlightBytes.values()) { + inflight += sent; + } + // Cap to total: the last chunk is reported as "sent" slightly + // before the file truly finishes, which could otherwise read >100%. + return Math.min(processedBytes + inflight, totalLfsBytes); + }; + + // Emit the standard "Uploading media (x%)" status, or `override` + // verbatim for transient connection/retry hints. + const emitMediaStatus = (override?: string) => { + if (!this.progressCallback) { + return; + } + if (override) { + this.progressCallback("uploading_lfs", processedBytes, totalLfsBytes, override); + return; + } + const shown = displayedBytes(); + const pct = totalLfsBytes > 0 + ? Math.round((shown / totalLfsBytes) * 100) + : 100; + lastEmittedPct = pct; + const skippedPart = skippedBytes > 0 + ? ` — ${GitService.formatBytes(skippedBytes)} already synced` + : ""; + this.progressCallback( + "uploading_lfs", + shown, + totalLfsBytes, + `Uploading media (${pct}% — ${GitService.formatBytes(shown)} of ${GitService.formatBytes(totalLfsBytes)}${skippedPart})`, + ); + }; + + // Surface live progress, retries, and stalls so the user can see what + // is happening rather than a frozen progress bar. + const uploadEvents: LfsUploadEvents = { + onBytes: ({ index, bytesSent }) => { + inFlightBytes.set(index, bytesSent); + // Don't override an active "waiting" hint with progress text. + if (stalledIndices.size > 0) { + return; + } + const shown = displayedBytes(); + const pct = totalLfsBytes > 0 + ? Math.round((shown / totalLfsBytes) * 100) + : 100; + if (pct !== lastEmittedPct) { + emitMediaStatus(); + } + }, + onRetry: ({ index, delayMs, retry, maxRetries }) => { + // We're actively retrying this file → no longer "waiting", and + // its partial in-flight bytes will be re-sent from scratch. + stalledIndices.delete(index); + inFlightBytes.delete(index); + const secs = Math.max(1, Math.round(delayMs / 1000)); + emitMediaStatus( + `Uploading media — connection issue, retrying in ${secs}s (retry ${retry} of ${maxRetries})`, + ); + }, + onStallStateChange: ({ index, stalled }) => { + if (stalled) { + stalledIndices.add(index); + } else { + stalledIndices.delete(index); + } + if (stalledIndices.size > 0) { + emitMediaStatus("Uploading media — connection interrupted, waiting to resume…"); + } else { + emitMediaStatus(); + } + }, + }; for (let i = 0; i < rawBytesFiles.length; i += LFS_UPLOAD_BATCH_SIZE) { const batch = rawBytesFiles.slice(i, i + LFS_UPLOAD_BATCH_SIZE); @@ -3132,6 +3397,9 @@ export class GitService { this.debugLog( `[GitService] Uploading batch ${batchNum}/${totalBatches} (${batch.length} files)` ); + // In-flight/stall tracking is per-batch (batches run sequentially). + stalledIndices.clear(); + inFlightBytes.clear(); const pointerInfos = await uploadBlobsToLFSBucket( { @@ -3147,21 +3415,14 @@ export class GitService { skippedBytes += status.size; skippedCount++; } - if (this.progressCallback) { - const pct = totalLfsBytes > 0 - ? Math.round((processedBytes / totalLfsBytes) * 100) - : 100; - const skippedPart = skippedBytes > 0 - ? ` — ${GitService.formatBytes(skippedBytes)} already synced` - : ""; - this.progressCallback( - "uploading_lfs", - processedBytes, - totalLfsBytes, - `Uploading media (${pct}% — ${GitService.formatBytes(processedBytes)} of ${GitService.formatBytes(totalLfsBytes)}${skippedPart})`, - ); - } + // Completed file: no longer in-flight/stalled. Its full + // size now lives in processedBytes, so drop the in-flight + // entry to avoid double-counting. + stalledIndices.delete(status.index); + inFlightBytes.delete(status.index); + emitMediaStatus(); }, + uploadEvents, ); // uploadBlobsToLFSBucket may skip corrupted/empty files, so the diff --git a/src/git/networkRetry.ts b/src/git/networkRetry.ts new file mode 100644 index 0000000..32f6e1d --- /dev/null +++ b/src/git/networkRetry.ts @@ -0,0 +1,361 @@ +/** + * Shared network-retry utilities for HTTP/`fetch` operations such as Git LFS + * batch requests, blob uploads, and downloads. + * + * Why this exists: Node's `fetch` (undici) reports most transport failures with + * the generic message `"fetch failed"` and hides the real reason one or more + * levels down in `error.cause` (e.g. `ECONNRESET`, `UND_ERR_SOCKET`). Classifying + * retryability off `error.message` alone therefore misses genuinely transient + * failures. These helpers walk the full `cause` chain so transient errors are + * retried consistently across every network call. + */ + +/** Default number of retries (so total attempts = maxRetries + 1). */ +export const DEFAULT_MAX_RETRIES = 3; +/** Base delay for exponential backoff. */ +export const DEFAULT_BASE_DELAY_MS = 1000; +/** Hard cap on any single backoff delay. */ +export const DEFAULT_MAX_DELAY_MS = 30_000; +/** Exponential growth factor (delay ≈ base * factor^attempt). */ +const BACKOFF_FACTOR = 3; +/** Never honor a server `Retry-After` longer than this. */ +const MAX_RETRY_AFTER_MS = 60_000; + +/** + * Human-readable descriptions for low-level network error codes. Used to turn + * an opaque `"fetch failed"` into something actionable in error reports. + */ +export const NETWORK_ERROR_DESCRIPTIONS: Record = { + ENOTFOUND: "DNS lookup failed - hostname not found", + EAI_AGAIN: "DNS lookup timed out - network may be unstable", + ECONNREFUSED: "Connection refused - server may be down", + ECONNRESET: "Connection reset by server", + ECONNABORTED: "Connection aborted", + ETIMEDOUT: "Connection timed out", + EPIPE: "Connection broken", + EHOSTUNREACH: "Host unreachable", + ENETUNREACH: "Network unreachable", + UND_ERR_CONNECT_TIMEOUT: "Connection timed out", + UND_ERR_HEADERS_TIMEOUT: "Server took too long to send headers", + UND_ERR_BODY_TIMEOUT: "Response body timed out", + UND_ERR_SOCKET: "Socket closed unexpectedly", + UNABLE_TO_VERIFY_LEAF_SIGNATURE: "SSL certificate verification failed", + DEPTH_ZERO_SELF_SIGNED_CERT: "Self-signed SSL certificate rejected", + CERT_HAS_EXPIRED: "SSL certificate has expired", + ERR_TLS_CERT_ALTNAME_INVALID: "SSL certificate hostname mismatch", +}; + +/** Transient transport error codes that are safe to retry. */ +const RETRYABLE_ERROR_CODES = new Set([ + "ENOTFOUND", + "EAI_AGAIN", + "ECONNREFUSED", + "ECONNRESET", + "ECONNABORTED", + "ETIMEDOUT", + "EPIPE", + "EHOSTUNREACH", + "ENETUNREACH", + "UND_ERR_CONNECT_TIMEOUT", + "UND_ERR_HEADERS_TIMEOUT", + "UND_ERR_BODY_TIMEOUT", + "UND_ERR_SOCKET", +]); + +/** Definitive failures that must never be retried (auth/cert problems). */ +const NON_RETRYABLE_ERROR_CODES = new Set([ + "UNABLE_TO_VERIFY_LEAF_SIGNATURE", + "DEPTH_ZERO_SELF_SIGNED_CERT", + "CERT_HAS_EXPIRED", + "ERR_TLS_CERT_ALTNAME_INVALID", +]); + +type MaybeError = { + message?: unknown; + code?: unknown; + status?: unknown; + name?: unknown; + cause?: unknown; + retryAfterMs?: unknown; +}; + +/** Attach a `cause` to a new Error without depending on ES2022 lib typings. */ +export function errorWithCause(message: string, cause: unknown): Error { + const error = new Error(message); + (error as Error & { cause?: unknown }).cause = cause; + return error; +} + +/** + * Walk an error's `cause` chain, yielding each link once. Guards against cyclic + * `cause` references so we never loop forever on malformed errors. + */ +function* iterateErrorChain(error: unknown): Generator { + const seen = new Set(); + let current: unknown = error; + while (current && typeof current === "object" && !seen.has(current)) { + seen.add(current); + yield current as MaybeError; + current = (current as MaybeError).cause; + } +} + +function getMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + const msg = (error as MaybeError)?.message; + return typeof msg === "string" ? msg : String(error); +} + +function getStatus(error: MaybeError): number | undefined { + const status = error.status; + return typeof status === "number" && status > 0 ? status : undefined; +} + +function getCode(error: MaybeError): string | undefined { + return typeof error.code === "string" ? error.code : undefined; +} + +function isTimeoutOrAbortName(name: unknown): boolean { + return name === "TimeoutError" || name === "AbortError"; +} + +export interface RetryClassificationOptions { + /** Retry HTTP 429 (rate limited). Defaults to true. */ + retryOn429?: boolean; +} + +/** + * Decide whether an error represents a transient failure worth retrying. + * + * The whole `cause` chain is inspected, so a wrapped `"fetch failed"` whose + * underlying cause is e.g. `UND_ERR_SOCKET` is correctly treated as retryable. + * An explicit HTTP status anywhere in the chain is authoritative: 5xx (and 429 + * when enabled) retry, every other status is a definitive client error. + */ +export function isRetryableError( + error: unknown, + options: RetryClassificationOptions = {}, +): boolean { + const { retryOn429 = true } = options; + + for (const link of iterateErrorChain(error)) { + const status = getStatus(link); + if (status !== undefined) { + if (status >= 500) { + return true; + } + if (status === 429) { + return retryOn429; + } + // Any other explicit HTTP status is a definitive client error. + return false; + } + + const code = getCode(link); + if (code) { + if (NON_RETRYABLE_ERROR_CODES.has(code)) { + return false; + } + if (RETRYABLE_ERROR_CODES.has(code)) { + return true; + } + } + + // Node/undici report transport failures as `TypeError: fetch failed`. + if (link instanceof TypeError && /fetch failed/i.test(getMessage(link))) { + return true; + } + + // Our own timeout aborts (and undici timeouts) surface as DOMExceptions. + // Genuine user-initiated cancellation is handled by the AbortSignal check + // in retryWithBackoff before this is ever consulted. + if (isTimeoutOrAbortName(link.name)) { + return true; + } + + const message = getMessage(link); + if ( + /ECONNRESET|ETIMEDOUT|ECONNREFUSED|ENOTFOUND|EAI_AGAIN|EPIPE|socket hang up|UND_ERR/i.test( + message, + ) + ) { + return true; + } + } + + return false; +} + +/** + * Build an actionable, single-line description of a network error by walking the + * cause chain and decoding known error codes. Turns `"fetch failed"` into + * `"fetch failed - Socket closed unexpectedly (UND_ERR_SOCKET)"`. + */ +export function getNetworkErrorDetails(error: unknown): string { + const top = getMessage(error); + const parts: string[] = [top]; + + for (const link of iterateErrorChain(error)) { + if (link === error) { + continue; + } + const code = getCode(link); + const description = code ? NETWORK_ERROR_DESCRIPTIONS[code] : undefined; + const message = getMessage(link); + + if (description && !parts.includes(description)) { + parts.push(code ? `${description} (${code})` : description); + } else if (code && !parts.some((p) => p.includes(code))) { + parts.push(`(${code})`); + } else if (message && message !== top && !parts.includes(message)) { + parts.push(message); + } + } + + return parts.join(" - "); +} + +/** + * Parse an HTTP `Retry-After` header (delta-seconds or HTTP-date) into ms. + * Returns undefined when absent or unparseable. + */ +export function parseRetryAfterMs(headerValue: string | null | undefined): number | undefined { + if (!headerValue) { + return undefined; + } + const seconds = Number(headerValue); + if (Number.isFinite(seconds)) { + return Math.max(0, seconds * 1000); + } + const dateMs = Date.parse(headerValue); + if (!Number.isNaN(dateMs)) { + return Math.max(0, dateMs - Date.now()); + } + return undefined; +} + +function getRetryAfterMs(error: unknown): number | undefined { + for (const link of iterateErrorChain(error)) { + const value = link.retryAfterMs; + if (typeof value === "number" && Number.isFinite(value) && value >= 0) { + return value; + } + } + return undefined; +} + +const createAbortError = (signal?: AbortSignal): unknown => + signal?.reason ?? new DOMException("Aborted", "AbortError"); + +/** Sleep that rejects promptly if the provided signal aborts mid-delay. */ +function abortableDelay(ms: number, signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + if (signal?.aborted) { + reject(createAbortError(signal)); + return; + } + const timer = setTimeout(() => { + cleanup(); + resolve(); + }, ms); + const onAbort = () => { + cleanup(); + reject(createAbortError(signal)); + }; + const cleanup = () => { + clearTimeout(timer); + signal?.removeEventListener("abort", onAbort); + }; + signal?.addEventListener("abort", onAbort, { once: true }); + }); +} + +/** + * Compute the next backoff delay: exponential growth, capped, with "equal + * jitter" (half fixed + half random) to avoid thundering-herd retries when many + * concurrent uploads fail at once. Honors a server-provided `Retry-After` when + * it is longer than the computed delay. + */ +export function computeBackoffDelay( + attempt: number, + baseDelayMs: number, + maxDelayMs: number, + retryAfterMs?: number, +): number { + const exponential = baseDelayMs * Math.pow(BACKOFF_FACTOR, attempt); + const capped = Math.min(exponential, maxDelayMs); + const jittered = capped / 2 + Math.random() * (capped / 2); + const serverRequested = Math.min(retryAfterMs ?? 0, MAX_RETRY_AFTER_MS); + return Math.max(jittered, serverRequested); +} + +export interface RetryOptions extends RetryClassificationOptions { + maxRetries?: number; + baseDelayMs?: number; + maxDelayMs?: number; + signal?: AbortSignal; + /** Override the default transient-error classifier. */ + isRetryable?: (error: unknown) => boolean; + /** Called before each backoff sleep (for logging/telemetry). */ + onRetry?: (info: { attempt: number; maxRetries: number; delayMs: number; error: unknown }) => void; +} + +/** + * Run `fn`, retrying transient failures with jittered exponential backoff. + * Only retries when the error is classified retryable; honors an AbortSignal + * both between attempts and during the backoff sleep. + */ +export async function retryWithBackoff( + fn: () => Promise, + label: string, + options: RetryOptions = {}, +): Promise { + const { + maxRetries = DEFAULT_MAX_RETRIES, + baseDelayMs = DEFAULT_BASE_DELAY_MS, + maxDelayMs = DEFAULT_MAX_DELAY_MS, + signal, + retryOn429, + onRetry, + } = options; + const isRetryable = + options.isRetryable ?? ((error: unknown) => isRetryableError(error, { retryOn429 })); + + let hadFailure = false; + for (let attempt = 0; ; attempt++) { + if (signal?.aborted) { + throw createAbortError(signal); + } + try { + const result = await fn(); + if (hadFailure) { + console.log( + `[Retry] ${label} succeeded on attempt ${attempt + 1} after previous failure(s)`, + ); + } + return result; + } catch (error) { + hadFailure = true; + // Distinguish genuine cancellation from a retryable failure. + if (signal?.aborted) { + throw createAbortError(signal); + } + if (attempt >= maxRetries || !isRetryable(error)) { + throw error; + } + const delayMs = computeBackoffDelay( + attempt, + baseDelayMs, + maxDelayMs, + getRetryAfterMs(error), + ); + onRetry?.({ attempt, maxRetries, delayMs, error }); + console.log( + `[Retry] ${label} failed (attempt ${attempt + 1}/${maxRetries + 1}), retrying in ${Math.round(delayMs)}ms: ${getNetworkErrorDetails(error)}`, + ); + await abortableDelay(delayMs, signal); + } + } +} diff --git a/src/test/suite/unit/git.upload-stall-timeout.test.ts b/src/test/suite/unit/git.upload-stall-timeout.test.ts new file mode 100644 index 0000000..df52d3e --- /dev/null +++ b/src/test/suite/unit/git.upload-stall-timeout.test.ts @@ -0,0 +1,180 @@ +import * as assert from "assert"; +import { fetchUploadWithStallTimeout } from "../../../git/GitService"; + +/** + * Read every chunk from a request-body ReadableStream, pausing `delayMs` + * between reads to simulate a slow-but-alive connection. + */ +const drainBody = async (init: any, delayMs: number): Promise => { + const reader = init.body.getReader(); + let total = 0; + for (;;) { + const { done, value } = await reader.read(); + if (done) { + break; + } + total += value.length; + if (delayMs > 0) { + await new Promise((r) => setTimeout(r, delayMs)); + } + } + return total; +}; + +suite("GitService - fetchUploadWithStallTimeout", () => { + let originalFetch: any; + + setup(() => { + originalFetch = (globalThis as any).fetch; + }); + + teardown(() => { + (globalThis as any).fetch = originalFetch; + }); + + test("sets an explicit Content-Length (fixed-length, not chunked)", async () => { + const bytes = new Uint8Array(1000).fill(7); + let seenContentLength: string | undefined; + let seenTransferEncoding: string | undefined; + + (globalThis as any).fetch = async (_input: any, init: any) => { + const headers = init.headers as Record; + seenContentLength = headers["Content-Length"]; + seenTransferEncoding = headers["Transfer-Encoding"]; + await drainBody(init, 0); + return new Response("", { status: 200 }); + }; + + const resp = await fetchUploadWithStallTimeout("https://lfs.example.com/up", bytes, { + headers: { Authorization: "Bearer x" }, + }); + + assert.strictEqual(resp.status, 200); + assert.strictEqual(seenContentLength, String(bytes.length)); + assert.strictEqual(seenTransferEncoding, undefined); + }); + + test("does NOT abort a slow-but-progressing upload", async () => { + // 8 chunks, ~30ms between each (~240ms total) with a 100ms stall window: + // progress keeps resetting the timer so it must never fire. + const bytes = new Uint8Array(8 * 256 * 1024).fill(1); + let bytesReceived = 0; + + (globalThis as any).fetch = async (_input: any, init: any) => { + bytesReceived = await drainBody(init, 30); + return new Response("", { status: 200 }); + }; + + const resp = await fetchUploadWithStallTimeout("https://lfs.example.com/up", bytes, { + headers: {}, + stallTimeoutMs: 100, + chunkSize: 256 * 1024, + }); + + assert.strictEqual(resp.status, 200); + assert.strictEqual(bytesReceived, bytes.length); + }); + + test("aborts with a retryable TimeoutError when no progress is made", async () => { + const bytes = new Uint8Array(1024).fill(1); + + // Never read the body; reject only when the stall timer aborts the signal. + (globalThis as any).fetch = (_input: any, init: any) => + new Promise((_resolve, reject) => { + init.signal.addEventListener("abort", () => reject(init.signal.reason), { + once: true, + }); + }); + + await assert.rejects( + fetchUploadWithStallTimeout("https://lfs.example.com/up", bytes, { + headers: {}, + stallTimeoutMs: 50, + }), + (err: unknown) => { + assert.ok(err instanceof Error); + assert.strictEqual((err as Error).name, "TimeoutError"); + return true; + }, + ); + }); + + test("reports cumulative byte progress via onProgress up to the total", async () => { + const bytes = new Uint8Array(5 * 256).fill(3); + (globalThis as any).fetch = async (_input: any, init: any) => { + await drainBody(init, 0); + return new Response("", { status: 200 }); + }; + + const progress: number[] = []; + const resp = await fetchUploadWithStallTimeout("https://lfs.example.com/up", bytes, { + headers: {}, + chunkSize: 256, + onProgress: (bytesSent, totalBytes) => { + assert.strictEqual(totalBytes, bytes.length); + progress.push(bytesSent); + }, + }); + + assert.strictEqual(resp.status, 200); + assert.ok(progress.length > 0, "should emit progress"); + // Monotonically increasing and ending exactly at the total. + for (let i = 1; i < progress.length; i++) { + assert.ok(progress[i] > progress[i - 1], "progress must increase"); + } + assert.strictEqual(progress[progress.length - 1], bytes.length); + }); + + test("reports a stall then recovery via onStallStateChange", async () => { + // 4 small chunks. The consumer reads the first chunk, pauses long enough + // for the warn timer to fire (stall=true), then resumes (stall=false). + const bytes = new Uint8Array(4 * 256).fill(1); + let pausedOnce = false; + + (globalThis as any).fetch = async (_input: any, init: any) => { + const reader = init.body.getReader(); + for (;;) { + const { done } = await reader.read(); + if (done) { + break; + } + if (!pausedOnce) { + pausedOnce = true; + await new Promise((r) => setTimeout(r, 120)); + } + } + return new Response("", { status: 200 }); + }; + + const states: boolean[] = []; + const resp = await fetchUploadWithStallTimeout("https://lfs.example.com/up", bytes, { + headers: {}, + stallTimeoutMs: 5000, + stallWarnMs: 30, + chunkSize: 256, + onStallStateChange: (stalled) => states.push(stalled), + }); + + assert.strictEqual(resp.status, 200); + assert.ok(states.includes(true), "should report a stall"); + assert.strictEqual(states[states.length - 1], false, "should report recovery last"); + }); + + test("rejects immediately if the external signal is already aborted", async () => { + const controller = new AbortController(); + controller.abort(); + let fetchCalled = false; + (globalThis as any).fetch = async () => { + fetchCalled = true; + return new Response("", { status: 200 }); + }; + + await assert.rejects( + fetchUploadWithStallTimeout("https://lfs.example.com/up", new Uint8Array(10), { + headers: {}, + signal: controller.signal, + }), + ); + assert.strictEqual(fetchCalled, false); + }); +}); diff --git a/src/test/suite/unit/network-retry.test.ts b/src/test/suite/unit/network-retry.test.ts new file mode 100644 index 0000000..7005d6b --- /dev/null +++ b/src/test/suite/unit/network-retry.test.ts @@ -0,0 +1,199 @@ +import * as assert from "assert"; +import { + isRetryableError, + getNetworkErrorDetails, + parseRetryAfterMs, + computeBackoffDelay, + retryWithBackoff, + errorWithCause, +} from "../../../git/networkRetry"; + +/** Build an Error carrying a low-level `code` (mimics a Node system error). */ +const codeError = (code: string, message = code): Error => { + const err = new Error(message); + (err as NodeJS.ErrnoException).code = code; + return err; +}; + +/** Build an Error carrying an HTTP `status`. */ +const statusError = (status: number, message = `HTTP ${status}`): Error => { + const err = new Error(message); + (err as Error & { status: number }).status = status; + return err; +}; + +suite("networkRetry - isRetryableError", () => { + test("bare 'fetch failed' TypeError is retryable", () => { + assert.strictEqual(isRetryableError(new TypeError("fetch failed")), true); + }); + + test("undici cause code (UND_ERR_SOCKET) one level down is retryable", () => { + const wrapped = errorWithCause("fetch failed", codeError("UND_ERR_SOCKET")); + assert.strictEqual(isRetryableError(wrapped), true); + }); + + test("regression: descriptive wrapper preserving a transient cause is retryable", () => { + // Mirrors the GitService LFS upload path: a friendly message wrapping the + // original `TypeError: fetch failed` (whose cause is a socket error). + const undiciError = errorWithCause("fetch failed", codeError("ECONNRESET")); + const userFacing = errorWithCause( + "Network error uploading file 0 to LFS storage: fetch failed", + undiciError, + ); + assert.strictEqual(isRetryableError(userFacing), true); + }); + + test("HTTP 5xx is retryable", () => { + assert.strictEqual(isRetryableError(statusError(503)), true); + }); + + test("HTTP 429 is retryable by default, configurable off", () => { + assert.strictEqual(isRetryableError(statusError(429)), true); + assert.strictEqual(isRetryableError(statusError(429), { retryOn429: false }), false); + }); + + test("HTTP 4xx client errors are not retryable", () => { + assert.strictEqual(isRetryableError(statusError(404)), false); + assert.strictEqual(isRetryableError(statusError(401)), false); + }); + + test("explicit HTTP status is authoritative over a transient cause", () => { + const err = statusError(400); + (err as Error & { cause?: unknown }).cause = codeError("ECONNRESET"); + assert.strictEqual(isRetryableError(err), false); + }); + + test("TLS/certificate errors are never retryable", () => { + assert.strictEqual(isRetryableError(codeError("CERT_HAS_EXPIRED")), false); + assert.strictEqual( + isRetryableError(errorWithCause("fetch failed", codeError("DEPTH_ZERO_SELF_SIGNED_CERT"))), + false, + ); + }); + + test("transient codes by message are retryable", () => { + assert.strictEqual(isRetryableError(new Error("read ECONNRESET")), true); + assert.strictEqual(isRetryableError(new Error("socket hang up")), true); + }); + + test("non-network errors are not retryable", () => { + assert.strictEqual(isRetryableError(new Error("Unexpected JSON structure")), false); + assert.strictEqual(isRetryableError("plain string"), false); + }); + + test("cyclic cause chains do not hang", () => { + const a = new Error("a") as Error & { cause?: unknown }; + const b = new Error("b") as Error & { cause?: unknown }; + a.cause = b; + b.cause = a; + assert.strictEqual(isRetryableError(a), false); + }); +}); + +suite("networkRetry - getNetworkErrorDetails", () => { + test("decodes undici cause code into a readable description", () => { + const wrapped = errorWithCause("fetch failed", codeError("UND_ERR_SOCKET")); + const details = getNetworkErrorDetails(wrapped); + assert.ok(details.includes("fetch failed"), "keeps the top-level message"); + assert.ok(details.includes("Socket closed unexpectedly"), "adds the decoded description"); + assert.ok(details.includes("UND_ERR_SOCKET"), "includes the raw code"); + }); + + test("falls back to the message when there is no cause", () => { + assert.strictEqual(getNetworkErrorDetails(new Error("boom")), "boom"); + }); +}); + +suite("networkRetry - parseRetryAfterMs", () => { + test("parses delta-seconds", () => { + assert.strictEqual(parseRetryAfterMs("5"), 5000); + }); + + test("parses an HTTP date in the future", () => { + const future = new Date(Date.now() + 10_000).toUTCString(); + const ms = parseRetryAfterMs(future); + assert.ok(ms !== undefined && ms > 5_000 && ms <= 10_000, `unexpected ms: ${ms}`); + }); + + test("returns undefined for missing/invalid values", () => { + assert.strictEqual(parseRetryAfterMs(null), undefined); + assert.strictEqual(parseRetryAfterMs(undefined), undefined); + assert.strictEqual(parseRetryAfterMs("not-a-date"), undefined); + }); +}); + +suite("networkRetry - computeBackoffDelay", () => { + test("grows exponentially within jitter bounds and respects the cap", () => { + const base = 1000; + const max = 30_000; + for (let attempt = 0; attempt < 3; attempt++) { + const expected = Math.min(base * Math.pow(3, attempt), max); + const delay = computeBackoffDelay(attempt, base, max); + assert.ok(delay >= expected / 2, `delay ${delay} below jitter floor for attempt ${attempt}`); + assert.ok(delay <= expected, `delay ${delay} above ceiling for attempt ${attempt}`); + } + }); + + test("never exceeds the max delay cap", () => { + const delay = computeBackoffDelay(10, 1000, 5000); + assert.ok(delay <= 5000, `delay ${delay} exceeded cap`); + }); + + test("honors a server Retry-After longer than the computed delay", () => { + const delay = computeBackoffDelay(0, 1000, 30_000, 8000); + assert.ok(delay >= 8000, `delay ${delay} ignored Retry-After`); + }); +}); + +suite("networkRetry - retryWithBackoff", () => { + const fastOptions = { baseDelayMs: 1, maxDelayMs: 5 } as const; + + test("retries a transient failure then succeeds", async () => { + let attempts = 0; + const result = await retryWithBackoff(async () => { + attempts++; + if (attempts < 3) { + throw codeError("ECONNRESET"); + } + return "ok"; + }, "test transient", fastOptions); + assert.strictEqual(result, "ok"); + assert.strictEqual(attempts, 3); + }); + + test("does not retry a non-retryable error", async () => { + let attempts = 0; + await assert.rejects( + retryWithBackoff(async () => { + attempts++; + throw statusError(404); + }, "test client error", fastOptions), + ); + assert.strictEqual(attempts, 1); + }); + + test("gives up after maxRetries and rethrows the last error", async () => { + let attempts = 0; + await assert.rejects( + retryWithBackoff(async () => { + attempts++; + throw codeError("ETIMEDOUT"); + }, "test exhaust", { ...fastOptions, maxRetries: 2 }), + /ETIMEDOUT/, + ); + assert.strictEqual(attempts, 3); // initial + 2 retries + }); + + test("throws immediately when the signal is already aborted", async () => { + const controller = new AbortController(); + controller.abort(); + let attempts = 0; + await assert.rejects( + retryWithBackoff(async () => { + attempts++; + return "unreachable"; + }, "test aborted", { ...fastOptions, signal: controller.signal }), + ); + assert.strictEqual(attempts, 0); + }); +}); diff --git a/src/types/lfs.ts b/src/types/lfs.ts index 5165978..b0d1dd9 100644 --- a/src/types/lfs.ts +++ b/src/types/lfs.ts @@ -56,3 +56,41 @@ export interface LfsPointerInfo { // Allow library-specific extras without forcing any [key: string]: unknown; } + +/** Fired when a single file's upload fails transiently and is about to retry. */ +export interface LfsUploadRetryEvent { + index: number; + label?: string; + /** 1-based count of the retry about to happen (1 = first retry). */ + retry: number; + /** Total number of retries that will be attempted. */ + maxRetries: number; + /** Delay before the retry fires. */ + delayMs: number; + /** Decoded, human-readable reason for the failure. */ + reason: string; +} + +/** Fired when an in-flight upload stalls (no progress) or recovers. */ +export interface LfsUploadStallEvent { + index: number; + label?: string; + stalled: boolean; +} + +/** Fired as a single file's bytes are streamed to the server. */ +export interface LfsUploadBytesEvent { + index: number; + label?: string; + /** Bytes of this file handed to the socket so far. */ + bytesSent: number; + /** Total size of this file. */ + totalBytes: number; +} + +/** Optional observers for live upload visibility (progress, retries, stalls). */ +export interface LfsUploadEvents { + onRetry?: (event: LfsUploadRetryEvent) => void; + onStallStateChange?: (event: LfsUploadStallEvent) => void; + onBytes?: (event: LfsUploadBytesEvent) => void; +} From b90117ec82340a85b97595a8be3d90c3c8bad8a1 Mon Sep 17 00:00:00 2001 From: TimRl Date: Tue, 2 Jun 2026 10:59:14 -0600 Subject: [PATCH 2/2] Implemented LFS download video for "streaming" media --- src/extension.ts | 67 +++++++++++++++++++++++ src/git/GitService.ts | 124 +++++++++++++++++++++++++++++------------- 2 files changed, 154 insertions(+), 37 deletions(-) diff --git a/src/extension.ts b/src/extension.ts index 47e3159..076cc33 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -127,6 +127,22 @@ export interface FrontierAPI { */ downloadLFSFile: (projectPath: string, oid: string, size: number) => Promise; + /** + * Resolve a (typically presigned) download URL for a single LFS object + * WITHOUT downloading the bytes. Used for streaming media directly from the + * object store (e.g. an R2 presigned URL fed to a