Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ export class SharedQueueConsumer {

console.log("✅ Started the SharedQueueConsumer");

this.#doWork().finally(() => {});
this.#doWork().finally(() => { });
}

#endCurrentSpan() {
Expand Down Expand Up @@ -417,7 +417,7 @@ export class SharedQueueConsumer {
span.end();

setTimeout(() => {
this.#doWork().finally(() => {});
this.#doWork().finally(() => { });
}, nextInterval);
}
});
Expand Down Expand Up @@ -620,8 +620,8 @@ export class SharedQueueConsumer {
return existingTaskRun.lockedById
? await getWorkerDeploymentFromWorkerTask(existingTaskRun.lockedById)
: existingTaskRun.lockedToVersionId
? await getWorkerDeploymentFromWorker(existingTaskRun.lockedToVersionId)
: await findCurrentWorkerDeployment({
? await getWorkerDeploymentFromWorker(existingTaskRun.lockedToVersionId)
: await findCurrentWorkerDeployment({
environmentId: existingTaskRun.runtimeEnvironmentId,
type: "V1",
});
Expand Down Expand Up @@ -928,7 +928,6 @@ export class SharedQueueConsumer {
machine,
nextAttemptNumber,
// identifiers
id: "placeholder", // TODO: Remove this completely in a future release
envId: lockedTaskRun.runtimeEnvironment.id,
envType: lockedTaskRun.runtimeEnvironment.type,
orgId: lockedTaskRun.runtimeEnvironment.organizationId,
Expand Down Expand Up @@ -1650,6 +1649,12 @@ export const AttemptForExecutionGetPayload = {
maxDurationInSeconds: true,
tags: true,
taskEventStore: true,
batch: {
select: {
id: true,
friendlyId: true,
},
},
},
},
queue: {
Expand Down Expand Up @@ -1754,7 +1759,11 @@ class SharedQueueTasks {
slug: attempt.runtimeEnvironment.project.slug,
name: attempt.runtimeEnvironment.project.name,
},
batch: undefined, // TODO: Removing this for now until we can do it more efficiently
batch: attempt.taskRun.batch
? {
id: attempt.taskRun.batch.friendlyId,
}
: undefined,
worker: {
id: attempt.backgroundWorkerId,
contentHash: attempt.backgroundWorker.contentHash,
Expand Down Expand Up @@ -1900,9 +1909,9 @@ class SharedQueueTasks {

async getResumePayload(attemptId: string): Promise<
| {
execution: V3ProdTaskRunExecution;
completion: TaskRunExecutionResult;
}
execution: V3ProdTaskRunExecution;
completion: TaskRunExecutionResult;
}
| undefined
> {
const attempt = await prisma.taskRunAttempt.findFirst({
Expand Down
8 changes: 4 additions & 4 deletions apps/webapp/app/v3/services/createBackgroundWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,14 @@ async function createWorkerTask(
description: task.description,
filePath: task.filePath,
exportName: task.exportName,
retryConfig: task.retry,
queueConfig: task.queue,
machineConfig: task.machine,
retryConfig: task.retry ?? null,
queueConfig: task.queue ?? null,
machineConfig: task.machine ?? null,
triggerSource: task.triggerSource === "schedule" ? "SCHEDULED" : "STANDARD",
fileId: tasksToBackgroundFiles?.get(task.id) ?? null,
maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null,
queueId: queue.id,
payloadSchema: task.payloadSchema as any,
payloadSchema: (task.payloadSchema as any) ?? null,
},
});
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,20 @@ function enhanceExecutionSnapshotWithWaitpoints(
waitpoints: Waitpoint[],
completedWaitpointOrder: string[]
): EnhancedExecutionSnapshot {
const waitpointIndexMap = new Map<string, number[]>();
for (let i = 0; i < completedWaitpointOrder.length; i++) {
const id = completedWaitpointOrder[i];
const existing = waitpointIndexMap.get(id) ?? [];
existing.push(i);
waitpointIndexMap.set(id, existing);
}

return {
...snapshot,
friendlyId: SnapshotId.toFriendlyId(snapshot.id),
runFriendlyId: RunId.toFriendlyId(snapshot.runId),
completedWaitpoints: waitpoints.flatMap((w) => {
// Get all indexes of the waitpoint in the completedWaitpointOrder
// We do this because the same run can be in a batch multiple times (i.e. same idempotencyKey)
let indexes: (number | undefined)[] = [];
for (let i = 0; i < completedWaitpointOrder.length; i++) {
if (completedWaitpointOrder[i] === w.id) {
indexes.push(i);
}
}

if (indexes.length === 0) {
indexes.push(undefined);
}
const indexes = waitpointIndexMap.get(w.id) ?? [undefined];

return indexes.map((index) => {
return {
Expand All @@ -89,22 +86,22 @@ function enhanceExecutionSnapshotWithWaitpoints(
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined,
completedByTaskRun: w.completedByTaskRunId
? {
id: w.completedByTaskRunId,
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
}
id: w.completedByTaskRunId,
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
}
: undefined,
completedAfter: w.completedAfter ?? undefined,
completedByBatch: w.completedByBatchId
? {
id: w.completedByBatchId,
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
}
id: w.completedByBatchId,
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
}
: undefined,
output: w.output ?? undefined,
outputType: w.outputType,
Expand Down Expand Up @@ -233,19 +230,19 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot):
},
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
checkpoint: snapshot.checkpoint
? {
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
: undefined,
completedWaitpoints: snapshot.completedWaitpoints,
};
Expand Down
46 changes: 44 additions & 2 deletions packages/cli-v3/src/dev/mcpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,53 @@ server.tool(
}
})
.describe("The payload to pass to the task run, must be a valid JSON"),
// TODO: expose more parameteres from the trigger options
delay: z
.string()
.optional()
.describe("Delay before the task run starts, e.g. '1m', '30s', '2h', or an ISO 8601 date"),
ttl: z
.union([z.string(), z.number()])
.optional()
.describe(
"Time-to-live: how long the run remains valid before it starts, e.g. '1h' or seconds as a number"
),
tags: z
.array(z.string())
.optional()
.describe("Tags to attach to the task run for filtering and organization"),
queue: z.string().optional().describe("The queue name to use for this task run"),
maxAttempts: z
.number()
.int()
.optional()
.describe("Maximum number of retry attempts for this task run"),
idempotencyKey: z
.string()
.optional()
.describe("Idempotency key for deduplication of task runs"),
concurrencyKey: z
.string()
.optional()
.describe("Concurrency key for controlling concurrent execution"),
priority: z.number().optional().describe("Priority of the task run (higher = more priority)"),
test: z.boolean().optional().describe("Whether this is a test run"),
},
async ({ id, payload }) => {
async ({ id, payload, delay, ttl, tags, queue, maxAttempts, idempotencyKey, concurrencyKey, priority, test }) => {
const options: Record<string, unknown> = {};

if (delay !== undefined) options.delay = delay;
if (ttl !== undefined) options.ttl = ttl;
if (tags !== undefined) options.tags = tags;
if (queue !== undefined) options.queue = { name: queue };
if (maxAttempts !== undefined) options.maxAttempts = maxAttempts;
if (idempotencyKey !== undefined) options.idempotencyKey = idempotencyKey;
if (concurrencyKey !== undefined) options.concurrencyKey = concurrencyKey;
if (priority !== undefined) options.priority = priority;
if (test !== undefined) options.test = test;

const result = await sdkApiClient.triggerTask(id, {
payload,
options: Object.keys(options).length > 0 ? options : undefined,
});

const taskRunUrl = `${dashboardUrl}/projects/v3/${projectRef}/runs/${result.id}`;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ export class StandardRunTimelineMetricsManager implements RunTimelineMetricsMana
this._metrics = [];
}

// TODO: handle this when processKeepAlive is enabled
#seedMetricsFromEnvironment(isWarmStartOverride?: boolean) {
const forkStartTime = getEnvVar("TRIGGER_PROCESS_FORK_START_TIME");
const warmStart = getEnvVar("TRIGGER_WARM_START");
Expand All @@ -46,12 +45,21 @@ export class StandardRunTimelineMetricsManager implements RunTimelineMetricsMana

if (typeof forkStartTime === "string" && !isWarmStart) {
const forkStartTimeMs = parseInt(forkStartTime, 10);
const forkDuration = Date.now() - forkStartTimeMs;

// When processKeepAlive is enabled, the process is reused across multiple runs.
// The TRIGGER_PROCESS_FORK_START_TIME env var from the original cold start persists
// in the process environment and becomes stale. Skip registration if the fork time
// is unreasonably old (> 60s), which indicates a kept-alive process.
if (forkDuration > 60_000) {
return;
}

this.registerMetric({
name: "trigger.dev/start",
event: "fork",
attributes: {
duration: Date.now() - forkStartTimeMs,
duration: forkDuration,
},
timestamp: forkStartTimeMs,
});
Expand Down
Loading
Loading