Skip to content

Commit b7ce4c3

Browse files
committed
regen migrations
1 parent 1e8995b commit b7ce4c3

4 files changed

Lines changed: 17642 additions & 87 deletions

File tree

apps/sim/scripts/backfill-trace-spans.ts

Lines changed: 17 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,22 @@
11
#!/usr/bin/env bun
22

33
/**
4-
* One-shot backfill for the trace-spans-to-S3 + usage_log-cost migration.
4+
* One-shot, idempotent, resumable backfill that externalizes inline heavy
5+
* `execution_data` (traceSpans, finalOutput, workflowInput, ...) into the
6+
* execution-context large-value store, matching the completion path (cost-stripped
7+
* spans, trace pointer + markers, owner/dependency + execution_log reference
8+
* registration). Skips running rows and rows already carrying the pointer.
59
*
6-
* Two independent, idempotent, resumable passes:
10+
* Requires object storage to be configured; self-hosted deployments without it
11+
* keep `execution_data` inline (reads resolve inline transparently) and can skip
12+
* this script entirely.
713
*
8-
* 1. Cost projections (cheap, no object storage): populates
9-
* `workflow_execution_logs.cost_total` and `models_used` from the existing
10-
* (reconciling) `cost` jsonb so the logs list filter/sort/model-filter is
11-
* uniform across old and new rows. Only touches rows where `cost_total` is
12-
* still null. Run this before the `cost` column is dropped in a follow-up PR.
13-
*
14-
* 2. Trace storage (heavier): externalizes inline heavy `execution_data`
15-
* (traceSpans, finalOutput, workflowInput, ...) into the execution-context
16-
* large-value store, matching the completion path (cost-stripped spans,
17-
* trace pointer + markers, owner/dependency + execution_log reference
18-
* registration). Skips running rows and rows already carrying the pointer.
19-
*
20-
* Both passes are safe to re-run.
14+
* NOTE: the companion `cost_total` / `models_used` backfill is done in SQL by
15+
* migration 0220 (batched, idempotent), so it runs for everyone — including
16+
* self-hosted — and is intentionally NOT part of this script.
2117
*
2218
* Usage:
23-
* DATABASE_URL=... bun apps/sim/scripts/backfill-trace-spans.ts [flags]
24-
*
25-
* Flags:
26-
* --projections-only Run only pass 1 (cost_total / models_used).
27-
* --trace-only Run only pass 2 (externalize execution_data).
28-
* --max-batches=<n> Cap the number of batches per pass (default: unbounded).
29-
*
30-
* Examples:
31-
* bun apps/sim/scripts/backfill-trace-spans.ts --projections-only
32-
* bun apps/sim/scripts/backfill-trace-spans.ts --trace-only --max-batches=10
33-
* bun apps/sim/scripts/backfill-trace-spans.ts
19+
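* DATABASE_URL=... bun apps/sim/scripts/backfill-trace-spans.ts [--max-batches=<n>]
*
* `--max-batches=<n>` caps the number of batches processed (positive integer; default: unbounded).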
3420
*/
3521

3622
import { db } from '@sim/db'
@@ -47,7 +33,6 @@ import {
4733
TRACE_STORE_REF_KEY,
4834
} from '@/lib/logs/execution/trace-store'
4935

50-
const PROJECTION_BATCH_SIZE = 1000
5136
const TRACE_BATCH_SIZE = 100
5237

5338
/**
@@ -66,14 +51,10 @@ function countTraceSpans(spans: unknown): number {
6651
}
6752

6853
interface Options {
69-
projections: boolean
70-
trace: boolean
7154
maxBatches: number
7255
}
7356

7457
function parseArgs(argv: string[]): Options {
75-
const projectionsOnly = argv.includes('--projections-only')
76-
const traceOnly = argv.includes('--trace-only')
7758
const maxBatchesArg = argv.find((a) => a.startsWith('--max-batches='))
7859
const maxBatches = maxBatchesArg
7960
? Number.parseInt(maxBatchesArg.slice('--max-batches='.length), 10)
@@ -83,53 +64,10 @@ function parseArgs(argv: string[]): Options {
8364
throw new Error('--max-batches must be a positive integer')
8465
}
8566

86-
return {
87-
projections: !traceOnly,
88-
trace: !projectionsOnly,
89-
maxBatches,
90-
}
91-
}
92-
93-
/** Pass 1: backfill cost_total + models_used from the cost jsonb. */
94-
async function backfillCostProjections(maxBatches: number): Promise<number> {
95-
let updated = 0
96-
97-
for (let batch = 0; batch < maxBatches; batch++) {
98-
// Candidate set is restricted to rows we can actually project (numeric
99-
// `total` present). Every updated row leaves the set (cost_total becomes
100-
// non-null); rows without a numeric total never match — so the set strictly
101-
// drains and the loop terminates instead of re-selecting unprojectable rows.
102-
const result = await db.execute<{ id: string }>(sql`
103-
WITH candidates AS (
104-
SELECT id FROM ${workflowExecutionLogs}
105-
WHERE cost_total IS NULL
106-
AND cost ? 'total'
107-
AND (cost->>'total') ~ '^-?[0-9]+(\\.[0-9]+)?$'
108-
LIMIT ${PROJECTION_BATCH_SIZE}
109-
)
110-
UPDATE ${workflowExecutionLogs} AS wel
111-
SET
112-
cost_total = NULLIF(wel.cost->>'total', '')::numeric,
113-
models_used = CASE
114-
WHEN jsonb_typeof(wel.cost->'models') = 'object'
115-
THEN ARRAY(SELECT jsonb_object_keys(wel.cost->'models'))
116-
ELSE wel.models_used
117-
END
118-
FROM candidates
119-
WHERE wel.id = candidates.id
120-
RETURNING wel.id
121-
`)
122-
123-
const rowCount = result.length
124-
updated += rowCount
125-
console.log(` [projections] batch ${batch + 1}: updated ${rowCount} (total ${updated})`)
126-
if (rowCount < PROJECTION_BATCH_SIZE) break
127-
}
128-
129-
return updated
67+
return { maxBatches }
13068
}
13169

132-
/** Pass 2: externalize inline heavy execution_data into the large-value store. */
70+
/** Externalizes inline heavy execution_data into the large-value store; resolves to the number of rows migrated and the number skipped or failed. */
13371
async function backfillTraceStorage(
13472
maxBatches: number
13573
): Promise<{ migrated: number; failed: number }> {
@@ -233,17 +171,9 @@ async function main(): Promise<void> {
233171
const options = parseArgs(process.argv.slice(2))
234172
const startedAt = Date.now()
235173

236-
if (options.projections) {
237-
console.log('Backfilling cost projections (cost_total / models_used)…')
238-
const updated = await backfillCostProjections(options.maxBatches)
239-
console.log(`Projections done: ${updated} rows updated.`)
240-
}
241-
242-
if (options.trace) {
243-
console.log('Backfilling trace storage (externalizing execution_data)…')
244-
const { migrated, failed } = await backfillTraceStorage(options.maxBatches)
245-
console.log(`Trace storage done: ${migrated} migrated, ${failed} skipped/failed.`)
246-
}
174+
console.log('Backfilling trace storage (externalizing execution_data)…')
175+
const { migrated, failed } = await backfillTraceStorage(options.maxBatches)
176+
console.log(`Trace storage done: ${migrated} migrated, ${failed} skipped/failed.`)
247177

248178
console.log(`Backfill complete in ${((Date.now() - startedAt) / 1000).toFixed(1)}s.`)
249179
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
ALTER TYPE "public"."usage_log_category" ADD VALUE IF NOT EXISTS 'tool';--> statement-breakpoint
2+
ALTER TABLE "workflow_execution_logs" ADD COLUMN IF NOT EXISTS "cost_total" numeric;--> statement-breakpoint
3+
ALTER TABLE "workflow_execution_logs" ADD COLUMN IF NOT EXISTS "models_used" text[];--> statement-breakpoint
4+
COMMIT;--> statement-breakpoint
5+
CREATE OR REPLACE PROCEDURE backfill_wel_cost_total_0220() LANGUAGE plpgsql AS $$
6+
DECLARE
7+
updated integer;
8+
BEGIN
9+
LOOP
10+
WITH candidates AS (
11+
SELECT id FROM workflow_execution_logs
12+
WHERE cost_total IS NULL
13+
AND cost ? 'total'
14+
AND (cost->>'total') ~ '^-?[0-9]+(\.[0-9]+)?$'
15+
LIMIT 5000
16+
)
17+
UPDATE workflow_execution_logs wel
18+
SET cost_total = NULLIF(wel.cost->>'total', '')::numeric,
19+
models_used = CASE
20+
WHEN jsonb_typeof(wel.cost->'models') = 'object'
21+
THEN ARRAY(SELECT jsonb_object_keys(wel.cost->'models'))
22+
ELSE wel.models_used
23+
END
24+
FROM candidates
25+
WHERE wel.id = candidates.id;
26+
GET DIAGNOSTICS updated = ROW_COUNT;
27+
EXIT WHEN updated = 0;
28+
COMMIT;
29+
END LOOP;
30+
END;
31+
$$;--> statement-breakpoint
32+
CALL backfill_wel_cost_total_0220();--> statement-breakpoint
33+
DROP PROCEDURE backfill_wel_cost_total_0220();--> statement-breakpoint
34+
CREATE INDEX CONCURRENTLY IF NOT EXISTS "usage_log_execution_id_idx" ON "usage_log" USING btree ("execution_id");--> statement-breakpoint
35+
CREATE INDEX CONCURRENTLY IF NOT EXISTS "workflow_execution_logs_workspace_cost_total_idx" ON "workflow_execution_logs" USING btree ("workspace_id","cost_total");--> statement-breakpoint
36+
CREATE INDEX CONCURRENTLY IF NOT EXISTS "workflow_execution_logs_models_used_idx" ON "workflow_execution_logs" USING gin ("models_used");

0 commit comments

Comments
 (0)