|
| 1 | +import pRetry from "p-retry"; |
| 2 | +import { Counter, Gauge } from "prom-client"; |
| 3 | +import { metricsRegister } from "~/metrics.server"; |
| 4 | +import { logger } from "~/services/logger.server"; |
| 5 | +import { signalsEmitter } from "~/services/signals.server"; |
| 6 | + |
| 7 | +const loadFailures = new Counter({ |
| 8 | + name: "reloading_registry_load_failures_total", |
| 9 | + help: "Failed loads of a reloading registry", |
| 10 | + labelNames: ["name"], |
| 11 | + registers: [metricsRegister], |
| 12 | +}); |
| 13 | + |
| 14 | +const lastSuccessfulLoadAt = new Gauge({ |
| 15 | + name: "reloading_registry_last_successful_load_timestamp_seconds", |
| 16 | + help: "Unix time of the last successful registry load (staleness signal)", |
| 17 | + labelNames: ["name"], |
| 18 | + registers: [metricsRegister], |
| 19 | +}); |
| 20 | + |
| 21 | +export type ReloadingRegistry<T> = { |
| 22 | + isReady: Promise<void>; |
| 23 | + readonly isLoaded: boolean; |
| 24 | + current(): T | undefined; |
| 25 | + reload(): Promise<void>; |
| 26 | + waitUntilReady(timeoutMs: number): Promise<void>; |
| 27 | + stop(): void; |
| 28 | +}; |
| 29 | + |
| 30 | +export type ReloadingRegistryOptions<T> = { |
| 31 | + /** Tag for metrics + logs. */ |
| 32 | + name: string; |
| 33 | + /** Loads the full snapshot from the source of truth. */ |
| 34 | + load: () => Promise<T>; |
| 35 | + /** How often to reload after the first successful load. */ |
| 36 | + intervalMs: number; |
| 37 | + /** Startup retry config; defaults to forever with backoff. */ |
| 38 | + retry?: { retries?: number }; |
| 39 | +}; |
| 40 | + |
| 41 | +/** |
| 42 | + * In-memory snapshot loaded at startup and refreshed on an interval. Reads are |
| 43 | + * synchronous (`current()`); the first read should gate on `waitUntilReady` so a |
| 44 | + * cold replica never serves a default over a real value. Mirrors the datastore / |
| 45 | + * LLM-pricing registries. Interval-only: no pub/sub (a follow-up if sub-second |
| 46 | + * propagation is ever needed). |
| 47 | + */ |
| 48 | +export function createReloadingRegistry<T>(opts: ReloadingRegistryOptions<T>): ReloadingRegistry<T> { |
| 49 | + let snapshot: T | undefined; |
| 50 | + let loaded = false; |
| 51 | + let resolveReady!: () => void; |
| 52 | + const isReady = new Promise<void>((resolve) => { |
| 53 | + resolveReady = resolve; |
| 54 | + }); |
| 55 | + |
| 56 | + async function doLoad() { |
| 57 | + snapshot = await opts.load(); |
| 58 | + lastSuccessfulLoadAt.set({ name: opts.name }, Date.now() / 1000); |
| 59 | + if (!loaded) { |
| 60 | + loaded = true; |
| 61 | + resolveReady(); |
| 62 | + } |
| 63 | + } |
| 64 | + |
| 65 | + const startup = pRetry(() => doLoad(), { |
| 66 | + forever: opts.retry?.retries === undefined, |
| 67 | + retries: opts.retry?.retries, |
| 68 | + minTimeout: 1_000, |
| 69 | + maxTimeout: 60_000, |
| 70 | + factor: 2, |
| 71 | + onFailedAttempt: (error) => { |
| 72 | + loadFailures.inc({ name: opts.name }); |
| 73 | + logger.warn("[ReloadingRegistry] startup load failed, retrying", { |
| 74 | + name: opts.name, |
| 75 | + attemptNumber: error.attemptNumber, |
| 76 | + retriesLeft: error.retriesLeft, |
| 77 | + error: error.message, |
| 78 | + }); |
| 79 | + }, |
| 80 | + }); |
| 81 | + startup.catch((err) => { |
| 82 | + logger.error("[ReloadingRegistry] startup load gave up", { |
| 83 | + name: opts.name, |
| 84 | + error: err instanceof Error ? err.message : String(err), |
| 85 | + }); |
| 86 | + }); |
| 87 | + |
| 88 | + const interval = setInterval(() => { |
| 89 | + doLoad().catch((err) => { |
| 90 | + loadFailures.inc({ name: opts.name }); |
| 91 | + logger.warn("[ReloadingRegistry] reload failed", { |
| 92 | + name: opts.name, |
| 93 | + error: err instanceof Error ? err.message : String(err), |
| 94 | + }); |
| 95 | + }); |
| 96 | + }, opts.intervalMs); |
| 97 | + |
| 98 | + function stop() { |
| 99 | + clearInterval(interval); |
| 100 | + } |
| 101 | + signalsEmitter.on("SIGTERM", stop); |
| 102 | + signalsEmitter.on("SIGINT", stop); |
| 103 | + |
| 104 | + return { |
| 105 | + isReady, |
| 106 | + get isLoaded() { |
| 107 | + return loaded; |
| 108 | + }, |
| 109 | + current: () => snapshot, |
| 110 | + reload: doLoad, |
| 111 | + async waitUntilReady(timeoutMs: number) { |
| 112 | + if (loaded || timeoutMs <= 0) return; |
| 113 | + await Promise.race([ |
| 114 | + isReady, |
| 115 | + new Promise<void>((resolve) => setTimeout(resolve, timeoutMs)), |
| 116 | + ]); |
| 117 | + }, |
| 118 | + stop, |
| 119 | + }; |
| 120 | +} |
0 commit comments