diff --git a/packages/cloud/src/types.ts b/packages/cloud/src/types.ts index 2c25bbedf..55d40e3ab 100644 --- a/packages/cloud/src/types.ts +++ b/packages/cloud/src/types.ts @@ -49,6 +49,16 @@ export type AuthSessionResponse = { export type WorkflowFileType = 'yaml' | 'ts' | 'py'; +export type PathSubmission = { + name: string; + s3CodeKey: string; + repoOwner?: string; + repoName?: string; + pushBranch?: string; + pushBase?: string; + pushPrBody?: string; +}; + export type RunWorkflowOptions = { apiUrl?: string; fileType?: WorkflowFileType; @@ -59,6 +69,26 @@ export type RunWorkflowResponse = { runId: string; sandboxId?: string; status: string; + patches?: Record< + string, + { + s3Key: string; + hasChanges?: boolean; + pushedTo?: { + branch: string; + prUrl: string; + sha: string; + base: { branch: string; sha: string }; + strategy?: 'contents_api' | 'git_db'; + }; + pushError?: { + code: string; + message: string; + observedBaseSha?: string; + base?: { branch: string; sha: string }; + }; + } + >; [key: string]: unknown; }; @@ -71,11 +101,16 @@ export type WorkflowLogsResponse = { }; export type SyncPatchResponse = { - patch: string; - hasChanges: boolean; + patch?: string; + hasChanges?: boolean; + patches?: Record; [key: string]: unknown; }; +export type GetPatchesResponse = { + patches: Record; +}; + export const SUPPORTED_PROVIDERS = ['anthropic', 'openai', 'google', 'cursor', 'opencode', 'droid'] as const; export const REFRESH_WINDOW_MS = 60_000; diff --git a/packages/cloud/src/workflows.test.ts b/packages/cloud/src/workflows.test.ts index a5559028e..9e7081d63 100644 --- a/packages/cloud/src/workflows.test.ts +++ b/packages/cloud/src/workflows.test.ts @@ -1,9 +1,35 @@ -import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import path from 'node:path'; import os from 'node:os'; -import { mkdtemp, realpath, rm } from 'node:fs/promises'; +import { execFileSync } from 'node:child_process'; +import { mkdir, mkdtemp, realpath, rm, writeFile } from 'node:fs/promises'; +import type { RunWorkflowResponse } from './types.js'; -import { relativizeWorkflowPath } from './workflows.js'; +const s3SendMock = vi.hoisted(() => vi.fn()); +const ensureAuthenticatedMock = vi.hoisted(() => vi.fn()); +const authorizedApiFetchMock = vi.hoisted(() => vi.fn()); + +vi.mock('@aws-sdk/client-s3', () => { + class PutObjectCommand { + input: unknown; + constructor(input: unknown) { + this.input = input; + } + } + class S3Client { + send(command: unknown) { + return s3SendMock(command); + } + } + return { PutObjectCommand, S3Client }; +}); + +vi.mock('./auth.js', () => ({ + ensureAuthenticated: (...args: unknown[]) => ensureAuthenticatedMock(...args), + authorizedApiFetch: (...args: unknown[]) => authorizedApiFetchMock(...args), +})); + +import { parseGitHubRemote, parseWorkflowPaths, relativizeWorkflowPath, runWorkflow } from './workflows.js'; describe('relativizeWorkflowPath', () => { let tmpRoot: string; @@ -22,6 +48,7 @@ describe('relativizeWorkflowPath', () => { afterEach(async () => { process.chdir(originalCwd); await rm(tmpRoot, { recursive: true, force: true }); + vi.clearAllMocks(); }); it('returns a forward-slash relative path for a sibling of cwd', () => { @@ -59,3 +86,336 @@ describe('relativizeWorkflowPath', () => { expect(result).toBeNull(); }); }); + +describe('parseWorkflowPaths', () => { + it('extracts paths from YAML workflow source', () => { + const paths = parseWorkflowPaths( + [ + 'version: "1.0"', + 'name: multi', + 'paths:', + ' - name: cloud', + ' path: .', + ' - name: relay', + ' path: ../relay', + 'swarm:', + ' pattern: dag', + 'agents: []', + 'workflows: []', + ].join('\n'), + 'yaml' + ); + + expect(paths).toEqual([ + { name: 'cloud', path: '.' }, + { name: 'relay', path: '../relay' }, + ]); + }); + + it('extracts push-back options from YAML workflow paths', () => { + const paths = parseWorkflowPaths( + [ + 'version: "1.0"', + 'name: multi', + 'paths:', + ' - name: cloud', + ' path: .', + ' pushBranch: feature/api-keys', + ' pushBase: develop', + ' pushPrBody: Custom body', + 'swarm:', + ' pattern: dag', + 'agents: []', + 'workflows: []', + ].join('\n'), + 'yaml' + ); + + expect(paths).toEqual([ + { + name: 'cloud', + path: '.', + pushBranch: 'feature/api-keys', + pushBase: 'develop', + pushPrBody: 'Custom body', + }, + ]); + }); + + it('extracts paths from TS workflow source', () => { + const paths = parseWorkflowPaths( + ` + export const config = { + version: '1.0', + paths: [ + { name: 'cloud', path: '.' }, + { name: "relay", path: "../relay" }, + ], + swarm: { pattern: 'dag' }, + }; + `, + 'ts' + ); + + expect(paths).toEqual([ + { name: 'cloud', path: '.' }, + { name: 'relay', path: '../relay' }, + ]); + }); + + it('accepts widened run workflow patch push response types', () => { + const response = { + runId: 'run-1', + status: 'completed', + patches: { + cloud: { + s3Key: 'user/run/changes-cloud.patch', + pushedTo: { + branch: 'agent-relay/run-run-1', + prUrl: 'https://github.com/acme/cloud/pull/1', + sha: 'abc123', + base: { branch: 'main', sha: 'base123' }, + strategy: 'contents_api', + }, + }, + relay: { + s3Key: 'user/run/changes-relay.patch', + pushError: { + code: 'base_branch_moved', + message: 'Base moved', + observedBaseSha: 'base456', + base: { branch: 'main', sha: 'base123' }, + }, + }, + }, + } satisfies RunWorkflowResponse; + + expect(response.patches.cloud.pushedTo.prUrl).toContain('/pull/1'); + }); + + it('extracts paths from fluent TS workflow source', () => { + const paths = parseWorkflowPaths( + ` + workflow('probe') + .paths([ + { name: 'cloud', path: '.' }, + { name: 'relay', path: '../relay' }, + ]) + .run(); + `, + 'ts' + ); + + expect(paths).toEqual([ + { name: 'cloud', path: '.' }, + { name: 'relay', path: '../relay' }, + ]); + }); +}); + +describe('parseGitHubRemote', () => { + it('parses scp-style GitHub remotes', () => { + expect(parseGitHubRemote('git@github.com:Owner/Name.git')).toEqual({ + repoOwner: 'Owner', + repoName: 'Name', + }); + }); + + it('parses HTTPS GitHub remotes', () => { + expect(parseGitHubRemote('https://github.com/Owner/Name')).toEqual({ + repoOwner: 'Owner', + repoName: 'Name', + }); + expect(parseGitHubRemote('https://github.com/Owner/Name.git')).toEqual({ + repoOwner: 'Owner', + repoName: 'Name', + }); + }); + + it('parses ssh:// GitHub remotes', () => { + expect(parseGitHubRemote('ssh://git@github.com/Owner/Name.git')).toEqual({ + repoOwner: 'Owner', + repoName: 'Name', + }); + }); + + it('returns null for non-GitHub remotes', () => { + expect(parseGitHubRemote('https://gitlab.com/Owner/Name.git')).toBeNull(); + expect(parseGitHubRemote('not-a-url')).toBeNull(); + }); +}); + +describe('runWorkflow code sync', () => { + let tmpRoot: string; + let originalCwd: string; + const s3Credentials = { + accessKeyId: 'access', + secretAccessKey: 'secret', + sessionToken: 'session', + bucket: 'bucket', + prefix: 'user/run', + }; + + beforeEach(async () => { + originalCwd = process.cwd(); + tmpRoot = await realpath(await mkdtemp(path.join(os.tmpdir(), 'cloud-run-workflow-'))); + process.chdir(tmpRoot); + ensureAuthenticatedMock.mockResolvedValue({ accessToken: 'token' }); + s3SendMock.mockResolvedValue({}); + }); + + afterEach(async () => { + process.chdir(originalCwd); + await rm(tmpRoot, { recursive: true, force: true }); + vi.clearAllMocks(); + }); + + function mockPrepareAndRun(runBodies: unknown[]) { + authorizedApiFetchMock.mockImplementation(async (_auth, requestPath, init) => { + if (requestPath === '/api/v1/workflows/prepare') { + return { + auth: { accessToken: 'token' }, + response: new Response( + JSON.stringify({ + runId: 'run-1', + s3Credentials, + s3CodeKey: 'code.tar.gz', + }), + { status: 200, headers: { 'Content-Type': 'application/json' } } + ), + }; + } + if (requestPath === '/api/v1/workflows/run') { + runBodies.push(JSON.parse(String(init?.body))); + return { + auth: { accessToken: 'token' }, + response: new Response(JSON.stringify({ runId: 'run-1', status: 'pending' }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }), + }; + } + throw new Error(`unexpected request: ${requestPath}`); + }); + } + + it('uploads one tarball per declared path and sends paths[]', async () => { + await mkdir('cloud', { recursive: true }); + await mkdir('relay', { recursive: true }); + await writeFile('cloud/README.md', 'cloud\n'); + await writeFile('relay/README.md', 'relay\n'); + execFileSync('git', ['init', '-q'], { cwd: path.join(tmpRoot, 'cloud') }); + execFileSync('git', ['remote', 'add', 'origin', 'git@github.com:AgentWorkforce/cloud.git'], { + cwd: path.join(tmpRoot, 'cloud'), + }); + execFileSync('git', ['add', 'README.md'], { cwd: path.join(tmpRoot, 'cloud') }); + + const workflowPath = path.join(tmpRoot, 'workflow.yaml'); + await writeFile( + workflowPath, + [ + 'version: "1.0"', + 'name: multi', + 'paths:', + ' - name: cloud', + ' path: cloud', + ' pushBranch: feature/api-keys', + ' pushBase: develop', + ' pushPrBody: Custom body', + ' - name: relay', + ' path: relay', + 'swarm:', + ' pattern: dag', + 'agents: []', + 'workflows: []', + ].join('\n') + ); + const runBodies: unknown[] = []; + mockPrepareAndRun(runBodies); + + await runWorkflow(workflowPath); + + expect(s3SendMock).toHaveBeenCalledTimes(2); + const keys = s3SendMock.mock.calls.map(([command]) => command.input.Key); + expect(keys).toEqual(['user/run/code-cloud.tar.gz', 'user/run/code-relay.tar.gz']); + expect(runBodies[0]).toMatchObject({ + runId: 'run-1', + paths: [ + { + name: 'cloud', + s3CodeKey: 'code-cloud.tar.gz', + repoOwner: 'AgentWorkforce', + repoName: 'cloud', + pushBranch: 'feature/api-keys', + pushBase: 'develop', + pushPrBody: 'Custom body', + }, + { + name: 'relay', + s3CodeKey: 'code-relay.tar.gz', + }, + ], + }); + expect((runBodies[0] as { s3CodeKey?: unknown }).s3CodeKey).toBeUndefined(); + }); + + it('relativizes workflowPath against the declared path that contains it, not paths[0]', async () => { + await mkdir('cloud', { recursive: true }); + await mkdir('relay/workflows', { recursive: true }); + await writeFile('cloud/README.md', 'cloud\n'); + await writeFile('relay/README.md', 'relay\n'); + + // Workflow file lives inside the SECOND declared path (relay/), not the first (cloud/). + const workflowPath = path.join(tmpRoot, 'relay/workflows/thing.yaml'); + await writeFile( + workflowPath, + [ + 'version: "1.0"', + 'name: multi', + 'paths:', + ' - name: cloud', + ' path: ../cloud', + ' - name: relay', + ' path: ../relay', + 'swarm:', + ' pattern: dag', + 'agents: []', + 'workflows: []', + ].join('\n') + ); + const runBodies: unknown[] = []; + mockPrepareAndRun(runBodies); + + // Run from the relay dir so `../cloud` and `../relay` resolve correctly. + const prevCwd = process.cwd(); + process.chdir(path.join(tmpRoot, 'relay')); + try { + await runWorkflow(workflowPath); + } finally { + process.chdir(prevCwd); + } + + expect((runBodies[0] as { workflowPath?: string }).workflowPath).toBe('workflows/thing.yaml'); + }); + + it('falls back to the legacy single tarball when no paths are declared', async () => { + await writeFile('README.md', 'legacy\n'); + const workflowPath = path.join(tmpRoot, 'workflow.yaml'); + await writeFile( + workflowPath, + ['version: "1.0"', 'name: legacy', 'swarm:', ' pattern: dag', 'agents: []', 'workflows: []'].join('\n') + ); + const runBodies: unknown[] = []; + mockPrepareAndRun(runBodies); + + await runWorkflow(workflowPath); + + expect(s3SendMock).toHaveBeenCalledTimes(1); + expect(s3SendMock.mock.calls[0][0].input.Key).toBe('user/run/code.tar.gz'); + expect(runBodies[0]).toMatchObject({ + runId: 'run-1', + s3CodeKey: 'code.tar.gz', + }); + expect((runBodies[0] as { paths?: unknown }).paths).toBeUndefined(); + }); +}); diff --git a/packages/cloud/src/workflows.ts b/packages/cloud/src/workflows.ts index 26a365702..ca0523339 100644 --- a/packages/cloud/src/workflows.ts +++ b/packages/cloud/src/workflows.ts @@ -1,5 +1,6 @@ import fs from 'node:fs/promises'; import path from 'node:path'; +import { execFileSync } from 'node:child_process'; import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3'; import ignore from 'ignore'; @@ -12,6 +13,7 @@ import { type RunWorkflowResponse, type WorkflowLogsResponse, type SyncPatchResponse, + type PathSubmission, } from './types.js'; type ResolvedWorkflowInput = { @@ -34,6 +36,14 @@ type PrepareWorkflowResponse = { s3CodeKey: string; }; +type WorkflowPathDefinition = { + name: string; + path: string; + pushBranch?: string; + pushBase?: string; + pushPrBody?: string; +}; + type RunWorkflowOptions = { apiUrl?: string; fileType?: WorkflowFileType; @@ -56,6 +66,7 @@ const CODE_SYNC_EXCLUDES = [ '.aws', '.ssh', ]; +const PATH_NAME_RE = /^[A-Za-z][A-Za-z0-9_-]{0,63}$/; function validateYamlWorkflow(content: string): void { const hasField = (field: string) => new RegExp(`^${field}\\s*:`, 'm').test(content); @@ -74,6 +85,210 @@ function validateYamlWorkflow(content: string): void { } } +function stripYamlScalar(raw: string): string { + let value = raw.trim(); + const commentIndex = value.search(/\s#/); + if (commentIndex !== -1) { + value = value.slice(0, commentIndex).trim(); + } + if ((value.startsWith('"') && value.endsWith('"')) || (value.startsWith("'") && value.endsWith("'"))) { + return value.slice(1, -1); + } + return value; +} + +function assignPathField(target: Partial, text: string): void { + const colonIdx = text.indexOf(':'); + if (colonIdx === -1) return; + const key = text.slice(0, colonIdx).trim(); + if (!/^[A-Za-z_][A-Za-z0-9_-]*$/.test(key)) return; + const value = stripYamlScalar(text.slice(colonIdx + 1).trim()); + switch (key) { + case 'name': + target.name = value; + break; + case 'path': + target.path = value; + break; + case 'pushBranch': + target.pushBranch = value; + break; + case 'pushBase': + target.pushBase = value; + break; + case 'pushPrBody': + target.pushPrBody = value; + break; + } +} + +function parseYamlWorkflowPaths(content: string): WorkflowPathDefinition[] { + const paths: WorkflowPathDefinition[] = []; + const lines = content.split(/\r?\n/); + let inPaths = false; + let baseIndent = 0; + let current: Partial | null = null; + + const flush = () => { + if (current?.name && current.path) { + paths.push({ + name: current.name, + path: current.path, + ...(current.pushBranch ? { pushBranch: current.pushBranch } : {}), + ...(current.pushBase ? { pushBase: current.pushBase } : {}), + ...(current.pushPrBody ? { pushPrBody: current.pushPrBody } : {}), + }); + } + current = null; + }; + + for (const rawLine of lines) { + if (!rawLine.trim() || rawLine.trimStart().startsWith('#')) continue; + const indent = rawLine.match(/^\s*/)?.[0].length ?? 0; + const trimmed = rawLine.trim(); + + if (!inPaths) { + if (/^paths\s*:/.test(trimmed)) { + inPaths = true; + baseIndent = indent; + } + continue; + } + + if (indent <= baseIndent && !trimmed.startsWith('-')) { + break; + } + + if (trimmed.startsWith('-')) { + flush(); + current = {}; + const rest = trimmed.slice(1).trim(); + if (rest) assignPathField(current, rest); + continue; + } + + if (current) { + assignPathField(current, trimmed); + } + } + flush(); + + return paths; +} + +function findMatchingBracket(source: string, startIndex: number, open: string, close: string): number { + let depth = 0; + let quote: '"' | "'" | '`' | null = null; + let escaped = false; + + for (let i = startIndex; i < source.length; i += 1) { + const ch = source[i] as '"' | "'" | '`' | string; + if (quote) { + if (escaped) { + escaped = false; + } else if (ch === '\\') { + escaped = true; + } else if (ch === quote) { + quote = null; + } + continue; + } + if (ch === '"' || ch === "'" || ch === '`') { + quote = ch; + continue; + } + if (ch === open) { + depth += 1; + } else if (ch === close) { + depth -= 1; + if (depth === 0) return i; + } + } + + return -1; +} + +function extractPathArrayLiterals(source: string): string[] { + const literals: string[] = []; + + const propertyPattern = /\bpaths\s*:/g; + let propertyMatch: RegExpExecArray | null; + while ((propertyMatch = propertyPattern.exec(source)) !== null) { + const arrayStart = source.indexOf('[', propertyPattern.lastIndex); + if (arrayStart === -1) continue; + const arrayEnd = findMatchingBracket(source, arrayStart, '[', ']'); + if (arrayEnd !== -1) { + literals.push(source.slice(arrayStart, arrayEnd + 1)); + propertyPattern.lastIndex = arrayEnd + 1; + } + } + + const methodPattern = /\.paths\s*\(/g; + let methodMatch: RegExpExecArray | null; + while ((methodMatch = methodPattern.exec(source)) !== null) { + const arrayStart = source.indexOf('[', methodPattern.lastIndex); + if (arrayStart === -1) continue; + const arrayEnd = findMatchingBracket(source, arrayStart, '[', ']'); + if (arrayEnd !== -1) { + literals.push(source.slice(arrayStart, arrayEnd + 1)); + methodPattern.lastIndex = arrayEnd + 1; + } + } + + return literals; +} + +function extractObjectLiterals(arrayLiteral: string): string[] { + const objects: string[] = []; + for (let i = 0; i < arrayLiteral.length; i += 1) { + if (arrayLiteral[i] !== '{') continue; + const end = findMatchingBracket(arrayLiteral, i, '{', '}'); + if (end === -1) break; + objects.push(arrayLiteral.slice(i, end + 1)); + i = end; + } + return objects; +} + +function readStringProperty(objectLiteral: string, propertyName: string): string | null { + const pattern = new RegExp(`\\b${propertyName}\\s*:\\s*(['"])(.*?)\\1`, 's'); + const match = objectLiteral.match(pattern); + return match?.[2] ?? null; +} + +function parseTypeScriptWorkflowPaths(content: string): WorkflowPathDefinition[] { + const paths: WorkflowPathDefinition[] = []; + for (const literal of extractPathArrayLiterals(content)) { + for (const objectLiteral of extractObjectLiterals(literal)) { + const name = readStringProperty(objectLiteral, 'name'); + const pathValue = readStringProperty(objectLiteral, 'path'); + if (name && pathValue) { + const pushBranch = readStringProperty(objectLiteral, 'pushBranch'); + const pushBase = readStringProperty(objectLiteral, 'pushBase'); + const pushPrBody = readStringProperty(objectLiteral, 'pushPrBody'); + paths.push({ + name, + path: pathValue, + ...(pushBranch ? { pushBranch } : {}), + ...(pushBase ? { pushBase } : {}), + ...(pushPrBody ? { pushPrBody } : {}), + }); + } + } + } + return paths; +} + +export function parseWorkflowPaths(content: string, fileType: WorkflowFileType): WorkflowPathDefinition[] { + if (fileType === 'yaml') { + return parseYamlWorkflowPaths(content); + } + if (fileType === 'ts') { + return parseTypeScriptWorkflowPaths(content); + } + return []; +} + async function validateTypeScriptWorkflow(content: string): Promise { // Strategy: use bun's built-in TS transpiler when available (the CLI is // bun-compiled, so this covers the common case with zero external deps). @@ -153,6 +368,52 @@ export function shouldSyncCodeByDefault(_workflowArg: string, _explicitFileType? return true; } +function normalizeRepoName(repoName: string): string { + return repoName.replace(/\.git$/i, ''); +} + +function parseGitHubPath(pathname: string): { repoOwner: string; repoName: string } | null { + const parts = pathname.replace(/^\/+|\/+$/g, '').split('/'); + if (parts.length < 2) return null; + const repoOwner = parts[0]; + const repoName = normalizeRepoName(parts[1]); + if (!repoOwner || !repoName) return null; + return { repoOwner, repoName }; +} + +export function parseGitHubRemote(remote: string): { repoOwner: string; repoName: string } | null { + const trimmed = remote.trim(); + const scpMatch = trimmed.match(/^git@github\.com:([^/]+)\/(.+?)\/?$/i); + if (scpMatch) { + return { + repoOwner: scpMatch[1], + repoName: normalizeRepoName(scpMatch[2]), + }; + } + + try { + const url = new URL(trimmed); + if (url.hostname.toLowerCase() !== 'github.com') return null; + if (url.protocol !== 'https:' && url.protocol !== 'ssh:') return null; + return parseGitHubPath(url.pathname); + } catch { + return null; + } +} + +function parseGitHubRemoteForPath(absPath: string): { repoOwner: string; repoName: string } | null { + try { + const remote = execFileSync('git', ['-C', absPath, 'remote', 'get-url', 'origin'], { + encoding: 'utf8', + stdio: ['ignore', 'pipe', 'ignore'], + timeout: 5000, + }); + return parseGitHubRemote(remote); + } catch { + return null; + } +} + export async function resolveWorkflowInput( workflowArg: string, explicitFileType?: WorkflowFileType @@ -238,42 +499,100 @@ export async function runWorkflow( const prepared = prepPayload; console.error(` Prepared in ${((Date.now() - t0) / 1000).toFixed(1)}s`); - const t1 = Date.now(); - console.error('Creating tarball...'); const s3Client = createScopedS3Client(prepared.s3Credentials); - const tarball = await createTarball(process.cwd()); - console.error( - ` Tarball: ${(tarball.length / 1024).toFixed(0)}KB in ${((Date.now() - t1) / 1000).toFixed(1)}s` - ); + requestBody.runId = prepared.runId; - const t2 = Date.now(); - console.error('Uploading to S3...'); - const key = scopedCodeKey(prepared.s3Credentials.prefix, prepared.s3CodeKey); - await s3Client.send( - new PutObjectCommand({ - Bucket: prepared.s3Credentials.bucket, - Key: key, - Body: tarball, - ContentType: 'application/gzip', - }) - ); - console.error(` Uploaded in ${((Date.now() - t2) / 1000).toFixed(1)}s`); + const declaredPaths = parseWorkflowPaths(input.workflow, input.fileType); + if (declaredPaths.length > 0) { + const seenNames = new Set(); + const pathSubmissions: PathSubmission[] = []; + const resolvedPathRoots: string[] = []; - requestBody.runId = prepared.runId; - requestBody.s3CodeKey = prepared.s3CodeKey; - - // Send the workflow's path inside the synced tarball so the cloud - // launcher can set WORKFLOW_FILE directly — no $HOME upload dance, - // sibling-relative imports (e.g. `../shared/models.ts`) resolve - // against the repo layout. The tarball was produced from - // process.cwd(), so relativize the user-typed argument against cwd. - // - // Absolute paths outside cwd OR paths that would escape the tarball - // via `..` are dropped silently — the server falls back to the - // legacy $HOME upload path in that case. - const workflowPath = relativizeWorkflowPath(workflowArg); - if (workflowPath) { - requestBody.workflowPath = workflowPath; + console.error(`Creating ${declaredPaths.length} path tarball(s)...`); + for (const pathDef of declaredPaths) { + if (!PATH_NAME_RE.test(pathDef.name) || seenNames.has(pathDef.name)) { + throw new Error(`Invalid or duplicate workflow path name: ${pathDef.name}`); + } + seenNames.add(pathDef.name); + + const absolutePath = path.resolve(process.cwd(), pathDef.path); + resolvedPathRoots.push(absolutePath); + const s3CodeKey = `code-${pathDef.name}.tar.gz`; + + const t1 = Date.now(); + const tarball = await createTarball(absolutePath); + console.error( + ` ${pathDef.name}: ${(tarball.length / 1024).toFixed(0)}KB in ${((Date.now() - t1) / 1000).toFixed(1)}s` + ); + + const t2 = Date.now(); + const key = scopedCodeKey(prepared.s3Credentials.prefix, s3CodeKey); + await s3Client.send( + new PutObjectCommand({ + Bucket: prepared.s3Credentials.bucket, + Key: key, + Body: tarball, + ContentType: 'application/gzip', + }) + ); + console.error(` ${pathDef.name}: uploaded in ${((Date.now() - t2) / 1000).toFixed(1)}s`); + + const repo = parseGitHubRemoteForPath(absolutePath); + pathSubmissions.push({ + name: pathDef.name, + s3CodeKey, + ...(repo ? { repoOwner: repo.repoOwner, repoName: repo.repoName } : {}), + ...(pathDef.pushBranch ? { pushBranch: pathDef.pushBranch } : {}), + ...(pathDef.pushBase ? { pushBase: pathDef.pushBase } : {}), + ...(pathDef.pushPrBody ? { pushPrBody: pathDef.pushPrBody } : {}), + }); + } + + requestBody.paths = pathSubmissions; + let workflowPath: string | null = null; + for (const root of resolvedPathRoots) { + workflowPath = relativizeWorkflowPathFromRoot(workflowArg, root); + if (workflowPath) break; + } + if (workflowPath) { + requestBody.workflowPath = workflowPath; + } + } else { + const t1 = Date.now(); + console.error('Creating tarball...'); + const tarball = await createTarball(process.cwd()); + console.error( + ` Tarball: ${(tarball.length / 1024).toFixed(0)}KB in ${((Date.now() - t1) / 1000).toFixed(1)}s` + ); + + const t2 = Date.now(); + console.error('Uploading to S3...'); + const key = scopedCodeKey(prepared.s3Credentials.prefix, prepared.s3CodeKey); + await s3Client.send( + new PutObjectCommand({ + Bucket: prepared.s3Credentials.bucket, + Key: key, + Body: tarball, + ContentType: 'application/gzip', + }) + ); + console.error(` Uploaded in ${((Date.now() - t2) / 1000).toFixed(1)}s`); + + requestBody.s3CodeKey = prepared.s3CodeKey; + + // Send the workflow's path inside the synced tarball so the cloud + // launcher can set WORKFLOW_FILE directly — no $HOME upload dance, + // sibling-relative imports (e.g. `../shared/models.ts`) resolve + // against the repo layout. The tarball was produced from + // process.cwd(), so relativize the user-typed argument against cwd. + // + // Absolute paths outside cwd OR paths that would escape the tarball + // via `..` are dropped silently — the server falls back to the + // legacy $HOME upload path in that case. + const workflowPath = relativizeWorkflowPath(workflowArg); + if (workflowPath) { + requestBody.workflowPath = workflowPath; + } } } @@ -440,7 +759,8 @@ export async function syncWorkflowPatch( if ( !payload || typeof payload !== 'object' || - typeof (payload as { hasChanges?: unknown }).hasChanges !== 'boolean' + (typeof (payload as { hasChanges?: unknown }).hasChanges !== 'boolean' && + (!('patches' in payload) || typeof (payload as { patches?: unknown }).patches !== 'object')) ) { throw new Error('Patch response was not valid JSON.'); } @@ -526,6 +846,7 @@ async function createTarball(rootDir: string): Promise { cwd: absoluteRoot, encoding: 'utf-8', maxBuffer: 50 * 1024 * 1024, + stdio: ['ignore', 'pipe', 'ignore'], }); const files = gitFiles.split('\0').filter(Boolean); if (files.length > 0) { @@ -599,8 +920,12 @@ function scopedCodeKey(prefix: string, key: string): string { * behaviour this field was added to fix). */ export function relativizeWorkflowPath(workflowArg: string): string | null { + return relativizeWorkflowPathFromRoot(workflowArg, process.cwd()); +} + +function relativizeWorkflowPathFromRoot(workflowArg: string, rootDir: string): string | null { const absolute = path.resolve(process.cwd(), workflowArg); - let rel = path.relative(process.cwd(), absolute); + let rel = path.relative(rootDir, absolute); if (rel.length === 0) return null; // Normalize to forward slashes so the server-side validator (which // runs on Linux Lambda) gets the same shape regardless of the CLI OS. diff --git a/src/cli/commands/cloud.test.ts b/src/cli/commands/cloud.test.ts index 56a11eb2a..fa22c3932 100644 --- a/src/cli/commands/cloud.test.ts +++ b/src/cli/commands/cloud.test.ts @@ -1,8 +1,37 @@ import { Command } from 'commander'; -import { describe, expect, it, vi } from 'vitest'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const cloudMocks = vi.hoisted(() => ({ + runWorkflow: vi.fn(), + getRunStatus: vi.fn(), + syncWorkflowPatch: vi.fn(), +})); + +vi.mock('@agent-relay/cloud', () => ({ + AUTH_FILE_PATH: '/tmp/cloud-auth.json', + REFRESH_WINDOW_MS: 60_000, + authorizedApiFetch: vi.fn(), + cancelWorkflow: vi.fn(), + clearStoredAuth: vi.fn(), + defaultApiUrl: () => 'https://cloud.test', + ensureAuthenticated: vi.fn(), + getRunLogs: vi.fn(), + getRunStatus: (...args: unknown[]) => cloudMocks.getRunStatus(...args), + readStoredAuth: vi.fn(), + runWorkflow: (...args: unknown[]) => cloudMocks.runWorkflow(...args), + syncWorkflowPatch: (...args: unknown[]) => cloudMocks.syncWorkflowPatch(...args), +})); + +vi.mock('@agent-relay/telemetry', () => ({ + track: vi.fn(), +})); import { registerCloudCommands, type CloudDependencies } from './cloud.js'; +beforeEach(() => { + vi.clearAllMocks(); +}); + function createHarness() { const exit = vi.fn((code: number) => { throw new Error(`exit:${code}`); @@ -103,4 +132,102 @@ describe('registerCloudCommands', () => { expect(cancel?.registeredArguments[0]?.required).toBe(true); expect(cancel?.registeredArguments[0]?.name()).toBe('runId'); }); + + it('cloud run renders pushed PR and push errors for patches', async () => { + const { program, deps } = createHarness(); + cloudMocks.runWorkflow.mockResolvedValueOnce({ + runId: 'run-1', + status: 'completed', + patches: { + cloud: { + s3Key: 'user/run/changes-cloud.patch', + pushedTo: { + branch: 'agent-relay/run-run-1', + prUrl: 'https://github.com/acme/cloud/pull/12', + sha: 'abc123', + base: { branch: 'main', sha: 'base123' }, + }, + }, + relay: { + s3Key: 'user/run/changes-relay.patch', + pushError: { + code: 'base_branch_moved', + message: 'Base branch moved', + }, + }, + }, + }); + + await program.parseAsync(['node', 'agent-relay', 'cloud', 'run', 'workflow.yaml']); + + expect(deps.log).toHaveBeenCalledWith('Patches:'); + expect(deps.log).toHaveBeenCalledWith( + ' cloud: https://github.com/acme/cloud/pull/12 (agent-relay/run-run-1)' + ); + expect(deps.log).toHaveBeenCalledWith(' relay: push failed: base_branch_moved: Base branch moved'); + }); + + it('cloud sync refuses to apply multi-path responses (no silent data loss)', async () => { + const { program, deps } = createHarness(); + cloudMocks.syncWorkflowPatch.mockResolvedValueOnce({ + patches: { + cloud: { patch: 'diff --git a/x b/x\n', hasChanges: true }, + relay: { patch: 'diff --git a/y b/y\n', hasChanges: true }, + }, + }); + + await expect(program.parseAsync(['node', 'agent-relay', 'cloud', 'sync', 'run-42'])).rejects.toThrow( + 'exit:1' + ); + + expect(deps.error).toHaveBeenCalledWith(expect.stringContaining('2 per-path patches (cloud, relay)')); + expect(deps.log).not.toHaveBeenCalledWith('No changes to sync — the workflow did not modify any files.'); + }); + + it('cloud sync --dry-run prints each multi-path patch', async () => { + const { program, deps } = createHarness(); + cloudMocks.syncWorkflowPatch.mockResolvedValueOnce({ + patches: { + cloud: { patch: 'CLOUD_PATCH_BODY', hasChanges: true }, + relay: { patch: '', hasChanges: false }, + }, + }); + + await program.parseAsync(['node', 'agent-relay', 'cloud', 'sync', 'run-42', '--dry-run']); + + expect(deps.log).toHaveBeenCalledWith(expect.stringContaining('Patch for "cloud" (dry run)')); + expect(deps.log).not.toHaveBeenCalledWith(expect.stringContaining('Patch for "relay"')); + }); + + it('cloud sync reports no-changes when multi-path response has all empty patches', async () => { + const { program, deps } = createHarness(); + cloudMocks.syncWorkflowPatch.mockResolvedValueOnce({ + patches: { + cloud: { patch: '', hasChanges: false }, + relay: { patch: '', hasChanges: false }, + }, + }); + + await program.parseAsync(['node', 'agent-relay', 'cloud', 'sync', 'run-42']); + + expect(deps.log).toHaveBeenCalledWith('No changes to sync — the workflow did not modify any files.'); + }); + + it('cloud status renders pending patch push state', async () => { + const { program, deps } = createHarness(); + cloudMocks.getRunStatus.mockResolvedValueOnce({ + runId: 'run-1', + status: 'completed', + patches: { + cloud: { + s3Key: 'user/run/changes-cloud.patch', + }, + }, + }); + + await program.parseAsync(['node', 'agent-relay', 'cloud', 'status', 'run-1']); + + expect(deps.log).toHaveBeenCalledWith('Patches:'); + expect(deps.log).toHaveBeenCalledWith(' cloud: patch pending - run still active'); + }); }); diff --git a/src/cli/commands/cloud.ts b/src/cli/commands/cloud.ts index e8a19ad42..dffe49d34 100644 --- a/src/cli/commands/cloud.ts +++ b/src/cli/commands/cloud.ts @@ -98,6 +98,46 @@ function parseWorkflowFileType(value: string): WorkflowFileType { throw new InvalidArgumentError('Expected workflow type to be one of: yaml, ts, py'); } +function isObject(value: unknown): value is Record { + return value !== null && typeof value === 'object' && !Array.isArray(value); +} + +function renderPatchPushResults(patches: unknown, log: (...args: unknown[]) => void): void { + if (!isObject(patches)) { + return; + } + + const entries = Object.entries(patches); + if (entries.length === 0) { + return; + } + + log('Patches:'); + for (const [name, rawEntry] of entries) { + if (!isObject(rawEntry)) { + log(` ${name}: patch pending - run still active`); + continue; + } + + const pushedTo = rawEntry.pushedTo; + if (isObject(pushedTo) && typeof pushedTo.prUrl === 'string') { + const branch = typeof pushedTo.branch === 'string' ? ` (${pushedTo.branch})` : ''; + log(` ${name}: ${pushedTo.prUrl}${branch}`); + continue; + } + + const pushError = rawEntry.pushError; + if (isObject(pushError)) { + const code = typeof pushError.code === 'string' ? pushError.code : 'unknown'; + const message = typeof pushError.message === 'string' ? pushError.message : 'push failed'; + log(` ${name}: push failed: ${code}: ${message}`); + continue; + } + + log(` ${name}: patch pending - run still active`); + } +} + function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } @@ -450,6 +490,7 @@ export function registerCloudCommands(program: Command, overrides: Partial p.hasChanges); + if (withChanges.length === 0) { + deps.log('No changes to sync — the workflow did not modify any files.'); + return; + } + if (options.dryRun) { + for (const [name, p] of withChanges) { + deps.log(`\n--- Patch for "${name}" (dry run) ---`); + process.stdout.write(p.patch); + deps.log(`\n--- End patch for "${name}" ---`); + } + return; + } + deps.error( + `This run produced ${withChanges.length} per-path patch${withChanges.length === 1 ? '' : 'es'} ` + + `(${withChanges.map(([n]) => n).join(', ')}). "cloud sync" only applies single-patch runs. ` + + `Use --dry-run to inspect each patch, then apply manually in the correct repo.` + ); + deps.exit(1); + return; + } + + if (!result.hasChanges || !result.patch) { deps.log('No changes to sync — the workflow did not modify any files.'); return; }