From 6bb539eee555232c91513fa45fc839b22e96ee03 Mon Sep 17 00:00:00 2001 From: Michael Leone Date: Thu, 7 May 2026 09:58:35 -0400 Subject: [PATCH 1/3] fix(argo-workflows): handle Error phase webhook events Argo Workflows finalize as `Error` (controller/infra failure) or `Failed` (user-code exit). The webhook handler only mapped `Failed`; `Error` events silently dropped at the unmapped-phase guard, leaving jobs stuck in_progress. Map `Error` -> JobStatus.Failure. Drop the workflowName fallback in getJobId since job.id is a UUID and workflowName never matches; surface missing labels as a warn log instead of a guaranteed no-op UPDATE. Add a notInArray guard on the UPDATE so the ~13 near-simultaneous sensor fires per workflow can't regress a terminal job. Add structured logs at every drop/update path so future silent failures stay loud. --- .../argoworkflow/__tests__/workflow.test.ts | 22 +++++++ apps/api/src/routes/argoworkflow/index.ts | 9 +++ apps/api/src/routes/argoworkflow/workflow.ts | 63 ++++++++++++++++--- 3 files changed, 86 insertions(+), 8 deletions(-) create mode 100644 apps/api/src/routes/argoworkflow/__tests__/workflow.test.ts diff --git a/apps/api/src/routes/argoworkflow/__tests__/workflow.test.ts b/apps/api/src/routes/argoworkflow/__tests__/workflow.test.ts new file mode 100644 index 000000000..7250fb964 --- /dev/null +++ b/apps/api/src/routes/argoworkflow/__tests__/workflow.test.ts @@ -0,0 +1,22 @@ +import { describe, expect, it } from "vitest"; +import { JobStatus } from "@ctrlplane/validators/jobs"; + +import { mapTriggerToStatus } from "../workflow.js"; + +describe("mapTriggerToStatus", () => { + it.each([ + ["Pending", JobStatus.Pending], + ["Running", JobStatus.InProgress], + ["Succeeded", JobStatus.Successful], + ["Failed", JobStatus.Failure], + ["Error", JobStatus.Failure], + ])("maps Argo phase %s to %s", (phase, expected) => { + expect(mapTriggerToStatus(phase)).toBe(expected); + }); + + it("returns null for unknown phases", () => { + expect(mapTriggerToStatus("Skipped")).toBeNull(); + expect(mapTriggerToStatus("")).toBeNull(); + expect(mapTriggerToStatus("succeeded")).toBeNull(); + }); +}); diff --git a/apps/api/src/routes/argoworkflow/index.ts b/apps/api/src/routes/argoworkflow/index.ts index d211eeec0..6ca49be1c 100644 --- a/apps/api/src/routes/argoworkflow/index.ts +++ b/apps/api/src/routes/argoworkflow/index.ts @@ -5,6 +5,7 @@ import { Router } from "express"; import { eq } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; +import { logger } from "@ctrlplane/logger"; import { handleArgoWorkflow } from "./workflow.js"; @@ -47,6 +48,14 @@ const handleWebhookRequest = async (req: Request, res: Response) => { } const payload = req.body; + logger.info("Argo webhook received", { + jobAgentId: id, + workflowName: payload?.workflowName, + uid: payload?.uid, + phase: payload?.phase, + jobId: payload?.jobId, + eventType: payload?.eventType, + }); await handleArgoWorkflow(payload); res.status(200).send(); }; diff --git a/apps/api/src/routes/argoworkflow/workflow.ts b/apps/api/src/routes/argoworkflow/workflow.ts index 08c392055..544774b0d 100644 --- a/apps/api/src/routes/argoworkflow/workflow.ts +++ b/apps/api/src/routes/argoworkflow/workflow.ts @@ -1,7 +1,8 @@ -import { eq } from "@ctrlplane/db"; +import { and, eq, notInArray } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import { enqueueAllReleaseTargetsDesiredVersion } from "@ctrlplane/db/reconcilers"; import * as schema from "@ctrlplane/db/schema"; +import { logger } from "@ctrlplane/logger"; import { exitedStatus, JobStatus } from "@ctrlplane/validators/jobs"; interface ArgoWorkflowPayload { @@ -16,9 +17,14 @@ interface ArgoWorkflowPayload { eventType: string; } +// Argo Workflow phases: Pending | Running | Succeeded | Failed | Error. +// Error covers controller/infra failures (timeouts, unschedulable pods, exit +// handler crashes); ctrlplane has no separate enum value, so it folds into +// Failure alongside user-code Failed. const statusMap: Record = { Succeeded: JobStatus.Successful, Failed: JobStatus.Failure, + Error: JobStatus.Failure, Running: JobStatus.InProgress, Pending: JobStatus.Pending, }; @@ -26,21 +32,40 @@ const statusMap: Record = { export const mapTriggerToStatus = (trigger: string): JobStatus | null => statusMap[trigger] ?? null; -export const getJobId = (payload: ArgoWorkflowPayload) => - payload.jobId ?? payload.workflowName; +export const getJobId = (payload: ArgoWorkflowPayload) => payload.jobId; export const handleArgoWorkflow = async (payload: ArgoWorkflowPayload) => { - const { uid, phase, startedAt, finishedAt } = payload; + const { uid, phase, startedAt, finishedAt, workflowName } = payload; const jobId = getJobId(payload); + if (jobId == null) { + logger.warn("Argo webhook missing job-id label, ignoring", { + workflowName, + uid, + phase, + }); + return; + } - const status = statusMap[phase] ?? null; - if (status == null) return; + const status = mapTriggerToStatus(phase); + if (status == null) { + logger.warn("Argo webhook with unmapped phase, ignoring", { + workflowName, + uid, + phase, + jobId, + }); + return; + } const isCompleted = exitedStatus.includes(status); const completedAt = isCompleted && finishedAt != null ? new Date(finishedAt) : null; + // Filter on status NOT IN exitedStatus so a late-arriving non-terminal event + // (Running/Pending) cannot regress a job that already settled to a terminal + // state. The sensor fans out ~13 near-simultaneous fires per workflow; this + // makes the handler idempotent without a separate read+transaction. const [updated] = await db .update(schema.job) .set({ @@ -50,10 +75,32 @@ export const handleArgoWorkflow = async (payload: ArgoWorkflowPayload) => { completedAt, updatedAt: new Date(), }) - .where(eq(schema.job.id, jobId)) + .where( + and( + eq(schema.job.id, jobId), + notInArray(schema.job.status, exitedStatus), + ), + ) .returning(); - if (updated == null) return; + if (updated == null) { + logger.info("Argo webhook produced no update", { + workflowName, + uid, + phase, + jobId, + mappedStatus: status, + }); + return; + } + + logger.info("Argo webhook updated job", { + workflowName, + uid, + phase, + jobId, + mappedStatus: status, + }); const result = await db .select({ workspaceId: schema.deployment.workspaceId }) From be907dc62e854d2c45e7a0237cda52fe74fe7e35 Mon Sep 17 00:00:00 2001 From: Michael Leone Date: Thu, 7 May 2026 10:18:05 -0400 Subject: [PATCH 2/3] fix(api/dockerfile): add /bin suffix to PNPM_HOME on PATH pnpm 11 places global binaries in $PNPM_HOME/bin, not $PNPM_HOME directly. The previous PATH entry pointed at /pnpm so pnpm refused to install globals with "configured global bin directory /pnpm/bin is not in PATH". Surfaced when corepack@latest pulled pnpm 11.0.8; older pnpm tolerated the mismatch. --- apps/api/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api/Dockerfile b/apps/api/Dockerfile index d319fc9b4..5016c1b10 100644 --- a/apps/api/Dockerfile +++ b/apps/api/Dockerfile @@ -4,7 +4,7 @@ FROM node:${NODE_VERSION}-alpine RUN apk add --no-cache libc6-compat python3 make g++ curl ENV PNPM_HOME="/pnpm" -ENV PATH="$PNPM_HOME:$PATH" +ENV PATH="$PNPM_HOME/bin:$PATH" RUN npm install -g turbo RUN npm install -g corepack@latest From e5a458f15e0923a373c0760622c124fc126be153 Mon Sep 17 00:00:00 2001 From: Michael Leone Date: Thu, 7 May 2026 10:30:33 -0400 Subject: [PATCH 3/3] =?UTF-8?q?chore(argo-workflows):=20type=20webhook=20p?= =?UTF-8?q?ayload=20at=20handler=20entry=20Export=20ArgoWorkflowPayload=20?= =?UTF-8?q?and=20cast=20req.body=20to=20it=20in=20the=20route=20handler=20?= =?UTF-8?q?so=20field=20access=20in=20the=20entry=20log=20and=20the=20call?= =?UTF-8?q?=20into=20handleArgoWorkflow=20is=20checked=20by=20tsc=20instea?= =?UTF-8?q?d=20of=20relying=20on=20Express's=20`any`-typed=20body.=20Drop?= =?UTF-8?q?=20the=20now-redundant=20optional=20chaining=20on=20payload=20f?= =?UTF-8?q?ields=20(lint=20flagged=20them=20as=20unnecessary).=20No=20runt?= =?UTF-8?q?ime=20validation=20added=20=E2=80=94=20webhook=20stays=20auth-g?= =?UTF-8?q?ated=20by=20shared=20secret,=20matching=20the=20TFE=20and=20Git?= =?UTF-8?q?Hub=20webhook=20handlers.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/api/src/routes/argoworkflow/index.ts | 13 +++++++------ apps/api/src/routes/argoworkflow/workflow.ts | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/apps/api/src/routes/argoworkflow/index.ts b/apps/api/src/routes/argoworkflow/index.ts index 6ca49be1c..b49b62958 100644 --- a/apps/api/src/routes/argoworkflow/index.ts +++ b/apps/api/src/routes/argoworkflow/index.ts @@ -7,6 +7,7 @@ import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; import { logger } from "@ctrlplane/logger"; +import type { ArgoWorkflowPayload } from "./workflow.js"; import { handleArgoWorkflow } from "./workflow.js"; export const createArgoWorkflowRouter = (): Router => @@ -47,14 +48,14 @@ const handleWebhookRequest = async (req: Request, res: Response) => { return; } - const payload = req.body; + const payload = req.body as ArgoWorkflowPayload; logger.info("Argo webhook received", { jobAgentId: id, - workflowName: payload?.workflowName, - uid: payload?.uid, - phase: payload?.phase, - jobId: payload?.jobId, - eventType: payload?.eventType, + workflowName: payload.workflowName, + uid: payload.uid, + phase: payload.phase, + jobId: payload.jobId, + eventType: payload.eventType, }); await handleArgoWorkflow(payload); res.status(200).send(); diff --git a/apps/api/src/routes/argoworkflow/workflow.ts b/apps/api/src/routes/argoworkflow/workflow.ts index 544774b0d..2017bc1d5 100644 --- a/apps/api/src/routes/argoworkflow/workflow.ts +++ b/apps/api/src/routes/argoworkflow/workflow.ts @@ -5,7 +5,7 @@ import * as schema from "@ctrlplane/db/schema"; import { logger } from "@ctrlplane/logger"; import { exitedStatus, JobStatus } from "@ctrlplane/validators/jobs"; -interface ArgoWorkflowPayload { +export interface ArgoWorkflowPayload { workflowName: string; namespace: string; uid: string;