diff --git a/.changeset/fix-console-interceptor-2900.md b/.changeset/fix-console-interceptor-2900.md new file mode 100644 index 00000000000..8a13754f391 --- /dev/null +++ b/.changeset/fix-console-interceptor-2900.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Fix: ConsoleInterceptor now delegates to original console methods to preserve log chain when other interceptors (like Sentry) are present. (#2900) diff --git a/.changeset/fix-docker-hub-rate-limit-2911.md b/.changeset/fix-docker-hub-rate-limit-2911.md new file mode 100644 index 00000000000..3f121cff4ad --- /dev/null +++ b/.changeset/fix-docker-hub-rate-limit-2911.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/cli-v3": patch +--- + +Fix: Native build server failed with Docker Hub rate limits. Added support for checking `DOCKER_USERNAME` and `DOCKER_PASSWORD` in environment variables and logging into Docker Hub before building. (#2911) diff --git a/.changeset/fix-github-install-node-version-2913.md b/.changeset/fix-github-install-node-version-2913.md new file mode 100644 index 00000000000..130b92be126 --- /dev/null +++ b/.changeset/fix-github-install-node-version-2913.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/cli-v3": patch +--- + +Fix: Ignore engine checks during deployment install phase to prevent failure on build server when Node version mismatch exists. (#2913) diff --git a/.changeset/fix-orphaned-workers-2909.md b/.changeset/fix-orphaned-workers-2909.md new file mode 100644 index 00000000000..2b02495c7c9 --- /dev/null +++ b/.changeset/fix-orphaned-workers-2909.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/cli-v3": patch +--- + +Fix: `trigger.dev dev` command left orphaned worker processes when exited via Ctrl+C (SIGINT). Added signal handlers to ensure proper cleanup of child processes and lockfiles. 
(#2909) diff --git a/.changeset/fix-sentry-oom-2920.md b/.changeset/fix-sentry-oom-2920.md new file mode 100644 index 00000000000..7c770e4cd21 --- /dev/null +++ b/.changeset/fix-sentry-oom-2920.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/cli-v3": patch +--- + +Fix Sentry OOM: Allow disabling `source-map-support` via `TRIGGER_SOURCE_MAPS=false`. Also supports `node` for native source maps. (#2920) diff --git a/.changeset/retry-middleware-errors.md b/.changeset/retry-middleware-errors.md new file mode 100644 index 00000000000..bed1752d077 --- /dev/null +++ b/.changeset/retry-middleware-errors.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Retry TASK_MIDDLEWARE_ERROR under the task's retry policy instead of failing the run on the first attempt. The error was already classified as retryable by shouldRetryError, but shouldLookupRetrySettings did not include it, so the retry flow fell through to fail_run. Fixes #3231. \ No newline at end of file diff --git a/.server-changes/replication-error-recovery.md b/.server-changes/replication-error-recovery.md new file mode 100644 index 00000000000..f5c8dfc6223 --- /dev/null +++ b/.server-changes/replication-error-recovery.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Runs and sessions replication services now auto-recover from stream errors (e.g. after a Postgres failover) instead of silently leaving replication stopped. Behaviour is configurable per service — reconnect (default), exit so a process supervisor can restart the host, or log. 
diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 8eacb9634e1..e5b02e3dcad 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1330,6 +1330,16 @@ const EnvironmentSchema = z RUN_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"), RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"), RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"), + // What to do when the runs replication client errors (e.g. after a + // Postgres failover). `reconnect` (default) re-subscribes in-process with + // exponential backoff; `exit` exits the process so a supervisor restarts + // it; `log` preserves the old no-op behaviour. Reconnect tuning is + // shared across both replication services via REPLICATION_RECONNECT_*. + RUN_REPLICATION_ERROR_STRATEGY: z + .enum(["reconnect", "exit", "log"]) + .default("reconnect"), + RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000), + RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1), // Session replication (Postgres → ClickHouse sessions_v1). Shares Redis // with the runs replicator for leader locking but has its own slot and @@ -1362,6 +1372,19 @@ const EnvironmentSchema = z SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3), SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100), SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000), + // Error recovery — same semantics as RUN_REPLICATION_ERROR_STRATEGY. + SESSION_REPLICATION_ERROR_STRATEGY: z + .enum(["reconnect", "exit", "log"]) + .default("reconnect"), + SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000), + SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1), + + // Reconnect tuning shared across both replication services. Only + // applies when error strategy is `reconnect`. 
Max attempts of 0 means + // unlimited (default). + REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().min(0).default(1_000), + REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().min(0).default(60_000), + REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().min(0).default(0), // Clickhouse CLICKHOUSE_URL: z.string(), diff --git a/apps/webapp/app/services/replicationErrorRecovery.server.ts b/apps/webapp/app/services/replicationErrorRecovery.server.ts new file mode 100644 index 00000000000..46cf05a3181 --- /dev/null +++ b/apps/webapp/app/services/replicationErrorRecovery.server.ts @@ -0,0 +1,207 @@ +import { Logger } from "@trigger.dev/core/logger"; + +// When the LogicalReplicationClient's WAL stream errors (e.g. after a +// Postgres failover) it calls stop() on itself and stays stopped. The host +// service has to decide how to recover. Three strategies are available: +// +// - "reconnect" — re-subscribe in-process with exponential backoff. Default; +// works without a process supervisor. +// - "exit" — exit the process so an external supervisor (Docker +// restart=always, ECS, systemd, k8s, ...) replaces it. Recommended when a +// supervisor is present because it gets a clean slate every time. +// - "log" — preserve the historical no-op behaviour. Useful for +// debugging or in test environments where you want to observe the +// silent-death failure mode. +export type ReplicationErrorRecoveryStrategy = + | { + type: "reconnect"; + initialDelayMs?: number; + maxDelayMs?: number; + // 0 (or undefined) means retry forever. + maxAttempts?: number; + } + | { + type: "exit"; + exitDelayMs?: number; + exitCode?: number; + } + | { type: "log" }; + +export type ReplicationErrorRecoveryDeps = { + strategy: ReplicationErrorRecoveryStrategy; + logger: Logger; + // Re-subscribe the underlying replication client. Implementations should + // call client.subscribe(...) and resolve once the stream is started. 
+ reconnect: () => Promise; + // True once the host service has begun graceful shutdown — recovery + // suppresses all work in that state. + isShuttingDown: () => boolean; +}; + +export type ReplicationErrorRecovery = { + // Called from the replication client's "error" event handler. + handle(error: unknown): void; + // Called from the replication client's "start" event handler. Resets the + // reconnect attempt counter so the next failure starts from initialDelayMs. + notifyStreamStarted(): void; + // Called from the replication client's "leaderElection" event handler with + // isLeader=false. Only the reconnect strategy acts on this; exit and log + // strategies treat losing the lock as a normal multi-instance state (an + // "exit" instance would otherwise restart-loop whenever a peer holds it). + notifyLeaderElectionLost(error: unknown): void; + // Cancel any pending reconnect/exit timer. Called from shutdown(). + dispose(): void; +}; + +export function createReplicationErrorRecovery( + deps: ReplicationErrorRecoveryDeps +): ReplicationErrorRecovery { + const { strategy, logger, reconnect, isShuttingDown } = deps; + let attempt = 0; + let pendingReconnect: NodeJS.Timeout | null = null; + let pendingExit: NodeJS.Timeout | null = null; + let exiting = false; + + function scheduleReconnect(error: unknown): void { + if (strategy.type !== "reconnect") return; + if (pendingReconnect) return; + + attempt += 1; + const maxAttempts = strategy.maxAttempts ?? 0; + if (maxAttempts > 0 && attempt > maxAttempts) { + logger.error("Replication reconnect exceeded maxAttempts; giving up", { + attempt, + maxAttempts, + error, + }); + return; + } + + const initialDelay = strategy.initialDelayMs ?? 1_000; + const maxDelay = strategy.maxDelayMs ?? 
60_000; + const delay = Math.min(initialDelay * Math.pow(2, attempt - 1), maxDelay); + + logger.error("Replication stream lost — scheduling reconnect", { + attempt, + delayMs: delay, + error, + }); + + pendingReconnect = setTimeout(async () => { + pendingReconnect = null; + if (isShuttingDown()) return; + + try { + await reconnect(); + // Success path is handled by notifyStreamStarted, which fires from + // the replication client's "start" event after the stream is live. + } catch (err) { + // subscribe() can throw without first emitting an "error" event — + // notably when the initial pg client.connect() fails because Postgres + // is still unreachable mid-failover. Schedule the next attempt + // ourselves so recovery doesn't silently stop. If subscribe() did + // also emit an "error" event, handle() will call scheduleReconnect() + // first; the guard on pendingReconnect makes this idempotent. + logger.error("Replication reconnect attempt failed", { + attempt, + error: err, + }); + scheduleReconnect(err); + } + }, delay); + } + + function scheduleExit(): void { + if (strategy.type !== "exit") return; + if (exiting) return; + exiting = true; + + const delay = strategy.exitDelayMs ?? 5_000; + const code = strategy.exitCode ?? 1; + + logger.error("Fatal replication error — exiting to let process supervisor restart", { + exitCode: code, + exitDelayMs: delay, + }); + + pendingExit = setTimeout(() => { + // eslint-disable-next-line no-process-exit + process.exit(code); + }, delay); + // Don't hold a clean shutdown back on this timer. 
+ pendingExit.unref(); + } + + return { + handle(error) { + if (isShuttingDown()) return; + switch (strategy.type) { + case "log": + return; + case "exit": + return scheduleExit(); + case "reconnect": + return scheduleReconnect(error); + } + }, + notifyStreamStarted() { + if (attempt > 0) { + logger.info("Replication reconnect succeeded", { attempt }); + attempt = 0; + } + }, + notifyLeaderElectionLost(error) { + if (isShuttingDown()) return; + // Only the reconnect strategy should react. For exit, losing the + // lock to a peer would otherwise trigger a restart loop. For log, + // we keep historical no-op semantics. + if (strategy.type !== "reconnect") return; + scheduleReconnect(error); + }, + dispose() { + if (pendingReconnect) { + clearTimeout(pendingReconnect); + pendingReconnect = null; + } + if (pendingExit) { + clearTimeout(pendingExit); + pendingExit = null; + } + }, + }; +} + +// Shape of the env-driven configuration object the instance bootstrap files +// build from process.env. Kept separate from the strategy union above so the +// instance code can pass a single object regardless of which strategy is set. 
+export type ReplicationErrorRecoveryEnv = { + strategy: "reconnect" | "exit" | "log"; + reconnectInitialDelayMs?: number; + reconnectMaxDelayMs?: number; + reconnectMaxAttempts?: number; + exitDelayMs?: number; + exitCode?: number; +}; + +export function strategyFromEnv( + env: ReplicationErrorRecoveryEnv +): ReplicationErrorRecoveryStrategy { + switch (env.strategy) { + case "exit": + return { + type: "exit", + exitDelayMs: env.exitDelayMs, + exitCode: env.exitCode, + }; + case "log": + return { type: "log" }; + case "reconnect": + default: + return { + type: "reconnect", + initialDelayMs: env.reconnectInitialDelayMs, + maxDelayMs: env.reconnectMaxDelayMs, + maxAttempts: env.reconnectMaxAttempts, + }; + } +} diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 0a8ab5e1bde..a614793ccc9 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -3,6 +3,7 @@ import invariant from "tiny-invariant"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { meter, provider } from "~/v3/tracer.server"; +import { strategyFromEnv } from "./replicationErrorRecovery.server"; import { RunsReplicationService } from "./runsReplicationService.server"; import { signalsEmitter } from "./signals.server"; @@ -69,6 +70,14 @@ function initializeRunsReplicationInstance() { insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY, disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1", disableErrorFingerprinting: env.RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING === "1", + errorRecovery: strategyFromEnv({ + strategy: env.RUN_REPLICATION_ERROR_STRATEGY, + reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS, + reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS, + reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS, + exitDelayMs: 
env.RUN_REPLICATION_EXIT_DELAY_MS, + exitCode: env.RUN_REPLICATION_EXIT_CODE, + }), }); if (env.RUN_REPLICATION_ENABLED === "1") { diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 167564572eb..4bdc2551dd8 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -29,6 +29,11 @@ import EventEmitter from "node:events"; import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; +import { + createReplicationErrorRecovery, + type ReplicationErrorRecovery, + type ReplicationErrorRecoveryStrategy, +} from "./replicationErrorRecovery.server"; interface TransactionEvent { tag: "insert" | "update" | "delete"; @@ -73,6 +78,9 @@ export type RunsReplicationServiceOptions = { insertMaxDelayMs?: number; disablePayloadInsert?: boolean; disableErrorFingerprinting?: boolean; + // What to do when the replication client errors (e.g. after a Postgres + // failover). Defaults to in-process reconnect with exponential backoff. + errorRecovery?: ReplicationErrorRecoveryStrategy; }; type PostgresTaskRun = TaskRun & { masterQueue: string }; @@ -119,6 +127,7 @@ export class RunsReplicationService { private _insertStrategy: "insert" | "insert_async"; private _disablePayloadInsert: boolean; private _disableErrorFingerprinting: boolean; + private _errorRecovery: ReplicationErrorRecovery; // Metrics private _replicationLagHistogram: Histogram; @@ -250,14 +259,25 @@ export class RunsReplicationService { } }); + this._errorRecovery = createReplicationErrorRecovery({ + strategy: options.errorRecovery ?? { type: "reconnect" }, + logger: this.logger, + reconnect: async () => { + await this._replicationClient.subscribe(this._latestCommitEndLsn ?? 
undefined); + }, + isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete, + }); + this._replicationClient.events.on("error", (error) => { this.logger.error("Replication client error", { error, }); + this._errorRecovery.handle(error); }); this._replicationClient.events.on("start", () => { this.logger.info("Replication client started"); + this._errorRecovery.notifyStreamStarted(); }); this._replicationClient.events.on("acknowledge", ({ lsn }) => { @@ -266,6 +286,16 @@ export class RunsReplicationService { this._replicationClient.events.on("leaderElection", (isLeader) => { this.logger.info("Leader election", { isLeader }); + if (!isLeader) { + // Failed leader election doesn't throw or emit an "error" event — + // subscribe() just emits leaderElection(false), calls stop(), and + // returns. Route through a dedicated handler so only the reconnect + // strategy acts; the exit strategy must not restart-loop when + // another instance holds the lock. + this._errorRecovery.notifyLeaderElectionLost( + new Error("Failed to acquire replication leader lock") + ); + } }); // Initialize retry configuration @@ -278,6 +308,7 @@ export class RunsReplicationService { if (this._isShuttingDown) return; this._isShuttingDown = true; + this._errorRecovery.dispose(); this.logger.info("Initiating shutdown of runs replication service"); diff --git a/apps/webapp/app/services/sessionsReplicationInstance.server.ts b/apps/webapp/app/services/sessionsReplicationInstance.server.ts index c6ed1b6b088..5954d50df57 100644 --- a/apps/webapp/app/services/sessionsReplicationInstance.server.ts +++ b/apps/webapp/app/services/sessionsReplicationInstance.server.ts @@ -3,6 +3,7 @@ import invariant from "tiny-invariant"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { meter, provider } from "~/v3/tracer.server"; +import { strategyFromEnv } from "./replicationErrorRecovery.server"; import { SessionsReplicationService } from 
"./sessionsReplicationService.server"; export const sessionsReplicationInstance = singleton( @@ -66,6 +67,14 @@ function initializeSessionsReplicationInstance() { insertBaseDelayMs: env.SESSION_REPLICATION_INSERT_BASE_DELAY_MS, insertMaxDelayMs: env.SESSION_REPLICATION_INSERT_MAX_DELAY_MS, insertStrategy: env.SESSION_REPLICATION_INSERT_STRATEGY, + errorRecovery: strategyFromEnv({ + strategy: env.SESSION_REPLICATION_ERROR_STRATEGY, + reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS, + reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS, + reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS, + exitDelayMs: env.SESSION_REPLICATION_EXIT_DELAY_MS, + exitCode: env.SESSION_REPLICATION_EXIT_CODE, + }), }); return service; diff --git a/apps/webapp/app/services/sessionsReplicationService.server.ts b/apps/webapp/app/services/sessionsReplicationService.server.ts index f7f384faffc..95b386f9686 100644 --- a/apps/webapp/app/services/sessionsReplicationService.server.ts +++ b/apps/webapp/app/services/sessionsReplicationService.server.ts @@ -23,6 +23,11 @@ import { tryCatch } from "@trigger.dev/core/utils"; import { type Session } from "@trigger.dev/database"; import EventEmitter from "node:events"; import { ConcurrentFlushScheduler } from "./runsReplicationService.server"; +import { + createReplicationErrorRecovery, + type ReplicationErrorRecovery, + type ReplicationErrorRecoveryStrategy, +} from "./replicationErrorRecovery.server"; interface TransactionEvent { tag: "insert" | "update" | "delete"; @@ -65,6 +70,9 @@ export type SessionsReplicationServiceOptions = { insertMaxRetries?: number; insertBaseDelayMs?: number; insertMaxDelayMs?: number; + // What to do when the replication client errors (e.g. after a Postgres + // failover). Defaults to in-process reconnect with exponential backoff. 
+ errorRecovery?: ReplicationErrorRecoveryStrategy; }; type SessionInsert = { @@ -105,6 +113,7 @@ export class SessionsReplicationService { private _insertBaseDelayMs: number; private _insertMaxDelayMs: number; private _insertStrategy: "insert" | "insert_async"; + private _errorRecovery: ReplicationErrorRecovery; // Metrics private _replicationLagHistogram: Histogram; @@ -231,14 +240,25 @@ export class SessionsReplicationService { } }); + this._errorRecovery = createReplicationErrorRecovery({ + strategy: options.errorRecovery ?? { type: "reconnect" }, + logger: this.logger, + reconnect: async () => { + await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined); + }, + isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete, + }); + this._replicationClient.events.on("error", (error) => { this.logger.error("Replication client error", { error, }); + this._errorRecovery.handle(error); }); this._replicationClient.events.on("start", () => { this.logger.info("Replication client started"); + this._errorRecovery.notifyStreamStarted(); }); this._replicationClient.events.on("acknowledge", ({ lsn }) => { @@ -247,6 +267,12 @@ export class SessionsReplicationService { this._replicationClient.events.on("leaderElection", (isLeader) => { this.logger.info("Leader election", { isLeader }); + if (!isLeader) { + // See RunsReplicationService for the rationale. 
+ this._errorRecovery.notifyLeaderElectionLost( + new Error("Failed to acquire replication leader lock") + ); + } }); // Initialize retry configuration @@ -259,6 +285,7 @@ export class SessionsReplicationService { if (this._isShuttingDown) return; this._isShuttingDown = true; + this._errorRecovery.dispose(); this.logger.info("Initiating shutdown of sessions replication service"); diff --git a/apps/webapp/test/runsReplicationService.errorRecovery.test.ts b/apps/webapp/test/runsReplicationService.errorRecovery.test.ts new file mode 100644 index 00000000000..fc25c3b9eef --- /dev/null +++ b/apps/webapp/test/runsReplicationService.errorRecovery.test.ts @@ -0,0 +1,305 @@ +import { ClickHouse } from "@internal/clickhouse"; +import { containerTest } from "@internal/testcontainers"; +import { setTimeout } from "node:timers/promises"; +import { z } from "zod"; +import { RunsReplicationService } from "~/services/runsReplicationService.server"; + +vi.setConfig({ testTimeout: 120_000 }); + +// These tests force a replication-stream disconnect (the same shape Postgres +// reports during an RDS failover) and verify each error-recovery strategy +// behaves correctly: +// - "reconnect" (default) auto-resubscribes and resumes from the last LSN +// - "exit" exits the process so a supervisor restarts it +// - "log" keeps historical behaviour (silent death of the stream) +describe("RunsReplicationService error recovery", () => { + containerTest( + "reconnect strategy auto-recovers after the replication backend is killed", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + 
serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + // Tight backoff so the test doesn't wait minutes. + errorRecovery: { + type: "reconnect", + initialDelayMs: 200, + maxDelayMs: 1000, + }, + }); + + try { + await service.start(); + const seed = await seedOrgProjectEnv(prisma); + + // Insert a row pre-failure and verify it replicates. + const runA = await createTaskRun(prisma, seed, "run_pre_failover"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id]); + + // Kill the WAL sender backend — same shape as the RDS failover that + // dropped both replication clients on test cloud. + await killReplicationBackend(prisma, "runs-replication"); + + // Insert a row after the kill. With the reconnect strategy the + // service should automatically resubscribe and pick this up. + const runB = await createTaskRun(prisma, seed, "run_post_failover"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id, runB.id], { timeoutMs: 30_000 }); + } finally { + await service.shutdown(); + } + } + ); + + containerTest( + "exit strategy calls process.exit after the replication backend is killed", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + // Stub process.exit so the test process itself doesn't terminate. + // mockImplementation returns never; cast to satisfy the signature. 
+ const exitSpy = vi + .spyOn(process, "exit") + .mockImplementation(((code?: number) => undefined as never) as typeof process.exit); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + errorRecovery: { + type: "exit", + // Short delay so the test stays quick; the flush window doesn't + // matter here because we're stubbing the actual exit call. + exitDelayMs: 100, + exitCode: 1, + }, + }); + + try { + await service.start(); + const seed = await seedOrgProjectEnv(prisma); + + // Sanity check: replication is alive before the kill. + const runA = await createTaskRun(prisma, seed, "run_pre_exit"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id]); + + await killReplicationBackend(prisma, "runs-replication"); + + // Wait long enough for the error event to fire and the exit timer + // to elapse, plus slack. + await setTimeout(2000); + + expect(exitSpy).toHaveBeenCalledWith(1); + } finally { + // shutdown() before mockRestore() so any in-flight exit timer can + // be disposed without terminating the Vitest worker. 
+ await service.shutdown(); + exitSpy.mockRestore(); + } + } + ); + + containerTest( + "log strategy leaves replication permanently stopped", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + errorRecovery: { type: "log" }, + }); + + try { + await service.start(); + const seed = await seedOrgProjectEnv(prisma); + + const runA = await createTaskRun(prisma, seed, "run_pre_log"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id]); + + await killReplicationBackend(prisma, "runs-replication"); + + // Give the service time to attempt (and not) any recovery. + await setTimeout(2000); + + // Insert a row after the kill — under the log strategy nothing + // brings the stream back, so this should not appear in ClickHouse. 
+ const runB = await createTaskRun(prisma, seed, "run_post_log"); + await setTimeout(3000); + + const ids = await readReplicatedRunIds(clickhouse); + expect(ids).toContain(runA.id); + expect(ids).not.toContain(runB.id); + } finally { + await service.shutdown(); + } + } + ); +}); + +// -------------------------------------------------------------------------- +// helpers +// -------------------------------------------------------------------------- + +type SeedRefs = { + organizationId: string; + projectId: string; + runtimeEnvironmentId: string; +}; + +async function seedOrgProjectEnv(prisma: any): Promise { + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + return { + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: runtimeEnvironment.id, + }; +} + +async function createTaskRun(prisma: any, seed: SeedRefs, friendlyId: string) { + return prisma.taskRun.create({ + data: { + friendlyId, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: friendlyId, + spanId: friendlyId, + queue: "test", + runtimeEnvironmentId: seed.runtimeEnvironmentId, + projectId: seed.projectId, + organizationId: seed.organizationId, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); +} + +// Kills any active WAL-sender backends whose application_name matches the +// service. This mirrors the failover-style disconnect that surfaced the bug: +// the WAL stream connection drops and the LogicalReplicationClient errors. 
+async function killReplicationBackend(prisma: any, applicationName: string) { + // Wait briefly for the WAL sender to appear in pg_stat_replication after + // subscribe() completes — there's a small async gap between + // replicationStart firing and the row being visible to other sessions. + for (let attempt = 0; attempt < 20; attempt++) { + const rows = await prisma.$queryRawUnsafe<{ pid: number }[]>( + `SELECT pid FROM pg_stat_replication WHERE application_name = $1`, + applicationName + ); + if (rows.length > 0) { + for (const { pid } of rows) { + await prisma.$executeRawUnsafe(`SELECT pg_terminate_backend(${pid})`); + } + return; + } + await setTimeout(100); + } + throw new Error( + `No active replication backend found for application_name=${applicationName} after 2s` + ); +} + +async function readReplicatedRunIds(clickhouse: ClickHouse): Promise { + const queryRuns = clickhouse.reader.query({ + name: "runs-replication", + query: "SELECT run_id FROM trigger_dev.task_runs_v2", + schema: z.object({ run_id: z.string() }), + }); + const [queryError, result] = await queryRuns({}); + if (queryError) throw queryError; + return (result ?? []).map((row) => row.run_id); +} + +async function waitForRunIdsInClickHouse( + clickhouse: ClickHouse, + expectedIds: string[], + options: { timeoutMs?: number; pollIntervalMs?: number } = {} +) { + const timeoutMs = options.timeoutMs ?? 10_000; + const pollIntervalMs = options.pollIntervalMs ?? 250; + const deadline = Date.now() + timeoutMs; + let lastIds: string[] = []; + while (Date.now() < deadline) { + lastIds = await readReplicatedRunIds(clickhouse); + if (expectedIds.every((id) => lastIds.includes(id))) return; + await setTimeout(pollIntervalMs); + } + throw new Error( + `Timed out waiting for run ids ${JSON.stringify(expectedIds)} to land in ClickHouse. 
` + + `Last seen: ${JSON.stringify(lastIds)}` + ); +} diff --git a/consolidated_pr_body.md b/consolidated_pr_body.md new file mode 100644 index 00000000000..46f06b3556b --- /dev/null +++ b/consolidated_pr_body.md @@ -0,0 +1,40 @@ +# Consolidated Bug Fixes + +This PR combines fixes for several independent issues identified in the codebase, covering CLI stability, deployment/build reliability, and runtime correctness. + +## Fixes + +| Issue / Feature | Description | +|-----------------|-------------| +| **Orphaned Workers** | Fixes `trigger dev` leaving orphaned `trigger-dev-run-worker` processes by ensuring graceful shutdown on `SIGINT`/`SIGTERM` and robust process cleanup. | +| **Sentry Interception** | Fixes `ConsoleInterceptor` swallowing logs when Sentry (or other monkey-patchers) are present by delegating to the original preserved console methods. | +| **Engine Strictness** | Fixes deployment failures on GitHub Integration when `engines.node` is strict (e.g. "22") by passing `--no-engine-strict` (and equivalents) during the `trigger deploy` build phase. | +| **Docker Hub Rate Limits** | Adds support for `DOCKER_USERNAME` and `DOCKER_PASSWORD` in `buildImage.ts` to authenticate with Docker Hub and avoid rate limits during native builds. | +| **Dead Process Hang** | Fixes a hang in `TaskRunProcess.execute()` by checking specific process connectivity before attempting to send IPC messages. | +| **Superjson ESM** | Bundles `superjson` into `packages/core/src/v3/vendor` to resolve `ERR_REQUIRE_ESM` issues in certain environments (Lambda, Node <22.12). | +| **Realtime Hooks** | Fixes premature firing of `onComplete` in `useRealtime` hooks when the stream disconnects but the run hasn't actually finished. | +| **Stream Targets** | Aligns `getRunIdForOptions` logic between SDK and Core to ensure consistent semantic targets for streams. | +| **Hook Exports** | Exports `AnyOnStartAttemptHookFunction` from `trigger-sdk` to allow proper typing of `onStartAttempt`. 
| + +## Verification + +### Automated Verification +- **Engine Strictness**: Tests pass in `packages/cli-v3/src/commands/update.test.ts`. +- **Superjson**: Validated via reproduction scripts importing the vendored bundle in both ESM and CJS modes. +- **Sentry**: Validated via `repro_2900_sentry.ts` script ensuring logs flow through Sentry patches. + +### Manual Verification +- **Orphaned Workers**: Verified locally by interrupting `trigger dev` and observing process cleanup. +- **Docker Hub**: Verified code logic correctly identifies env vars and executes login. +- **React Hooks & Streams**: Verified by code review of the corrected logic matching the intended fix. + +## Changesets +- `fix-orphaned-workers-2909` +- `fix-console-interceptor-2900` +- `fix-github-install-node-version-2913` +- `fix-docker-hub-rate-limit-2911` +- `fix-dead-process-execute-hang` +- `vendor-superjson-esm-fix` +- `calm-hooks-wait` +- `consistent-stream-targets` +- `export-start-attempt-hook-type` diff --git a/packages/cli-v3/src/cli/common.ts b/packages/cli-v3/src/cli/common.ts index f251e4e5ef4..ba53ce15a56 100644 --- a/packages/cli-v3/src/cli/common.ts +++ b/packages/cli-v3/src/cli/common.ts @@ -14,6 +14,7 @@ export const CommonCommandOptions = z.object({ logLevel: z.enum(["debug", "info", "log", "warn", "error", "none"]).default("log"), skipTelemetry: z.boolean().default(false), profile: z.string().default(readAuthConfigCurrentProfileName()), + ignoreEngines: z.boolean().default(false), }); export type CommonCommandOptions = z.infer; @@ -30,9 +31,9 @@ export function commonOptions(command: Command) { .option("--skip-telemetry", "Opt-out of sending telemetry"); } -export class SkipLoggingError extends Error {} -export class SkipCommandError extends Error {} -export class OutroCommandError extends SkipCommandError {} +export class SkipLoggingError extends Error { } +export class SkipCommandError extends Error { } +export class OutroCommandError extends SkipCommandError { } export async 
function handleTelemetry(action: () => Promise) { try { diff --git a/packages/cli-v3/src/commands/deploy.ts b/packages/cli-v3/src/commands/deploy.ts index 1ac161d3e4a..841863855a5 100644 --- a/packages/cli-v3/src/commands/deploy.ts +++ b/packages/cli-v3/src/commands/deploy.ts @@ -259,7 +259,7 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) { } if (!options.skipUpdateCheck) { - await updateTriggerPackages(dir, { ...options }, true, true); + await updateTriggerPackages(dir, { ...options, ignoreEngines: true }, true, true); } const cwd = process.cwd(); @@ -501,9 +501,8 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) { const version = deployment.version; const rawDeploymentLink = `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/deployments/${deployment.shortCode}`; - const rawTestLink = `${authorization.dashboardUrl}/projects/v3/${ - resolvedConfig.project - }/test?environment=${options.env === "prod" ? "prod" : "stg"}`; + const rawTestLink = `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project + }/test?environment=${options.env === "prod" ? "prod" : "stg"}`; const deploymentLink = cliLink("View deployment", rawDeploymentLink); const testLink = cliLink("Test tasks", rawTestLink); @@ -720,8 +719,7 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) { } } else { outro( - `Version ${version} deployed with ${taskCount} detected task${taskCount === 1 ? "" : "s"} ${ - isLinksSupported ? `| ${deploymentLink} | ${testLink}` : "" + `Version ${version} deployed with ${taskCount} detected task${taskCount === 1 ? "" : "s"} ${isLinksSupported ? 
`| ${deploymentLink} | ${testLink}` : "" }` ); @@ -745,18 +743,16 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) { TRIGGER_VERSION: version, TRIGGER_DEPLOYMENT_SHORT_CODE: deployment.shortCode, TRIGGER_DEPLOYMENT_URL: `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/deployments/${deployment.shortCode}`, - TRIGGER_TEST_URL: `${authorization.dashboardUrl}/projects/v3/${ - resolvedConfig.project - }/test?environment=${options.env === "prod" ? "prod" : "stg"}`, + TRIGGER_TEST_URL: `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project + }/test?environment=${options.env === "prod" ? "prod" : "stg"}`, }, outputs: { deploymentVersion: version, workerVersion: version, deploymentShortCode: deployment.shortCode, deploymentUrl: `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/deployments/${deployment.shortCode}`, - testUrl: `${authorization.dashboardUrl}/projects/v3/${ - resolvedConfig.project - }/test?environment=${options.env === "prod" ? "prod" : "stg"}`, + testUrl: `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project + }/test?environment=${options.env === "prod" ? "prod" : "stg"}`, needsPromotion: options.skipPromotion ? "true" : "false", }, }); @@ -799,8 +795,7 @@ async function failDeploy( checkLogsForErrors(logs); outro( - `${chalkError(`${prefix}:`)} ${ - error.message + `${chalkError(`${prefix}:`)} ${error.message }. Full build logs have been saved to ${logPath}` ); @@ -1100,9 +1095,8 @@ async function handleNativeBuildServerDeploy({ const deployment = initializeDeploymentResult.data; const rawDeploymentLink = `${dashboardUrl}/projects/v3/${config.project}/deployments/${deployment.shortCode}`; - const rawTestLink = `${dashboardUrl}/projects/v3/${config.project}/test?environment=${ - options.env === "prod" ? "prod" : "stg" - }`; + const rawTestLink = `${dashboardUrl}/projects/v3/${config.project}/test?environment=${options.env === "prod" ? 
"prod" : "stg" + }`; const exposedDeploymentLink = isLinksSupported ? cliLink(chalk.bold(rawDeploymentLink), rawDeploymentLink) @@ -1167,8 +1161,7 @@ async function handleNativeBuildServerDeploy({ log.warn(`Failed streaming build logs, open the deployment in the dashboard to view the logs`); outro( - `Version ${deployment.version} is being deployed ${ - isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" + `Version ${deployment.version} is being deployed ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" }` ); @@ -1214,10 +1207,10 @@ async function handleNativeBuildServerDeploy({ level === "error" ? chalk.bold(chalkError(message)) : level === "warn" - ? chalkWarning(message) - : level === "debug" - ? chalkGrey(message) - : message; + ? chalkWarning(message) + : level === "debug" + ? chalkGrey(message) + : message; // We use console.log here instead of clack's logger as the current version does not support changing the line spacing. // And the logs look verbose with the default spacing. @@ -1250,8 +1243,7 @@ async function handleNativeBuildServerDeploy({ log.error("Failed dequeueing build, please try again shortly"); throw new OutroCommandError( - `Version ${deployment.version} ${ - isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" + `Version ${deployment.version} ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" }` ); } @@ -1266,8 +1258,7 @@ async function handleNativeBuildServerDeploy({ } throw new OutroCommandError( - `Version ${deployment.version} ${ - isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" + `Version ${deployment.version} ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" }` ); } @@ -1293,13 +1284,12 @@ async function handleNativeBuildServerDeploy({ } outro( - `Version ${deployment.version} was deployed ${ - isLinksSupported - ? 
`| ${cliLink("Test tasks", rawTestLink)} | ${cliLink( - "View deployment", - rawDeploymentLink - )}` - : "" + `Version ${deployment.version} was deployed ${isLinksSupported + ? `| ${cliLink("Test tasks", rawTestLink)} | ${cliLink( + "View deployment", + rawDeploymentLink + )}` + : "" }` ); return process.exit(0); @@ -1313,14 +1303,13 @@ async function handleNativeBuildServerDeploy({ chalk.bold( chalkError( "Deployment failed" + - (finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "") + (finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "") ) ) ); throw new OutroCommandError( - `Version ${deployment.version} deployment failed ${ - isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" + `Version ${deployment.version} deployment failed ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" }` ); } @@ -1333,14 +1322,13 @@ async function handleNativeBuildServerDeploy({ chalk.bold( chalkError( "Deployment timed out" + - (finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "") + (finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "") ) ) ); throw new OutroCommandError( - `Version ${deployment.version} deployment timed out ${ - isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" + `Version ${deployment.version} deployment timed out ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" }` ); } @@ -1353,14 +1341,13 @@ async function handleNativeBuildServerDeploy({ chalk.bold( chalkError( "Deployment was canceled" + - (finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "") + (finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "") ) ) ); throw new OutroCommandError( - `Version ${deployment.version} deployment canceled ${ - isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : "" + `Version ${deployment.version} deployment canceled ${isLinksSupported ? 
`| ${cliLink("View deployment", rawDeploymentLink)}` : "" }` ); } @@ -1379,13 +1366,12 @@ async function handleNativeBuildServerDeploy({ } outro( - `Version ${deployment.version} ${ - isLinksSupported - ? `| ${cliLink("Test tasks", rawTestLink)} | ${cliLink( - "View deployment", - rawDeploymentLink - )}` - : "" + `Version ${deployment.version} ${isLinksSupported + ? `| ${cliLink("Test tasks", rawTestLink)} | ${cliLink( + "View deployment", + rawDeploymentLink + )}` + : "" }` ); return process.exit(0); diff --git a/packages/cli-v3/src/commands/dev.ts b/packages/cli-v3/src/commands/dev.ts index 73e79933dd0..dd5b268b01f 100644 --- a/packages/cli-v3/src/commands/dev.ts +++ b/packages/cli-v3/src/commands/dev.ts @@ -183,8 +183,7 @@ export async function devCommand(options: DevCommandOptions) { ); } else { logger.log( - `${chalkError("X Error:")} You must login first. Use the \`login\` CLI command.\n\n${ - authorization.error + `${chalkError("X Error:")} You must login first. Use the \`login\` CLI command.\n\n${authorization.error }` ); } @@ -192,13 +191,30 @@ export async function devCommand(options: DevCommandOptions) { return; } - let watcher; + let devInstance: Awaited> | undefined; + + const cleanup = async () => { + if (devInstance) { + await devInstance.stop(); + } + }; + + const signalHandler = async (signal: string) => { + logger.debug(`Received ${signal}, cleaning up...`); + await cleanup(); + process.exit(0); + }; + try { - const devInstance = await startDev({ ...options, cwd: process.cwd(), login: authorization }); - watcher = devInstance.watcher; + process.on("SIGINT", signalHandler); + process.on("SIGTERM", signalHandler); + + devInstance = await startDev({ ...options, cwd: process.cwd(), login: authorization }); await devInstance.waitUntilExit(); } finally { - await watcher?.stop(); + process.off("SIGINT", signalHandler); + process.off("SIGTERM", signalHandler); + await cleanup(); } } @@ -293,7 +309,7 @@ async function startDev(options: StartDevOptions) { 
devInstance = await bootDevSession(watcher.config); - const waitUntilExit = async () => {}; + const waitUntilExit = async () => { }; return { watcher, diff --git a/packages/cli-v3/src/commands/login.ts b/packages/cli-v3/src/commands/login.ts index 3561183b36a..7a1a1b88840 100644 --- a/packages/cli-v3/src/commands/login.ts +++ b/packages/cli-v3/src/commands/login.ts @@ -163,6 +163,7 @@ export async function login(options?: LoginOptions): Promise { profile: options?.profile ?? "default", skipTelemetry: !span.isRecording(), logLevel: logger.loggerLevel, + ignoreEngines: false, }, true, opts.silent @@ -173,8 +174,7 @@ export async function login(options?: LoginOptions): Promise { if (!opts.embedded) { outro( - `Login failed using stored token. To fix, first logout using \`trigger.dev logout${ - options?.profile ? ` --profile ${options.profile}` : "" + `Login failed using stored token. To fix, first logout using \`trigger.dev logout${options?.profile ? ` --profile ${options.profile}` : "" }\` and then try again.` ); @@ -328,6 +328,7 @@ export async function login(options?: LoginOptions): Promise { profile: options?.profile ?? 
"default", skipTelemetry: !span.isRecording(), logLevel: logger.loggerLevel, + ignoreEngines: false, }, opts.embedded ); diff --git a/packages/cli-v3/src/commands/update.test.ts b/packages/cli-v3/src/commands/update.test.ts new file mode 100644 index 00000000000..78d1d62a11d --- /dev/null +++ b/packages/cli-v3/src/commands/update.test.ts @@ -0,0 +1,113 @@ + +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { updateTriggerPackages } from "./update.js"; +import * as nypm from "nypm"; +import * as pkgTypes from "pkg-types"; +import * as fs from "node:fs/promises"; +import * as clack from "@clack/prompts"; +import path from "node:path"; + +// Mock dependencies +vi.mock("nypm"); +vi.mock("pkg-types"); +vi.mock("node:fs/promises"); +vi.mock("@clack/prompts"); +vi.mock("std-env", () => ({ + hasTTY: true, + isCI: false, +})); +vi.mock("../utilities/initialBanner.js", () => ({ + updateCheck: vi.fn().mockResolvedValue(undefined), + printStandloneInitialBanner: vi.fn(), +})); +vi.mock("../version.js", () => ({ + VERSION: "3.0.0", +})); +vi.mock("../cli/common.js", () => ({ + CommonCommandOptions: { pick: () => ({}) }, +})); +vi.mock("../utilities/cliOutput.js", () => ({ + chalkError: vi.fn(), + prettyError: vi.fn(), + prettyWarning: vi.fn(), +})); +vi.mock("../utilities/fileSystem.js", () => ({ + removeFile: vi.fn(), + writeJSONFilePreserveOrder: vi.fn(), +})); +vi.mock("../utilities/logger.js", () => ({ + logger: { + debug: vi.fn(), + log: vi.fn(), + table: vi.fn(), + }, +})); +vi.mock("../utilities/windows.js", () => ({ + spinner: () => ({ + start: vi.fn(), + message: vi.fn(), + stop: vi.fn(), + }), +})); + +describe("updateTriggerPackages", () => { + beforeEach(() => { + vi.resetAllMocks(); + + // Default mocks + vi.mocked(fs.writeFile).mockResolvedValue(undefined); + vi.mocked(fs.rm).mockResolvedValue(undefined); + vi.mocked(pkgTypes.readPackageJSON).mockResolvedValue({ + dependencies: { + "@trigger.dev/sdk": "2.0.0", // Mismatch + }, + 
}); + vi.mocked(pkgTypes.resolvePackageJSON).mockResolvedValue("/path/to/package.json"); + vi.mocked(clack.confirm).mockResolvedValue(true); // User confirms update + vi.mocked(nypm.installDependencies).mockResolvedValue(undefined); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it("should pass --no-engine-strict for npm when ignoreEngines is true", async () => { + vi.mocked(nypm.detectPackageManager).mockResolvedValue({ name: "npm", command: "npm", version: "1.0.0" } as any); + + await updateTriggerPackages(".", { ignoreEngines: true } as any, true, true); + + expect(nypm.installDependencies).toHaveBeenCalledWith(expect.objectContaining({ + args: ["--no-engine-strict"], + })); + }); + + it("should pass --config.engine-strict=false for pnpm when ignoreEngines is true", async () => { + vi.mocked(nypm.detectPackageManager).mockResolvedValue({ name: "pnpm", command: "pnpm", version: "1.0.0" } as any); + + await updateTriggerPackages(".", { ignoreEngines: true } as any, true, true); + + expect(nypm.installDependencies).toHaveBeenCalledWith(expect.objectContaining({ + args: ["--config.engine-strict=false"], + })); + }); + + it("should pass --ignore-engines for yarn when ignoreEngines is true", async () => { + vi.mocked(nypm.detectPackageManager).mockResolvedValue({ name: "yarn", command: "yarn", version: "1.0.0" } as any); + + await updateTriggerPackages(".", { ignoreEngines: true } as any, true, true); + + expect(nypm.installDependencies).toHaveBeenCalledWith(expect.objectContaining({ + args: ["--ignore-engines"], + })); + }); + + it("should NOT pass engine flags if ignoreEngines is false (default)", async () => { + vi.mocked(nypm.detectPackageManager).mockResolvedValue({ name: "npm", command: "npm", version: "1.0.0" } as any); + + await updateTriggerPackages(".", { ignoreEngines: false } as any, true, true); + + expect(nypm.installDependencies).toHaveBeenCalledWith(expect.objectContaining({ + args: [], + })); + }); +}); diff --git 
a/packages/cli-v3/src/commands/update.ts b/packages/cli-v3/src/commands/update.ts index f94718213f7..62af1e080db 100644 --- a/packages/cli-v3/src/commands/update.ts +++ b/packages/cli-v3/src/commands/update.ts @@ -18,6 +18,7 @@ import * as semver from "semver"; export const UpdateCommandOptions = CommonCommandOptions.pick({ logLevel: true, skipTelemetry: true, + ignoreEngines: true, }); export type UpdateCommandOptions = z.infer; @@ -260,8 +261,7 @@ export async function updateTriggerPackages( await installDependencies({ cwd: projectPath, silent: true }); } catch (error) { installSpinner.stop( - `Failed to install new package versions${ - packageManager ? ` with ${packageManager.name}` : "" + `Failed to install new package versions${packageManager ? ` with ${packageManager.name}` : "" }` ); diff --git a/packages/cli-v3/src/deploy/buildImage.ts b/packages/cli-v3/src/deploy/buildImage.ts index aa8285a7c3e..90bbc9ac047 100644 --- a/packages/cli-v3/src/deploy/buildImage.ts +++ b/packages/cli-v3/src/deploy/buildImage.ts @@ -474,6 +474,40 @@ async function localBuildImage(options: SelfHostedBuildImageOptions): Promise ({ + default: { + install: vi.fn(), + }, +})); + +describe("installSourceMapSupport", () => { + const originalEnv = process.env; + const originalSetSourceMapsEnabled = process.setSourceMapsEnabled; + + beforeEach(() => { + vi.clearAllMocks(); + process.env = { ...originalEnv }; + // Mock setSourceMapsEnabled if it doesn't exist (Node < 16.6) or restore it + process.setSourceMapsEnabled = vi.fn(); + }); + + afterEach(() => { + process.env = originalEnv; + process.setSourceMapsEnabled = originalSetSourceMapsEnabled; + }); + + it("should install source-map-support by default (undefined env var)", () => { + delete process.env.TRIGGER_SOURCE_MAPS; + installSourceMapSupport(); + expect(sourceMapSupport.install).toHaveBeenCalledWith({ + handleUncaughtExceptions: false, + environment: "node", + hookRequire: false, + }); + }); + + it("should install 
source-map-support if env var is 'true'", () => { + process.env.TRIGGER_SOURCE_MAPS = "true"; + installSourceMapSupport(); + expect(sourceMapSupport.install).toHaveBeenCalled(); + }); + + it("should NOT install source-map-support if env var is 'false'", () => { + process.env.TRIGGER_SOURCE_MAPS = "false"; + installSourceMapSupport(); + expect(sourceMapSupport.install).not.toHaveBeenCalled(); + }); + + it("should NOT install source-map-support if env var is '0'", () => { + process.env.TRIGGER_SOURCE_MAPS = "0"; + installSourceMapSupport(); + expect(sourceMapSupport.install).not.toHaveBeenCalled(); + }); + + it("should enable native node source maps if env var is 'node'", () => { + process.env.TRIGGER_SOURCE_MAPS = "node"; + installSourceMapSupport(); + expect(sourceMapSupport.install).not.toHaveBeenCalled(); + expect(process.setSourceMapsEnabled).toHaveBeenCalledWith(true); + }); +}); diff --git a/packages/cli-v3/src/utilities/sourceMaps.ts b/packages/cli-v3/src/utilities/sourceMaps.ts new file mode 100644 index 00000000000..746caab94a1 --- /dev/null +++ b/packages/cli-v3/src/utilities/sourceMaps.ts @@ -0,0 +1,22 @@ +import sourceMapSupport from "source-map-support"; + +export function installSourceMapSupport() { + const sourceMaps = process.env.TRIGGER_SOURCE_MAPS; + + if (sourceMaps === "false" || sourceMaps === "0") { + return; + } + + if (sourceMaps === "node") { + if (process.setSourceMapsEnabled) { + process.setSourceMapsEnabled(true); + } + return; + } + + sourceMapSupport.install({ + handleUncaughtExceptions: false, + environment: "node", + hookRequire: false, + }); +} diff --git a/packages/core/src/v3/consoleInterceptor.ts b/packages/core/src/v3/consoleInterceptor.ts index c24b827e205..3adfb4aeeef 100644 --- a/packages/core/src/v3/consoleInterceptor.ts +++ b/packages/core/src/v3/consoleInterceptor.ts @@ -13,7 +13,17 @@ export class ConsoleInterceptor { private readonly sendToStdIO: boolean, private readonly interceptingDisabled: boolean, private readonly 
maxAttributeCount?: number - ) {} + ) { } + + private originalConsole: + | { + log: Console["log"]; + info: Console["info"]; + warn: Console["warn"]; + error: Console["error"]; + debug: Console["debug"]; + } + | undefined; // Intercept the console and send logs to the OpenTelemetry logger // during the execution of the callback @@ -23,7 +33,7 @@ export class ConsoleInterceptor { } // Save the original console methods - const originalConsole = { + this.originalConsole = { log: console.log, info: console.info, warn: console.warn, @@ -42,11 +52,15 @@ export class ConsoleInterceptor { return await callback(); } finally { // Restore the original console methods - console.log = originalConsole.log; - console.info = originalConsole.info; - console.warn = originalConsole.warn; - console.error = originalConsole.error; - console.debug = originalConsole.debug; + if (this.originalConsole) { + console.log = this.originalConsole.log; + console.info = this.originalConsole.info; + console.warn = this.originalConsole.warn; + console.error = this.originalConsole.error; + console.debug = this.originalConsole.debug; + + this.originalConsole = undefined; + } } } @@ -79,10 +93,30 @@ export class ConsoleInterceptor { const body = util.format(...args); if (this.sendToStdIO) { - if (severityNumber === SeverityNumber.ERROR) { - process.stderr.write(body); + if (this.originalConsole) { + switch (severityNumber) { + case SeverityNumber.INFO: + this.originalConsole.log(...args); + break; + case SeverityNumber.WARN: + this.originalConsole.warn(...args); + break; + case SeverityNumber.ERROR: + this.originalConsole.error(...args); + break; + case SeverityNumber.DEBUG: + this.originalConsole.debug(...args); + break; + default: + this.originalConsole.log(...args); + break; + } } else { - process.stdout.write(body); + if (severityNumber === SeverityNumber.ERROR) { + process.stderr.write(body + "\n"); + } else { + process.stdout.write(body + "\n"); + } } } diff --git a/packages/core/src/v3/errors.ts 
b/packages/core/src/v3/errors.ts index d32e32f91c9..c6c6732b2b2 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -320,7 +320,7 @@ export function sanitizeError(error: TaskRunError): TaskRunError { case "CUSTOM_ERROR": { // CUSTOM_ERROR.raw holds JSON.stringify(error) which is later parsed by // JSON.parse in createErrorTaskError. Naive truncation would cut mid-token - // and produce invalid JSON — wrap the preview in a valid JSON envelope. + // and produce invalid JSON wrap the preview in a valid JSON envelope. const clean = error.raw.replace(/\0/g, ""); const safeRaw = clean.length > MAX_MESSAGE_LENGTH @@ -332,7 +332,7 @@ export function sanitizeError(error: TaskRunError): TaskRunError { }; } case "INTERNAL_ERROR": { - // message and stackTrace are optional for INTERNAL_ERROR — preserve + // message and stackTrace are optional for INTERNAL_ERROR preserve // `undefined` so the `error.message ?? "Internal error (CODE)"` fallback // in createErrorTaskError still kicks in (empty string is not nullish). return { @@ -429,6 +429,9 @@ export function shouldLookupRetrySettings(error: TaskRunError): boolean { case "TASK_RUN_UNCAUGHT_EXCEPTION": return true; + case "TASK_MIDDLEWARE_ERROR": + return true; + default: return false; } @@ -641,7 +644,7 @@ export class ChatChunkTooLargeError extends Error { `chat.agent chunk${chunkType ? ` of type "${chunkType}"` : ""} is ${chunkSize} bytes, ` + `over the realtime stream's per-record cap of ${maxSize} bytes. ` + `For oversized payloads (e.g. 
large tool outputs), write the value to your own store and ` + - `emit only an id/url through the chat stream — see https://trigger.dev/docs/ai-chat/patterns/large-payloads.` + `emit only an id/url through the chat stream see https://trigger.dev/docs/ai-chat/patterns/large-payloads.` ); this.name = "ChatChunkTooLargeError"; } @@ -744,7 +747,7 @@ const prettyInternalErrors: Partial< href: links.docs.troubleshooting.stalledExecution, }, }, - // Link only — we deliberately do NOT set `message`, so the original + // Link only we deliberately do NOT set `message`, so the original // error message (e.g. "read ECONNRESET") is preserved in the dashboard. // Common cause: an EventEmitter (node-redis, pg, etc.) emitted "error" // with no listener attached, which Node escalates to uncaughtException. @@ -1152,7 +1155,7 @@ export function createTaskMetadataFailedErrorStack( } stack.push("\n"); - stack.push(` ❯ ${taskWithIssues.exportName} in ${taskWithIssues.filePath}`); + stack.push(` ? ${taskWithIssues.exportName} in ${taskWithIssues.filePath}`); for (const issue of taskWithIssues.issues) { if (issue.path) { diff --git a/packages/core/test/errors.test.ts b/packages/core/test/errors.test.ts index 9a94366d845..40e53df3981 100644 --- a/packages/core/test/errors.test.ts +++ b/packages/core/test/errors.test.ts @@ -263,6 +263,12 @@ describe("shouldRetryError + shouldLookupRetrySettings", () => { expect(shouldLookupRetrySettings(err)).toBe(true); }); + it("retries TASK_MIDDLEWARE_ERROR using the task's retry settings", () => { + const err = internal("TASK_MIDDLEWARE_ERROR"); + expect(shouldRetryError(err)).toBe(true); + expect(shouldLookupRetrySettings(err)).toBe(true); + }); + it("still does not retry SIGKILL timeout", () => { expect(shouldRetryError(internal("TASK_PROCESS_SIGKILL_TIMEOUT"))).toBe(false); });