Skip to content

Commit a83d30a

Browse files
d-csclaude
andcommitted
feat(run-engine): retry getSnapshotsSince on the primary when the replica misses the since snapshot
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent 8ee4aae commit a83d30a

1 file changed

Lines changed: 46 additions & 6 deletions

File tree

  • internal-packages/run-engine/src/engine

internal-packages/run-engine/src/engine/index.ts

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { createRedisClient, Redis } from "@internal/redis";
2-
import { getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing";
2+
import { type Counter, getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing";
33
import { Logger } from "@trigger.dev/core/logger";
44
import {
55
CheckpointInput,
@@ -46,7 +46,12 @@ import { RunQueue } from "../run-queue/index.js";
4646
import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js";
4747
import { AuthenticatedEnvironment, MinimalAuthenticatedEnvironment } from "../shared/index.js";
4848
import { BillingCache } from "./billingCache.js";
49-
import { NotImplementedError, RunDuplicateIdempotencyKeyError, RunOneTimeUseTokenError } from "./errors.js";
49+
import {
50+
ExecutionSnapshotNotFoundError,
51+
NotImplementedError,
52+
RunDuplicateIdempotencyKeyError,
53+
RunOneTimeUseTokenError,
54+
} from "./errors.js";
5055
import { EventBus, EventBusEvents } from "./eventBus.js";
5156
import { RunLocker } from "./locking.js";
5257
import { getFinalRunStatuses } from "./statuses.js";
@@ -88,6 +93,7 @@ export class RunEngine {
8893
private logger: Logger;
8994
private tracer: Tracer;
9095
private meter: Meter;
96+
private snapshotsSinceReplicaMissCounter: Counter;
9197
private heartbeatTimeouts: HeartbeatTimeouts;
9298
private repairSnapshotTimeoutMs: number;
9399
private batchQueue: BatchQueue;
@@ -272,6 +278,14 @@ export class RunEngine {
272278
this.tracer = options.tracer;
273279
this.meter = options.meter ?? getMeter("run-engine");
274280

281+
this.snapshotsSinceReplicaMissCounter = this.meter.createCounter(
282+
"run_engine.snapshots_since.replica_miss",
283+
{
284+
description:
285+
"getSnapshotsSince reads where the since snapshot was not yet on the read replica and the query was retried on the primary",
286+
}
287+
);
288+
275289
const defaultHeartbeatTimeouts: HeartbeatTimeouts = {
276290
PENDING_EXECUTING: 60_000,
277291
PENDING_CANCEL: 60_000,
@@ -1918,13 +1932,39 @@ export class RunEngine {
19181932
snapshotId: string;
19191933
tx?: PrismaClientOrTransaction;
19201934
}): Promise<RunExecutionData[] | null> {
1921-
const prisma =
1922-
tx ?? (this.options.readReplicaSnapshotsSinceEnabled ? this.readOnlyPrisma : this.prisma);
1935+
const useReplica = !tx && this.options.readReplicaSnapshotsSinceEnabled === true;
1936+
const prisma = tx ?? (useReplica ? this.readOnlyPrisma : this.prisma);
19231937

1924-
try {
1925-
const snapshots = await getExecutionSnapshotsSince(prisma, runId, snapshotId);
1938+
const query = async (client: PrismaClientOrTransaction) => {
1939+
const snapshots = await getExecutionSnapshotsSince(client, runId, snapshotId);
19261940
return snapshots.map(executionDataFromSnapshot);
1941+
};
1942+
1943+
try {
1944+
return await query(prisma);
19271945
} catch (e) {
1946+
if (useReplica && e instanceof ExecutionSnapshotNotFoundError) {
1947+
// Expected during replica lag: the runner learned the snapshot id from the writer
1948+
// before the replica caught up. Serve the read from the writer instead of failing
1949+
// the poll.
1950+
this.snapshotsSinceReplicaMissCounter.add(1);
1951+
this.logger.warn("getSnapshotsSince: snapshot not yet on replica, retrying on primary", {
1952+
runId,
1953+
snapshotId,
1954+
});
1955+
1956+
try {
1957+
return await query(this.prisma);
1958+
} catch (retryError) {
1959+
this.logger.error("Failed to getSnapshotsSince", {
1960+
message: retryError instanceof Error ? retryError.message : retryError,
1961+
runId,
1962+
snapshotId,
1963+
});
1964+
return null;
1965+
}
1966+
}
1967+
19281968
this.logger.error("Failed to getSnapshotsSince", {
19291969
message: e instanceof Error ? e.message : e,
19301970
runId,

0 commit comments

Comments
 (0)