diff --git a/packages/backend/src/context/bullmq-context.test.ts b/packages/backend/src/context/bullmq-context.test.ts new file mode 100644 index 00000000..186f8a3b --- /dev/null +++ b/packages/backend/src/context/bullmq-context.test.ts @@ -0,0 +1,82 @@ +import { describe, it, expect, vi } from 'vitest'; +import { context, withContext } from '@logtide/shared/context'; +import { attachContextToPayload, wrapProcessorWithContext, CTX_KEY } from './bullmq-context.js'; + +describe('attachContextToPayload', () => { + it('returns input unchanged when no context', () => { + const data = { foo: 1 }; + expect(attachContextToPayload(data)).toBe(data); + }); + + it('adds _ctx with v=1 when a context is active', async () => { + await withContext({ requestId: 'req-1', organizationId: 'org-1' }, async () => { + const out = attachContextToPayload({ foo: 1 }) as any; + expect(out._ctx.v).toBe(1); + expect(out._ctx.requestId).toBe('req-1'); + expect(out._ctx.organizationId).toBe('org-1'); + expect(out.foo).toBe(1); + }); + }); + + it('handles non-object payloads gracefully', () => { + expect(attachContextToPayload('hello' as unknown as object)).toBe('hello'); + }); +}); + +describe('wrapProcessorWithContext', () => { + it('strips _ctx and runs processor inside context.run', async () => { + const processor = vi.fn(async (job) => { + // expect data does not contain _ctx + expect((job.data as any)._ctx).toBeUndefined(); + // expect context is established + const ctx = context.current(); + expect(ctx.requestId).toBe('req-7'); + expect(ctx.origin).toBe('job'); + }); + + const wrapped = wrapProcessorWithContext('test-job', processor); + await wrapped({ + id: 'j1', + name: 'test-job', + data: { + payload: 'x', + [CTX_KEY]: { + v: 1, + requestId: 'req-7', + origin: 'http', + actor: { type: 'system', id: null }, + organizationId: null, + projectId: null, + }, + } as any, + }); + expect(processor).toHaveBeenCalledOnce(); + }); + + it('falls back to runAsSystem when _ctx is missing', async () => { + const processor = vi.fn(async () => { + const ctx = context.current(); + expect(ctx.origin).toBe('system'); + expect(ctx.systemReason).toBe('bullmq-job:no-ctx-job'); + }); + + const wrapped = wrapProcessorWithContext('no-ctx-job', processor); + await wrapped({ id: 'j2', name: 'no-ctx-job', data: { payload: 'y' } as any }); + expect(processor).toHaveBeenCalledOnce(); + }); + + it('falls back to runAsSystem when _ctx has unknown version', async () => { + const processor = vi.fn(async () => { + const ctx = context.current(); + expect(ctx.origin).toBe('system'); + }); + + const wrapped = wrapProcessorWithContext('bad-ctx-job', processor); + await wrapped({ + id: 'j3', + name: 'bad-ctx-job', + data: { [CTX_KEY]: { v: 99 } } as any, + }); + expect(processor).toHaveBeenCalledOnce(); + }); +}); diff --git a/packages/backend/src/context/bullmq-context.ts b/packages/backend/src/context/bullmq-context.ts new file mode 100644 index 00000000..3038ab4c --- /dev/null +++ b/packages/backend/src/context/bullmq-context.ts @@ -0,0 +1,45 @@ +import { currentOrNull, deserializeContext, run, runAsSystem, serializeContext } from '@logtide/shared/context'; +import type { JobProcessor } from '../queue/abstractions/types.js'; + +export const CTX_KEY = '_ctx'; + +/** + * Adds `_ctx` to a job payload if a context is currently active. + * Returns the (possibly modified) payload as `unknown` to keep adapter generics sane. + */ +export function attachContextToPayload(data: T): T { + const ctx = currentOrNull(); + if (!ctx) return data; + if (typeof data !== 'object' || data === null) return data; + return { ...(data as object), [CTX_KEY]: serializeContext(ctx) } as T; +} + +/** + * Wraps a JobProcessor: detaches `_ctx` from the payload (so the user processor sees clean data) + * and runs the processor inside the appropriate context.run / runAsSystem. + */ +export function wrapProcessorWithContext( + jobName: string, + processor: JobProcessor +): JobProcessor { + return async (job) => { + const raw = job.data as T & { [CTX_KEY]?: unknown }; + const serialized = (raw as Record | null)?.[CTX_KEY]; + const cleanData = + raw && typeof raw === 'object' + ? (() => { + const { [CTX_KEY]: _omit, ...rest } = raw as Record; + return rest as T; + })() + : raw; + + const cleanJob = { ...job, data: cleanData }; + + const ctx = serialized != null ? deserializeContext(serialized) : undefined; + if (ctx) { + await run(ctx, () => processor(cleanJob)); + } else { + await runAsSystem(`bullmq-job:${jobName}`, () => processor(cleanJob)); + } + }; +} diff --git a/packages/backend/src/context/database-comment.test.ts b/packages/backend/src/context/database-comment.test.ts new file mode 100644 index 00000000..3da8ddb6 --- /dev/null +++ b/packages/backend/src/context/database-comment.test.ts @@ -0,0 +1,36 @@ +import { describe, it, expect } from 'vitest'; +import { withContext, currentOrNull } from '@logtide/shared/context'; +import { formatContextComment } from './kysely-plugin.js'; + +describe('SQL comment injection helpers', () => { + it('formatContextComment can be invoked deterministically', () => { + const out = formatContextComment({ + requestId: 'r1', + origin: 'http', + organizationId: 'o1', + actor: { type: 'user', id: 'u1' }, + }); + expect(out).toMatch(/^\/\* req=r1 origin=http org=o1 actor=user:u1 \*\/ $/); + }); + + it('manually-built patched query function prepends comment', async () => { + const captured: string[] = []; + const fakeOriginal = (sql: string) => { + captured.push(sql); + return Promise.resolve({ rows: [], rowCount: 0 }); + }; + const patched = (...args: any[]) => { + const ctx = currentOrNull(); + if (!ctx) return fakeOriginal(...(args as [string])); + const comment = formatContextComment(ctx); + args[0] = comment + args[0]; + return fakeOriginal(...(args as [string])); + }; + + await withContext({ requestId: 'rZ', organizationId: 'oZ' }, async () => { + await patched('SELECT 1'); + }); + + expect(captured[0]).toMatch(/^\/\* req=rZ /); + }); +}); diff --git a/packages/backend/src/context/fastify-plugin.test.ts b/packages/backend/src/context/fastify-plugin.test.ts new file mode 100644 index 00000000..14c716e0 --- /dev/null +++ b/packages/backend/src/context/fastify-plugin.test.ts @@ -0,0 +1,67 @@ +import { describe, it, expect } from 'vitest'; +import Fastify from 'fastify'; +import { context } from '@logtide/shared/context'; +import { contextPlugin } from './fastify-plugin.js'; + +describe('contextPlugin', () => { + async function buildApp(authStub?: (req: any) => void) { + const app = Fastify({ logger: false }); + if (authStub) { + // Simulate the auth plugin running BEFORE contextPlugin (registration order) + app.addHook('onRequest', async (req) => { + authStub(req); + }); + } + await app.register(contextPlugin); + app.get('/echo', async () => { + const ctx = context.current(); + return { + requestId: ctx.requestId, + origin: ctx.origin, + actorType: ctx.actor.type, + actorId: ctx.actor.id, + organizationId: ctx.organizationId, + }; + }); + return app; + } + + it('establishes a context for every request (anonymous)', async () => { + const app = await buildApp(); + const res = await app.inject({ method: 'GET', url: '/echo' }); + expect(res.statusCode).toBe(200); + const body = res.json(); + expect(body.origin).toBe('http'); + expect(body.actorType).toBe('system'); + expect(body.requestId).toBeTruthy(); + await app.close(); + }); + + it('captures user actor when auth populated request.user', async () => { + const app = await buildApp((req) => { + req.user = { id: 'u-abc', email: 'alice@test' }; + req.organizationId = 'org-1'; + }); + const res = await app.inject({ method: 'GET', url: '/echo' }); + const body = res.json(); + expect(body.actorType).toBe('user'); + expect(body.actorId).toBe('u-abc'); + expect(body.organizationId).toBe('org-1'); + await app.close(); + }); + + it('captures apiKey actor when auth populated request.apiKeyId', async () => { + const app = await buildApp((req) => { + req.apiKeyId = 'key-xyz'; + req.apiKeyType = 'write'; + req.organizationId = 'org-2'; + req.projectId = 'proj-2'; + }); + const res = await app.inject({ method: 'GET', url: '/echo' }); + const body = res.json(); + expect(body.actorType).toBe('apiKey'); + expect(body.actorId).toBe('key-xyz'); + expect(body.organizationId).toBe('org-2'); + await app.close(); + }); +}); diff --git a/packages/backend/src/context/fastify-plugin.ts b/packages/backend/src/context/fastify-plugin.ts new file mode 100644 index 00000000..614f7e18 --- /dev/null +++ b/packages/backend/src/context/fastify-plugin.ts @@ -0,0 +1,44 @@ +import type { FastifyPluginAsync, FastifyRequest } from 'fastify'; +import fp from 'fastify-plugin'; +import type { ApiKeyType } from '@logtide/shared'; +import { context, type Actor, type RequestContext } from '@logtide/shared/context'; + +type AuthDecoratedRequest = FastifyRequest & { + organizationId?: string; + projectId?: string; + apiKeyId?: string; + apiKeyType?: ApiKeyType; + user?: { id: string; email: string }; +}; + +const plugin: FastifyPluginAsync = async (fastify) => { + fastify.addHook('onRequest', async (request) => { + const r = request as AuthDecoratedRequest; + + const actor: Actor = r.user + ? { type: 'user', id: r.user.id, email: r.user.email } + : r.apiKeyId + ? { type: 'apiKey', id: r.apiKeyId, apiKeyType: r.apiKeyType } + : { type: 'system', id: null }; + + const userAgentHeader = r.headers['user-agent']; + const userAgent = + typeof userAgentHeader === 'string' ? userAgentHeader : undefined; + + const ctx: RequestContext = { + requestId: r.id, + origin: 'http', + actor, + organizationId: r.organizationId ?? null, + projectId: r.projectId ?? null, + ip: r.ip, + userAgent, + }; + context.enterWith(ctx); + }); +}; + +export const contextPlugin = fp(plugin, { + name: 'context', + fastify: '5.x', +}); diff --git a/packages/backend/src/context/index.ts b/packages/backend/src/context/index.ts new file mode 100644 index 00000000..faea2f03 --- /dev/null +++ b/packages/backend/src/context/index.ts @@ -0,0 +1,2 @@ +export { contextPlugin } from './fastify-plugin.js'; +export { context } from '@logtide/shared/context'; diff --git a/packages/backend/src/context/kysely-plugin.test.ts b/packages/backend/src/context/kysely-plugin.test.ts new file mode 100644 index 00000000..6083d237 --- /dev/null +++ b/packages/backend/src/context/kysely-plugin.test.ts @@ -0,0 +1,34 @@ +import { describe, it, expect } from 'vitest'; +import { formatContextComment, safeForComment } from './kysely-plugin.js'; + +describe('formatContextComment', () => { + it('formats basic context', () => { + const out = formatContextComment({ + requestId: 'req-1', + origin: 'http', + organizationId: 'org-1', + actor: { type: 'user', id: 'u1' }, + }); + expect(out).toBe('/* req=req-1 origin=http org=org-1 actor=user:u1 */ '); + }); + + it('uses dash placeholder for null fields', () => { + const out = formatContextComment({ + requestId: 'req-1', + origin: 'system', + organizationId: null, + actor: { type: 'system', id: null }, + }); + expect(out).toBe('/* req=req-1 origin=system org=- actor=system:- */ '); + }); +}); + +describe('safeForComment', () => { + it('strips dangerous characters', () => { + expect(safeForComment('a*/b\nc')).toBe('abc'); + }); + it('returns dash for empty result', () => { + expect(safeForComment('***')).toBe('-'); + expect(safeForComment(null)).toBe('-'); + }); +}); diff --git a/packages/backend/src/context/kysely-plugin.ts b/packages/backend/src/context/kysely-plugin.ts new file mode 100644 index 00000000..fa1d4338 --- /dev/null +++ b/packages/backend/src/context/kysely-plugin.ts @@ -0,0 +1,45 @@ +import type { + KyselyPlugin, + PluginTransformQueryArgs, + PluginTransformResultArgs, + QueryResult, + RootOperationNode, + UnknownRow, +} from 'kysely'; + +/** + * v1 placeholder. The actual SQL comment injection happens at the pg.Pool level + * (see database/connection.ts) so the comment lands in front of the wire query + * without depending on Kysely AST internals (which differ per query kind). + * + * Kept as a stub so future work can move comment injection back into the plugin + * if Kysely exposes a stable RawNode-prefix API. + */ +export class ContextSqlCommentPlugin implements KyselyPlugin { + transformQuery(args: PluginTransformQueryArgs): RootOperationNode { + return args.node; + } + transformResult(args: PluginTransformResultArgs): Promise> { + return Promise.resolve(args.result); + } +} + +const SAFE_RE = /[^a-zA-Z0-9_:-]/g; + +export function safeForComment(value: string | null | undefined): string { + if (!value) return '-'; + const cleaned = value.replace(SAFE_RE, ''); + return cleaned.length > 0 ? cleaned : '-'; +} + +export function formatContextComment(ctx: { + requestId: string; + origin: string; + organizationId: string | null; + actor: { type: string; id: string | null }; +}): string { + const actor = `${safeForComment(ctx.actor.type)}:${safeForComment(ctx.actor.id)}`; + return `/* req=${safeForComment(ctx.requestId)} origin=${safeForComment( + ctx.origin + )} org=${safeForComment(ctx.organizationId)} actor=${actor} */ `; +} diff --git a/packages/backend/src/database/connection.ts b/packages/backend/src/database/connection.ts index 0c36f143..c7b71204 100644 --- a/packages/backend/src/database/connection.ts +++ b/packages/backend/src/database/connection.ts @@ -4,6 +4,8 @@ import dotenv from 'dotenv'; import path from 'path'; import { fileURLToPath } from 'url'; import type { Database } from './types.js'; +import { currentOrNull } from '@logtide/shared/context'; +import { formatContextComment } from '../context/kysely-plugin.js'; const { Pool } = pg; @@ -59,6 +61,64 @@ const poolConfig = { export const pool = new Pool(poolConfig); +// Context SQL comment injection: prepend /* req=... */ to every query string, +// transparently, so slow-query logs and pg_stat_activity carry the request id. +// Disabled when LOGTIDE_CONTEXT_SQL_COMMENT=false. +const originalQuery = pool.query.bind(pool); +(pool as unknown as { query: typeof pool.query }).query = function patchedQuery( + this: typeof pool, + ...args: any[] +): any { + if (process.env.LOGTIDE_CONTEXT_SQL_COMMENT === 'false') { + return (originalQuery as any)(...args); + } + const ctx = currentOrNull(); + if (!ctx) return (originalQuery as any)(...args); + + const comment = formatContextComment(ctx); + const first = args[0]; + if (typeof first === 'string') { + args[0] = comment + first; + } else if (first && typeof first === 'object' && typeof first.text === 'string') { + args[0] = { ...first, text: comment + first.text }; + } + return (originalQuery as any)(...args); +} as typeof pool.query; + +// Kysely uses pool.connect() to acquire a client and calls client.query() on it. +// Wrap connect() to install the same comment-injection on each connected client. +const originalConnect = pool.connect.bind(pool); +(pool as unknown as { connect: typeof pool.connect }).connect = (async function patchedConnect( + this: typeof pool, + ...args: any[] +): Promise { + const client = await (originalConnect as any)(...args); + // pool.connect(callback) returns void; bail if no client materialized. + if (!client || typeof client !== 'object') return client; + // Patch the client's query method (only once per client) + if (!(client as any).__logtideContextPatched) { + const clientOriginalQuery = client.query.bind(client); + (client as any).query = function patchedClientQuery(this: any, ...qArgs: any[]): any { + if (process.env.LOGTIDE_CONTEXT_SQL_COMMENT === 'false') { + return clientOriginalQuery(...qArgs); + } + const ctx = currentOrNull(); + if (!ctx) return clientOriginalQuery(...qArgs); + + const comment = formatContextComment(ctx); + const first = qArgs[0]; + if (typeof first === 'string') { + qArgs[0] = comment + first; + } else if (first && typeof first === 'object' && typeof first.text === 'string') { + qArgs[0] = { ...first, text: comment + first.text }; + } + return clientOriginalQuery(...qArgs); + }; + (client as any).__logtideContextPatched = true; + } + return client; +}) as any; + // Pool event handlers for monitoring pool.on('connect', () => { // Set statement timeout on each new connection diff --git a/packages/backend/src/modules/api-keys/service.ts b/packages/backend/src/modules/api-keys/service.ts index 986e4ea1..05242c74 100644 --- a/packages/backend/src/modules/api-keys/service.ts +++ b/packages/backend/src/modules/api-keys/service.ts @@ -38,6 +38,9 @@ interface CachedApiKey { } export class ApiKeysService { + private static readonly LAST_USED_DEBOUNCE_MS = 60_000; + private lastUsedWrites = new Map(); + /** * Hash an API key using SHA-256 */ @@ -145,10 +148,17 @@ export class ApiKeysService { } /** - * Update last_used timestamp asynchronously - * This is fire-and-forget to not block ingestion + * Update last_used timestamp asynchronously. + * Debounced per key: at most one DB write per 60s per process. + * last_used is only used for UI display, so minute-granularity is fine. */ private async updateLastUsedAsync(keyId: string): Promise { + const now = Date.now(); + const last = this.lastUsedWrites.get(keyId); + if (last !== undefined && now - last < ApiKeysService.LAST_USED_DEBOUNCE_MS) { + return; + } + this.lastUsedWrites.set(keyId, now); await db .updateTable('api_keys') .set({ last_used: new Date() }) diff --git a/packages/backend/src/queue/adapters/bullmq-adapter.ts b/packages/backend/src/queue/adapters/bullmq-adapter.ts index c6431c72..58edb623 100644 --- a/packages/backend/src/queue/adapters/bullmq-adapter.ts +++ b/packages/backend/src/queue/adapters/bullmq-adapter.ts @@ -15,6 +15,7 @@ import type { IJobOptions, JobProcessor, } from '../abstractions/types.js'; +import { attachContextToPayload } from '../../context/bullmq-context.js'; /** * Default job options for cleanup (prevents Redis memory bloat) @@ -60,9 +61,8 @@ export class BullMQQueueAdapter implements IQueueAdapter, ICronR } async add(jobName: string, data: T, options?: IJobOptions): Promise> { - // BullMQ has complex generic types, so we use `any` for the queue.add call - // and return our properly typed IJob - const bullJob = await (this.queue as any).add(jobName, data, { + const payload = attachContextToPayload(data); + const bullJob = await (this.queue as any).add(jobName, payload, { delay: options?.delay, attempts: options?.maxAttempts, priority: options?.priority, @@ -74,7 +74,7 @@ export class BullMQQueueAdapter implements IQueueAdapter, ICronR return { id: bullJob.id || '', - data: data, + data, // return original (un-wrapped) to caller name: bullJob.name, attemptsMade: bullJob.attemptsMade, timestamp: bullJob.timestamp ? new Date(bullJob.timestamp) : undefined, diff --git a/packages/backend/src/queue/adapters/graphile-adapter.ts b/packages/backend/src/queue/adapters/graphile-adapter.ts index 3ed999a6..fd358e91 100644 --- a/packages/backend/src/queue/adapters/graphile-adapter.ts +++ b/packages/backend/src/queue/adapters/graphile-adapter.ts @@ -25,6 +25,7 @@ import type { IJobOptions, JobProcessor, } from '../abstractions/types.js'; +import { attachContextToPayload } from '../../context/bullmq-context.js'; /** * Convert Graphile Job + payload to unified IJob interface @@ -55,10 +56,11 @@ export class GraphileQueueAdapter implements IQueueAdapter, ICro ) {} async add(_jobName: string, data: T, options?: IJobOptions): Promise> { + const payload = attachContextToPayload(data) as object; const job = await quickAddJob( { pgPool: this.pool }, - this.name, // Task identifier (queue name) - data as object, + this.name, + payload, { runAt: options?.delay ? new Date(Date.now() + options.delay) : undefined, maxAttempts: options?.maxAttempts ?? 3, diff --git a/packages/backend/src/queue/connection.ts b/packages/backend/src/queue/connection.ts index c1adbbc8..8552b67d 100644 --- a/packages/backend/src/queue/connection.ts +++ b/packages/backend/src/queue/connection.ts @@ -34,6 +34,7 @@ import { getQueueSystemStatus, } from './queue-factory.js'; import type { IQueueAdapter, IWorkerAdapter, QueueBackend, JobProcessor } from './abstractions/types.js'; +import { wrapProcessorWithContext } from '../context/bullmq-context.js'; // Determine backend based on environment const hasRedis = !!config.REDIS_URL; @@ -127,7 +128,8 @@ export function createWorker( name: string, processor: JobProcessor ): IWorkerAdapter { - return createWorkerImpl(name, processor); + const wrapped = wrapProcessorWithContext(name, processor); + return createWorkerImpl(name, wrapped); } /** diff --git a/packages/backend/src/scripts/seed-load-test.ts b/packages/backend/src/scripts/seed-load-test.ts index 06ae8d10..a3c9df5e 100644 --- a/packages/backend/src/scripts/seed-load-test.ts +++ b/packages/backend/src/scripts/seed-load-test.ts @@ -48,6 +48,7 @@ async function seedLoadTestData() { project_id: project.id, name: `Load Test Key ${Date.now()}`, key_hash: keyHash, + type: 'full', last_used: null, }) .execute(); @@ -120,6 +121,7 @@ async function seedLoadTestData() { project_id: project.id, name: 'Load Test API Key', key_hash: keyHash, + type: 'full', last_used: null, }) .execute(); diff --git a/packages/backend/src/server.ts b/packages/backend/src/server.ts index de80878f..808ff620 100644 --- a/packages/backend/src/server.ts +++ b/packages/backend/src/server.ts @@ -6,6 +6,7 @@ import { config, isRedisConfigured } from './config/index.js'; import { getConnection } from './queue/connection.js'; import { notificationManager } from './modules/streaming/index.js'; import authPlugin from './modules/auth/plugin.js'; +import { contextPlugin } from './context/index.js'; import { ingestionRoutes } from './modules/ingestion/index.js'; import { queryRoutes } from './modules/query/index.js'; import { alertsRoutes } from './modules/alerts/index.js'; @@ -182,6 +183,7 @@ export async function build(opts = {}) { await fastify.register(retentionRoutes, { prefix: '/api/v1/admin' }); await fastify.register(authPlugin); + await fastify.register(contextPlugin); await fastify.register(ingestionRoutes); await fastify.register(queryRoutes); await fastify.register(correlationRoutes, { prefix: '/api' }); diff --git a/packages/backend/src/tests/integration/context-propagation.test.ts b/packages/backend/src/tests/integration/context-propagation.test.ts new file mode 100644 index 00000000..3315ed1b --- /dev/null +++ b/packages/backend/src/tests/integration/context-propagation.test.ts @@ -0,0 +1,47 @@ +import { describe, it, expect } from 'vitest'; +import { context } from '@logtide/shared/context'; +import { wrapProcessorWithContext, attachContextToPayload } from '../../context/bullmq-context.js'; + +describe('Request context propagation (integration)', () => { + it('establishes a context for HTTP requests via contextPlugin', async () => { + const Fastify = (await import('fastify')).default; + const { contextPlugin } = await import('../../context/fastify-plugin.js'); + const app = Fastify({ logger: false }); + await app.register(contextPlugin); + let captured: string | null = null; + app.get('/__test/ctx', async () => { + captured = context.current().requestId; + return { ok: true }; + }); + const res = await app.inject({ method: 'GET', url: '/__test/ctx' }); + expect(res.statusCode).toBe(200); + expect(captured).toBeTruthy(); + await app.close(); + }); + + it('propagates the same requestId from producer to consumer', async () => { + const seen: { producer?: string; consumer?: string } = {}; + + const wrapped = wrapProcessorWithContext('test', async (_job) => { + seen.consumer = context.current().requestId; + }); + + await context.run( + { + requestId: 'req-corr', + origin: 'http', + actor: { type: 'system', id: null }, + organizationId: null, + projectId: null, + }, + async () => { + seen.producer = context.current().requestId; + const payload = attachContextToPayload({ foo: 1 }); + await wrapped({ id: 'j', name: 'test', data: payload as any }); + } + ); + + expect(seen.producer).toBe('req-corr'); + expect(seen.consumer).toBe('req-corr'); + }); +}); diff --git a/packages/backend/src/tests/modules/api-keys/last-used-debounce.test.ts b/packages/backend/src/tests/modules/api-keys/last-used-debounce.test.ts new file mode 100644 index 00000000..95c09dd8 --- /dev/null +++ b/packages/backend/src/tests/modules/api-keys/last-used-debounce.test.ts @@ -0,0 +1,113 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { ApiKeysService } from '../../../modules/api-keys/service.js'; +import { db } from '../../../database/connection.js'; +import { CacheManager } from '../../../utils/cache.js'; + +vi.mock('../../../database/connection.js', () => ({ + db: { + selectFrom: vi.fn(), + updateTable: vi.fn(), + insertInto: vi.fn(), + }, +})); + +vi.mock('../../../utils/cache.js', () => ({ + CacheManager: { + apiKeyKey: vi.fn((h) => `api_key:${h}`), + get: vi.fn(), + set: vi.fn(), + invalidateApiKey: vi.fn(), + }, + CACHE_TTL: { API_KEY: 60 }, +})); + +describe('ApiKeysService - last_used debounce', () => { + let service: ApiKeysService; + let updateCount = 0; + let updateExecuteMock: ReturnType; + + beforeEach(() => { + vi.useFakeTimers(); + updateCount = 0; + updateExecuteMock = vi.fn(async () => { + updateCount++; + }); + + // Fluent chain: .updateTable().set().where().execute() + (db.updateTable as ReturnType).mockImplementation(() => ({ + set: () => ({ where: () => ({ execute: updateExecuteMock }) }), + })); + + // Cache hit returns a valid cached api key + (CacheManager.get as ReturnType).mockResolvedValue({ + keyId: 'key-1', + projectId: 'proj-1', + organizationId: 'org-1', + type: 'full', + allowedOrigins: null, + }); + + service = new ApiKeysService(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('writes on first verification', async () => { + await service.verifyApiKey('lp_test'); + await vi.advanceTimersByTimeAsync(0); + await Promise.resolve(); + expect(updateCount).toBe(1); + }); + + it('skips writes within 60s window for same key', async () => { + await service.verifyApiKey('lp_test'); + await vi.advanceTimersByTimeAsync(0); + + vi.advanceTimersByTime(30_000); // 30s later + await service.verifyApiKey('lp_test'); + await vi.advanceTimersByTimeAsync(0); + + vi.advanceTimersByTime(20_000); // 50s total + await service.verifyApiKey('lp_test'); + await vi.advanceTimersByTimeAsync(0); + + expect(updateCount).toBe(1); + }); + + it('writes again after 60s window passes', async () => { + await service.verifyApiKey('lp_test'); + await vi.advanceTimersByTimeAsync(0); + + vi.advanceTimersByTime(61_000); // 61s later + await service.verifyApiKey('lp_test'); + await vi.advanceTimersByTimeAsync(0); + + expect(updateCount).toBe(2); + }); + + it('does not block writes for different keys', async () => { + (CacheManager.get as ReturnType).mockResolvedValueOnce({ + keyId: 'key-A', + projectId: 'p', + organizationId: 'o', + type: 'full', + allowedOrigins: null, + }); + await service.verifyApiKey('lp_a'); + await vi.advanceTimersByTimeAsync(0); + + (CacheManager.get as ReturnType).mockResolvedValueOnce({ + keyId: 'key-B', + projectId: 'p', + organizationId: 'o', + type: 'full', + allowedOrigins: null, + }); + await service.verifyApiKey('lp_b'); + await vi.advanceTimersByTimeAsync(0); + + expect(updateCount).toBe(2); + }); +}); diff --git a/packages/backend/src/worker.ts b/packages/backend/src/worker.ts index db7a8905..74cb1577 100644 --- a/packages/backend/src/worker.ts +++ b/packages/backend/src/worker.ts @@ -1,6 +1,7 @@ import { readFileSync } from 'fs'; import { fileURLToPath } from 'url'; import path from 'path'; +import { context } from '@logtide/shared/context'; import { createWorker, startQueueWorkers, shutdownQueueSystem, getQueueBackend } from './queue/connection.js'; import { processAlertNotification, type AlertNotificationData } from './queue/jobs/alert-notification.js'; import { processSigmaDetection, type SigmaDetectionData } from './queue/jobs/sigma-detection.js'; @@ -316,64 +317,66 @@ let isCheckingAlerts = false; // Schedule alert checking every minute async function checkAlerts() { - // CRITICAL: Skip if already checking (prevent race condition) - if (isCheckingAlerts) { - console.warn('Alert check already in progress, skipping...'); - return; - } + await context.runAsSystem('cron:check-alerts', async () => { + // CRITICAL: Skip if already checking (prevent race condition) + if (isCheckingAlerts) { + console.warn('Alert check already in progress, skipping...'); + return; + } - isCheckingAlerts = true; - const checkStartTime = Date.now(); + isCheckingAlerts = true; + const checkStartTime = Date.now(); - try { + try { - const triggeredAlerts = await alertsService.checkAlertRules(); - const checkDuration = Date.now() - checkStartTime; + const triggeredAlerts = await alertsService.checkAlertRules(); + const checkDuration = Date.now() - checkStartTime; - if (triggeredAlerts.length > 0) { + if (triggeredAlerts.length > 0) { - if (isInternalLoggingEnabled()) { - hub.captureLog('warn', `${triggeredAlerts.length} alert(s) triggered`, { - alertCount: triggeredAlerts.length, - alertRuleIds: triggeredAlerts.map((a) => a.rule_id), - checkDuration_ms: checkDuration, - }); - } + if (isInternalLoggingEnabled()) { + hub.captureLog('warn', `${triggeredAlerts.length} alert(s) triggered`, { + alertCount: triggeredAlerts.length, + alertRuleIds: triggeredAlerts.map((a) => a.rule_id), + checkDuration_ms: checkDuration, + }); + } - // Add notification jobs to queue - const { createQueue } = await import('./queue/connection.js'); - const notificationQueue = createQueue('alert-notifications'); + // Add notification jobs to queue + const { createQueue } = await import('./queue/connection.js'); + const notificationQueue = createQueue('alert-notifications'); - for (const alert of triggeredAlerts) { - await notificationQueue.add('send-notification', alert); + for (const alert of triggeredAlerts) { + await notificationQueue.add('send-notification', alert); + if (isInternalLoggingEnabled()) { + hub.captureLog('info', `Alert notification queued`, { + alertRuleId: alert.rule_id, + ruleName: alert.rule_name, + logCount: alert.log_count, + }); + } + } + } else { if (isInternalLoggingEnabled()) { - hub.captureLog('info', `Alert notification queued`, { - alertRuleId: alert.rule_id, - ruleName: alert.rule_name, - logCount: alert.log_count, + hub.captureLog('debug', `Alert check completed, no alerts triggered`, { + checkDuration_ms: checkDuration, }); } } - } else { + } catch (error) { + console.error('Error checking alerts:', error); + if (isInternalLoggingEnabled()) { - hub.captureLog('debug', `Alert check completed, no alerts triggered`, { - checkDuration_ms: checkDuration, + hub.captureLog('error', `Failed to check alert rules: ${(error as Error).message}`, { + error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, }); } + } finally { + // CRITICAL: Always release lock + isCheckingAlerts = false; } - } catch (error) { - console.error('Error checking alerts:', error); - - if (isInternalLoggingEnabled()) { - hub.captureLog('error', `Failed to check alert rules: ${(error as Error).message}`, { - error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, - }); - } - } finally { - // CRITICAL: Always release lock - isCheckingAlerts = false; - } + }); } // Run alert check every minute @@ -387,34 +390,36 @@ let isAutoGrouping = false; // Schedule incident auto-grouping every 5 minutes async function runAutoGrouping() { - // Skip if already running - if (isAutoGrouping) { - console.warn('Auto-grouping already in progress, skipping...'); - return; - } + await context.runAsSystem('cron:auto-grouping', async () => { + // Skip if already running + if (isAutoGrouping) { + console.warn('Auto-grouping already in progress, skipping...'); + return; + } - isAutoGrouping = true; + isAutoGrouping = true; - try { - const { createQueue } = await import('./queue/connection.js'); - const autoGroupQueue = createQueue('incident-autogrouping'); + try { + const { createQueue } = await import('./queue/connection.js'); + const autoGroupQueue = createQueue('incident-autogrouping'); - await autoGroupQueue.add('group-incidents', {}); + await autoGroupQueue.add('group-incidents', {}); - if (isInternalLoggingEnabled()) { - hub.captureLog('info', `Incident auto-grouping job scheduled`); - } - } catch (error) { - console.error('Error scheduling auto-grouping:', error); + if (isInternalLoggingEnabled()) { + hub.captureLog('info', `Incident auto-grouping job scheduled`); + } + } catch (error) { + console.error('Error scheduling auto-grouping:', error); - if (isInternalLoggingEnabled()) { - hub.captureLog('error', `Failed to schedule auto-grouping: ${(error as Error).message}`, { - error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, - }); + if (isInternalLoggingEnabled()) { + hub.captureLog('error', `Failed to schedule auto-grouping: ${(error as Error).message}`, { + error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, + }); + } + } finally { + isAutoGrouping = false; } - } finally { - isAutoGrouping = false; - } + }); } // Run auto-grouping every 5 minutes @@ -428,30 +433,32 @@ setTimeout(runAutoGrouping, 10000); // ============================================================================ async function updateEnrichmentDatabases() { - try { - const results = await enrichmentService.updateDatabasesIfNeeded(); + await context.runAsSystem('cron:enrichment-update', async () => { + try { + const results = await enrichmentService.updateDatabasesIfNeeded(); - if (results.geoLite2) { - console.log('[Worker] GeoLite2 database updated'); - if (isInternalLoggingEnabled()) { - hub.captureLog('info', 'GeoLite2 database updated successfully'); + if (results.geoLite2) { + console.log('[Worker] GeoLite2 database updated'); + if (isInternalLoggingEnabled()) { + hub.captureLog('info', 'GeoLite2 database updated successfully'); + } } - } - if (results.ipsum) { - console.log('[Worker] IPsum database updated'); + if (results.ipsum) { + console.log('[Worker] IPsum database updated'); + if (isInternalLoggingEnabled()) { + hub.captureLog('info', 'IPsum database updated successfully'); + } + } + } catch (error) { + console.error('Error updating enrichment databases:', error); if (isInternalLoggingEnabled()) { - hub.captureLog('info', 'IPsum database updated successfully'); + hub.captureLog('error', `Failed to update databases: ${(error as Error).message}`, { + error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, + }); } } - } catch (error) { - console.error('Error updating enrichment databases:', error); - if (isInternalLoggingEnabled()) { - hub.captureLog('error', `Failed to update databases: ${(error as Error).message}`, { - error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, - }); - } - } + }); } // Run database updates every 24 hours @@ -467,54 +474,56 @@ setTimeout(updateEnrichmentDatabases, 30000); let isRunningRetentionCleanup = false; async function runRetentionCleanup() { - // Skip if already running - if (isRunningRetentionCleanup) { - console.warn('Retention cleanup already in progress, skipping...'); - return; - } + await context.runAsSystem('cron:retention-cleanup', async () => { + // Skip if already running + if (isRunningRetentionCleanup) { + console.warn('Retention cleanup already in progress, skipping...'); + return; + } - isRunningRetentionCleanup = true; - const startTime = Date.now(); + isRunningRetentionCleanup = true; + const startTime = Date.now(); - try { - console.log('[Worker] Starting retention cleanup...'); - const summary = await retentionService.executeRetentionForAllOrganizations(); - const duration = Date.now() - startTime; - - console.log(`[Worker] Retention cleanup completed: ${summary.totalLogsDeleted} logs deleted from ${summary.successfulOrganizations}/${summary.totalOrganizations} orgs in ${duration}ms`); - - if (isInternalLoggingEnabled()) { - hub.captureLog('info', 'Retention cleanup completed', { - totalOrganizations: summary.totalOrganizations, - successfulOrganizations: summary.successfulOrganizations, - failedOrganizations: summary.failedOrganizations, - totalLogsDeleted: summary.totalLogsDeleted, - duration_ms: duration, - }); - } + try { + console.log('[Worker] Starting retention cleanup...'); + const summary = await retentionService.executeRetentionForAllOrganizations(); + const duration = Date.now() - startTime; + + console.log(`[Worker] Retention cleanup completed: ${summary.totalLogsDeleted} logs deleted from ${summary.successfulOrganizations}/${summary.totalOrganizations} orgs in ${duration}ms`); - // Log any failures - for (const result of summary.results.filter(r => r.error)) { - console.error(`Retention failed for org ${result.organizationName}: ${result.error}`); if (isInternalLoggingEnabled()) { - hub.captureLog('error', `Retention failed for org ${result.organizationName}`, { - organizationId: result.organizationId, - organizationName: result.organizationName, - error: result.error, + hub.captureLog('info', 'Retention cleanup completed', { + totalOrganizations: summary.totalOrganizations, + successfulOrganizations: summary.successfulOrganizations, + failedOrganizations: summary.failedOrganizations, + totalLogsDeleted: summary.totalLogsDeleted, + duration_ms: duration, }); } - } - } catch (error) { - console.error('Retention cleanup failed:', error); - if (isInternalLoggingEnabled()) { - hub.captureLog('error', `Retention cleanup failed: ${(error as Error).message}`, { - error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, - }); + // Log any failures + for (const result of summary.results.filter(r => r.error)) { + console.error(`Retention failed for org ${result.organizationName}: ${result.error}`); + if (isInternalLoggingEnabled()) { + hub.captureLog('error', `Retention failed for org ${result.organizationName}`, { + organizationId: result.organizationId, + organizationName: result.organizationName, + error: result.error, + }); + } + } + } catch (error) { + console.error('Retention cleanup failed:', error); + + if (isInternalLoggingEnabled()) { + hub.captureLog('error', `Retention cleanup failed: ${(error as Error).message}`, { + error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, + }); + } + } finally { + isRunningRetentionCleanup = false; } - } finally { - isRunningRetentionCleanup = false; - } + }); } // Calculate milliseconds until next 2 AM @@ -558,67 +567,69 @@ setTimeout(runRetentionCleanup, 2 * 60 * 1000); let isSyncingSigmaRules = false; async function syncSigmaRules() { - if (isSyncingSigmaRules) { - console.warn('[Worker] SigmaHQ sync already in progress, skipping...'); - return; - } - - isSyncingSigmaRules = true; - - try { - const orgs = await db - .selectFrom('sigma_rules') - .select('organization_id') - .distinct() - .where('sigmahq_path', 'is not', null) - .execute(); - - if (orgs.length === 0) { - console.log('[Worker] No organizations with SigmaHQ rules, skipping sync'); + await context.runAsSystem('cron:sigma-sync', async () => { + if (isSyncingSigmaRules) { + console.warn('[Worker] SigmaHQ sync already in progress, skipping...'); return; } - console.log(`[Worker] Starting SigmaHQ sync for ${orgs.length} organization(s)`); + isSyncingSigmaRules = true; - for (const org of orgs) { - try { - const result = await sigmaSyncService.syncFromSigmaHQ({ - organizationId: org.organization_id, - autoCreateAlerts: true, - }); + try { + const orgs = await db + .selectFrom('sigma_rules') + .select('organization_id') + .distinct() + .where('sigmahq_path', 'is not', null) + .execute(); - console.log(`[Worker] SigmaHQ sync for org ${org.organization_id}: ${result.imported} imported, ${result.skipped} skipped, ${result.failed} failed`); + if (orgs.length === 0) { + console.log('[Worker] No organizations with SigmaHQ rules, skipping sync'); + return; + } - if (isInternalLoggingEnabled()) { - hub.captureLog('info', `SigmaHQ sync completed for org ${org.organization_id}`, { - organizationId: org.organization_id, - imported: result.imported, - skipped: result.skipped, - failed: result.failed, - }); - } - } catch (orgError) { - console.error(`[Worker] SigmaHQ sync failed for org ${org.organization_id}:`, orgError); + console.log(`[Worker] Starting SigmaHQ sync for ${orgs.length} organization(s)`); - if (isInternalLoggingEnabled()) { - hub.captureLog('error', `SigmaHQ sync failed for org ${org.organization_id}: ${(orgError as Error).message}`, { + for (const org of orgs) { + try { + const result = await sigmaSyncService.syncFromSigmaHQ({ organizationId: org.organization_id, - error: orgError instanceof Error ? { name: orgError.name, message: orgError.message, stack: orgError.stack } : { message: String(orgError) }, + autoCreateAlerts: true, }); + + console.log(`[Worker] SigmaHQ sync for org ${org.organization_id}: ${result.imported} imported, ${result.skipped} skipped, ${result.failed} failed`); + + if (isInternalLoggingEnabled()) { + hub.captureLog('info', `SigmaHQ sync completed for org ${org.organization_id}`, { + organizationId: org.organization_id, + imported: result.imported, + skipped: result.skipped, + failed: result.failed, + }); + } + } catch (orgError) { + console.error(`[Worker] SigmaHQ sync failed for org ${org.organization_id}:`, orgError); + + if (isInternalLoggingEnabled()) { + hub.captureLog('error', `SigmaHQ sync failed for org ${org.organization_id}: ${(orgError as Error).message}`, { + organizationId: org.organization_id, + error: orgError instanceof Error ? { name: orgError.name, message: orgError.message, stack: orgError.stack } : { message: String(orgError) }, + }); + } } } - } - } catch (error) { - console.error('[Worker] SigmaHQ sync aborted:', error); + } catch (error) { + console.error('[Worker] SigmaHQ sync aborted:', error); - if (isInternalLoggingEnabled()) { - hub.captureLog('error', `SigmaHQ sync aborted: ${(error as Error).message}`, { - error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, - }); + if (isInternalLoggingEnabled()) { + hub.captureLog('error', `SigmaHQ sync aborted: ${(error as Error).message}`, { + error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, + }); + } + } finally { + isSyncingSigmaRules = false; } - } finally { - isSyncingSigmaRules = false; - } + }); } function scheduleNextSigmaSync() { @@ -643,20 +654,22 @@ scheduleNextSigmaSync(); let isRunningMonitorChecks = false; async function runMonitorChecks() { - if (isRunningMonitorChecks) return; - isRunningMonitorChecks = true; - try { - await monitorService.runAllDueChecks(); - } catch (error) { - console.error('[Worker] Monitor check error:', error); - if (isInternalLoggingEnabled()) { - hub.captureLog('error', `Monitor check failed: ${(error as Error).message}`, { - error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, - }); + await context.runAsSystem('cron:monitor-checks', async () => { + if (isRunningMonitorChecks) return; + isRunningMonitorChecks = true; + try { + await monitorService.runAllDueChecks(); + } catch (error) { + console.error('[Worker] Monitor check error:', error); + if (isInternalLoggingEnabled()) { + hub.captureLog('error', `Monitor check failed: ${(error as Error).message}`, { + error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, + }); + } + } finally { + isRunningMonitorChecks = false; } - } finally { - isRunningMonitorChecks = false; - } + }); } // Run checks every 30 seconds @@ -671,15 +684,17 @@ runMonitorChecks(); let isRunningMaintenanceCheck = false; async function runMaintenanceTransitions() { - if (isRunningMaintenanceCheck) return; - isRunningMaintenanceCheck = true; - try { - await maintenanceService.processMaintenanceTransitions(); - } catch (error) { - console.error('[Worker] Maintenance transition error:', error); - } finally { - isRunningMaintenanceCheck = false; - } + await context.runAsSystem('cron:maintenance-transitions', async () => { + if (isRunningMaintenanceCheck) return; + isRunningMaintenanceCheck = true; + try { + await maintenanceService.processMaintenanceTransitions(); + } catch (error) { + console.error('[Worker] Maintenance transition error:', error); + } finally { + isRunningMaintenanceCheck = false; + } + }); } setInterval(runMaintenanceTransitions, 60000); diff --git a/packages/backend/tsconfig.json b/packages/backend/tsconfig.json index 9a2948a2..15e0a597 100644 --- a/packages/backend/tsconfig.json +++ b/packages/backend/tsconfig.json @@ -5,7 +5,7 @@ "rootDir": "./src", "baseUrl": ".", "module": "ES2022", - "moduleResolution": "node", + "moduleResolution": "bundler", "preserveSymlinks": true, "lib": [ "ES2022", diff --git a/packages/reservoir/src/engines/clickhouse/clickhouse-engine.ts b/packages/reservoir/src/engines/clickhouse/clickhouse-engine.ts index 1e789368..347c7317 100644 --- a/packages/reservoir/src/engines/clickhouse/clickhouse-engine.ts +++ b/packages/reservoir/src/engines/clickhouse/clickhouse-engine.ts @@ -1,6 +1,28 @@ -import { createClient, type ClickHouseClient } from '@clickhouse/client'; +import { createClient, type ClickHouseClient, type DataFormat, type InsertParams } from '@clickhouse/client'; import { randomUUID } from 'crypto'; +import { currentOrNull } from '@logtide/shared/context'; import { StorageEngine } from '../../core/storage-engine.js'; + +const CH_SAFE_RE = /[^a-zA-Z0-9_:-]/g; +function chSafe(v: string | null | undefined): string { + if (!v) return '-'; + const c = v.replace(CH_SAFE_RE, ''); + return c.length > 0 ? c : '-'; +} +function chCtxComment(): string { + if (process.env.LOGTIDE_CONTEXT_SQL_COMMENT === 'false') return ''; + const ctx = currentOrNull(); + if (!ctx) return ''; + return `/* req=${chSafe(ctx.requestId)} origin=${chSafe(ctx.origin)} org=${chSafe( + ctx.organizationId + )} actor=${chSafe(ctx.actor.type)}:${chSafe(ctx.actor.id)} */ `; +} +function chQueryId(operation: string): string | undefined { + const ctx = currentOrNull(); + if (!ctx) return undefined; + // ClickHouse query_id has a 100-char limit; keep it tight. + return `${chSafe(ctx.requestId)}-${chSafe(operation)}`.slice(0, 100); +} import type { LogRecord, LogLevel, @@ -118,7 +140,7 @@ export class ClickHouseEngine extends StorageEngine { async healthCheck(): Promise { const start = Date.now(); try { - await this.getClient().query({ query: 'SELECT 1', format: 'JSONEachRow' }); + await this.runQuery({ query: 'SELECT 1', format: 'JSONEachRow' }, 'health-check'); const responseTimeMs = Date.now() - start; let status: HealthStatus['status'] = 'healthy'; if (responseTimeMs >= 200) status = 'unhealthy'; @@ -152,10 +174,9 @@ export class ClickHouseEngine extends StorageEngine { await bootstrapClient.close(); } - const client = this.getClient(); const t = this.tableName; - await client.command({ + await this.runCommand({ query: ` CREATE TABLE IF NOT EXISTS ${t} ( id UUID DEFAULT generateUUIDv4(), @@ -177,7 +198,7 @@ export class ClickHouseEngine extends StorageEngine { }); try { - await client.command({ + await this.runCommand({ query: `ALTER TABLE ${t} ADD INDEX IF NOT EXISTS idx_message_fulltext message TYPE ngrambf_v1(3, 32768, 3, 0) GRANULARITY 1`, }); } catch { @@ -185,7 +206,7 @@ export class ClickHouseEngine extends StorageEngine { } try { - await client.command({ + await this.runCommand({ query: `ALTER TABLE ${t} ADD INDEX IF NOT EXISTS idx_trace_id trace_id TYPE bloom_filter(0.01) GRANULARITY 1`, }); } catch { @@ -195,10 +216,10 @@ export class ClickHouseEngine extends StorageEngine { // Bloom filter on id - lets getByIds skip data granules that don't contain // any of the requested UUIDs without a full project scan. try { - await client.command({ + await this.runCommand({ query: `ALTER TABLE ${t} ADD INDEX IF NOT EXISTS idx_id id TYPE bloom_filter(0.01) GRANULARITY 1`, }); - await client.command({ + await this.runCommand({ query: `ALTER TABLE ${t} MATERIALIZE INDEX idx_id`, }); } catch { @@ -206,7 +227,7 @@ export class ClickHouseEngine extends StorageEngine { } try { - await client.command({ + await this.runCommand({ query: `ALTER TABLE ${t} ADD INDEX IF NOT EXISTS idx_span_id span_id TYPE bloom_filter(0.01) GRANULARITY 1`, }); } catch { @@ -215,10 +236,10 @@ export class ClickHouseEngine extends StorageEngine { // Projection for fast service+level filtered queries try { - await client.command({ + await this.runCommand({ query: `ALTER TABLE ${t} ADD PROJECTION IF NOT EXISTS proj_service_time (SELECT * ORDER BY project_id, service, level, time)`, }); - await client.command({ + await this.runCommand({ query: `ALTER TABLE ${t} MATERIALIZE PROJECTION proj_service_time`, }); } catch { @@ -229,10 +250,10 @@ export class ClickHouseEngine extends StorageEngine { // Eliminates JSONExtractString() calls on every DISTINCT/filter query row. // MATERIALIZE backfills existing data parts asynchronously during merges. try { - await client.command({ + await this.runCommand({ query: `ALTER TABLE ${t} ADD COLUMN IF NOT EXISTS hostname String MATERIALIZED JSONExtractString(metadata, 'hostname')`, }); - await client.command({ + await this.runCommand({ query: `ALTER TABLE ${t} MATERIALIZE COLUMN hostname`, }); } catch { @@ -240,7 +261,7 @@ export class ClickHouseEngine extends StorageEngine { } // Spans table - await client.command({ + await this.runCommand({ query: ` CREATE TABLE IF NOT EXISTS spans ( time DateTime64(3) NOT NULL, @@ -270,29 +291,29 @@ export class ClickHouseEngine extends StorageEngine { }); try { - await client.command({ + await this.runCommand({ query: `ALTER TABLE spans ADD INDEX IF NOT EXISTS idx_spans_trace_id trace_id TYPE bloom_filter(0.01) GRANULARITY 1`, }); } catch { /* index may already exist */ } try { - await client.command({ + await this.runCommand({ query: `ALTER TABLE spans ADD INDEX IF NOT EXISTS idx_spans_parent parent_span_id TYPE bloom_filter(0.01) GRANULARITY 1`, }); } catch { /* index may already exist */ } // Projection for fast service_name filtered span queries try { - await client.command({ + await this.runCommand({ query: `ALTER TABLE spans ADD PROJECTION IF NOT EXISTS proj_service_time (SELECT * ORDER BY project_id, service_name, status_code, time)`, }); - await client.command({ + await this.runCommand({ query: `ALTER TABLE spans MATERIALIZE PROJECTION proj_service_time`, }); } catch { /* projection may already exist */ } // Traces table (ReplacingMergeTree for upsert semantics) - await client.command({ + await this.runCommand({ query: ` CREATE TABLE IF NOT EXISTS traces ( trace_id String NOT NULL, @@ -315,7 +336,7 @@ export class ClickHouseEngine extends StorageEngine { }); // Metrics table - await client.command({ + await this.runCommand({ query: ` CREATE TABLE IF NOT EXISTS metrics ( time DateTime64(3) NOT NULL, @@ -340,7 +361,7 @@ export class ClickHouseEngine extends StorageEngine { }); // Metric exemplars table - await client.command({ + await this.runCommand({ query: ` CREATE TABLE IF NOT EXISTS metric_exemplars ( time DateTime64(3) NOT NULL, @@ -362,7 +383,7 @@ export class ClickHouseEngine extends StorageEngine { }); // Metrics hourly rollup (target table for materialized view) - await client.command({ + await this.runCommand({ query: ` CREATE TABLE IF NOT EXISTS metrics_hourly_rollup ( bucket DateTime NOT NULL, @@ -382,7 +403,7 @@ export class ClickHouseEngine extends StorageEngine { }); // Materialized view: auto-populates hourly rollup on insert to metrics - await client.command({ + await this.runCommand({ query: ` CREATE MATERIALIZED VIEW IF NOT EXISTS metrics_hourly_rollup_mv TO metrics_hourly_rollup AS @@ -402,7 +423,7 @@ export class ClickHouseEngine extends StorageEngine { }); // Metrics daily rollup - await client.command({ + await this.runCommand({ query: ` CREATE TABLE IF NOT EXISTS metrics_daily_rollup ( bucket DateTime NOT NULL, @@ -422,7 +443,7 @@ export class ClickHouseEngine extends StorageEngine { }); // Materialized view: auto-populates daily rollup on insert to metrics - await client.command({ + await this.runCommand({ query: ` CREATE MATERIALIZED VIEW IF NOT EXISTS metrics_daily_rollup_mv TO metrics_daily_rollup AS @@ -452,11 +473,10 @@ export class ClickHouseEngine extends StorageEngine { } const start = Date.now(); - const client = this.getClient(); try { const values = logs.map((log) => this.toClickHouseRow(log)); - await client.insert({ table: this.tableName, values, format: 'JSONEachRow' }); + await this.runInsert({ table: this.tableName, values, format: 'JSONEachRow' }, 'ingest-logs'); return { ingested: logs.length, failed: 0, durationMs: Date.now() - start }; } catch (err) { return { @@ -474,7 +494,6 @@ export class ClickHouseEngine extends StorageEngine { } const start = Date.now(); - const client = this.getClient(); // Use provided IDs or generate client-side since ClickHouse has no RETURNING const logsWithIds = logs.map((log) => ({ @@ -483,7 +502,7 @@ export class ClickHouseEngine extends StorageEngine { })); try { - await client.insert({ table: this.tableName, values: logsWithIds, format: 'JSONEachRow' }); + await this.runInsert({ table: this.tableName, values: logsWithIds, format: 'JSONEachRow' }, 'ingest-logs-returning'); const rows: StoredLogRecord[] = logsWithIds.map((row, i) => ({ id: row.id, @@ -512,12 +531,11 @@ export class ClickHouseEngine extends StorageEngine { async query(params: QueryParams): Promise> { const start = Date.now(); - const client = this.getClient(); const native = this.translator.translateQuery(params); const limit = (native.metadata?.limit as number) ?? 50; const offset = params.offset ?? 0; - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: native.query as string, query_params: (native.parameters as Record[])[0], format: 'JSONEachRow', @@ -549,10 +567,9 @@ export class ClickHouseEngine extends StorageEngine { async aggregate(params: AggregateParams): Promise { const start = Date.now(); - const client = this.getClient(); const native = this.translator.translateAggregate(params); - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: native.query as string, query_params: (native.parameters as Record[])[0], format: 'JSONEachRow', @@ -587,8 +604,7 @@ export class ClickHouseEngine extends StorageEngine { } async getById(params: GetByIdParams): Promise { - const client = this.getClient(); - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT * FROM ${this.tableName} WHERE id = {p_id:UUID} AND project_id = {p_project_id:String} LIMIT 1`, query_params: { p_id: params.id, p_project_id: params.projectId }, format: 'JSONEachRow', @@ -599,8 +615,7 @@ export class ClickHouseEngine extends StorageEngine { async getByIds(params: GetByIdsParams): Promise { if (params.ids.length === 0) return []; - const client = this.getClient(); - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT * FROM ${this.tableName} WHERE id IN {p_ids:Array(UUID)} AND project_id = {p_project_id:String} ORDER BY time DESC`, query_params: { p_ids: params.ids, p_project_id: params.projectId }, format: 'JSONEachRow', @@ -611,9 +626,8 @@ export class ClickHouseEngine extends StorageEngine { async count(params: CountParams): Promise { const start = Date.now(); - const client = this.getClient(); const native = this.translator.translateCount(params); - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: native.query as string, query_params: (native.parameters as Record[])[0], format: 'JSONEachRow', @@ -638,9 +652,8 @@ export class ClickHouseEngine extends StorageEngine { async distinct(params: DistinctParams): Promise { const start = Date.now(); - const client = this.getClient(); const native = this.translator.translateDistinct(params); - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: native.query as string, query_params: (native.parameters as Record[])[0], format: 'JSONEachRow', @@ -654,9 +667,8 @@ export class ClickHouseEngine extends StorageEngine { async topValues(params: TopValuesParams): Promise { const start = Date.now(); - const client = this.getClient(); const native = this.translator.translateTopValues(params); - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: native.query as string, query_params: (native.parameters as Record[])[0], format: 'JSONEachRow', @@ -673,10 +685,9 @@ export class ClickHouseEngine extends StorageEngine { async deleteByTimeRange(params: DeleteByTimeRangeParams): Promise { const start = Date.now(); - const client = this.getClient(); const native = this.translator.translateDelete(params); // ClickHouse mutations are async - the command returns immediately - await client.command({ + await this.runCommand({ query: native.query as string, query_params: (native.parameters as Record[])[0], }); @@ -713,6 +724,40 @@ export class ClickHouseEngine extends StorageEngine { return this.client; } + private async runCommand(args: Parameters[0], op = 'cmd') { + const client = this.getClient(); + const final = { ...args, query: chCtxComment() + args.query, query_id: (args as any).query_id ?? chQueryId(op) }; + return client.command(final); + } + + private async runQuery( + args: { query: string; format?: Format; [k: string]: unknown }, + op = 'query', + ) { + const client = this.getClient(); + const final = { ...args, query: chCtxComment() + args.query, query_id: (args as any).query_id ?? chQueryId(op) }; + // The spread loses the format literal; cast via `any` to preserve the Format generic + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return client.query(final as any); + } + + private async runInsert(args: InsertParams, op = 'insert') { + const client = this.getClient(); + const queryId = chQueryId(op); + const final: typeof args = { ...args }; + if (queryId) { + (final as any).query_id = queryId; + } + const comment = chCtxComment().trim(); + if (comment) { + (final as any).clickhouse_settings = { + ...((args as any).clickhouse_settings as object | undefined), + log_comment: comment, + }; + } + return client.insert(final); + } + private toClickHouseRow(log: LogRecord): Record { const row: Record = { time: log.time.getTime(), @@ -739,7 +784,6 @@ export class ClickHouseEngine extends StorageEngine { if (spans.length === 0) return { ingested: 0, failed: 0, durationMs: 0 }; const start = Date.now(); - const client = this.getClient(); try { const values = spans.map((span) => ({ @@ -763,7 +807,7 @@ export class ClickHouseEngine extends StorageEngine { resource_attributes: span.resourceAttributes ? JSON.stringify(span.resourceAttributes) : '{}', })); - await client.insert({ table: 'spans', values, format: 'JSONEachRow' }); + await this.runInsert({ table: 'spans', values, format: 'JSONEachRow' }, 'ingest-spans'); return { ingested: spans.length, failed: 0, durationMs: Date.now() - start }; } catch (err) { return { @@ -776,11 +820,9 @@ export class ClickHouseEngine extends StorageEngine { } async upsertTrace(trace: TraceRecord): Promise { - const client = this.getClient(); - // ReplacingMergeTree handles dedup by (project_id, trace_id) using updated_at // We read the existing row, merge, and insert the merged version - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT trace_id, start_time, end_time, span_count, error FROM traces FINAL WHERE trace_id = {traceId:String} AND project_id = {projectId:String}`, @@ -811,7 +853,7 @@ export class ClickHouseEngine extends StorageEngine { const durationMs = endTime.getTime() - startTime.getTime(); - await client.insert({ + await this.runInsert({ table: 'traces', values: [{ trace_id: trace.traceId, @@ -827,12 +869,11 @@ export class ClickHouseEngine extends StorageEngine { error: error ? 1 : 0, }], format: 'JSONEachRow', - }); + }, 'upsert-trace'); } async querySpans(params: SpanQueryParams): Promise { const start = Date.now(); - const client = this.getClient(); const limit = params.limit ?? 50; const offset = params.offset ?? 0; @@ -879,14 +920,14 @@ export class ClickHouseEngine extends StorageEngine { const sortBy = ALLOWED_SORT_COLUMNS.has(params.sortBy ?? '') ? params.sortBy! : 'start_time'; const sortOrder = ALLOWED_SORT_ORDERS.has((params.sortOrder ?? '').toLowerCase()) ? params.sortOrder!.toUpperCase() : 'ASC'; - const countResult = await client.query({ + const countResult = await this.runQuery({ query: `SELECT count() AS count FROM spans ${where}`, query_params: queryParams, format: 'JSONEachRow', }); const total = Number((await countResult.json<{ count: string }>())[0]?.count ?? 0); - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT * FROM spans ${where} ORDER BY ${sortBy} ${sortOrder} LIMIT ${limit} OFFSET ${offset}`, query_params: queryParams, format: 'JSONEachRow', @@ -904,8 +945,7 @@ export class ClickHouseEngine extends StorageEngine { } async getSpansByTraceId(traceId: string, projectId: string): Promise { - const client = this.getClient(); - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT * FROM spans WHERE trace_id = {traceId:String} AND project_id = {projectId:String} ORDER BY start_time ASC`, query_params: { traceId, projectId }, format: 'JSONEachRow', @@ -916,7 +956,6 @@ export class ClickHouseEngine extends StorageEngine { async queryTraces(params: TraceQueryParams): Promise { const start = Date.now(); - const client = this.getClient(); const limit = params.limit ?? 50; const offset = params.offset ?? 0; @@ -954,14 +993,14 @@ export class ClickHouseEngine extends StorageEngine { const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; // Use FINAL to get deduplicated rows from ReplacingMergeTree - const countResult = await client.query({ + const countResult = await this.runQuery({ query: `SELECT count() AS count FROM traces FINAL ${where}`, query_params: queryParams, format: 'JSONEachRow', }); const total = Number((await countResult.json<{ count: string }>())[0]?.count ?? 0); - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT * FROM traces FINAL ${where} ORDER BY start_time DESC LIMIT ${limit} OFFSET ${offset}`, query_params: queryParams, format: 'JSONEachRow', @@ -979,8 +1018,7 @@ export class ClickHouseEngine extends StorageEngine { } async getTraceById(traceId: string, projectId: string): Promise { - const client = this.getClient(); - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT * FROM traces FINAL WHERE trace_id = {traceId:String} AND project_id = {projectId:String}`, query_params: { traceId, projectId }, format: 'JSONEachRow', @@ -994,7 +1032,6 @@ export class ClickHouseEngine extends StorageEngine { from?: Date, to?: Date, ): Promise { - const client = this.getClient(); const queryParams: Record = { projectId }; let timeFilter = ''; @@ -1007,7 +1044,7 @@ export class ClickHouseEngine extends StorageEngine { queryParams.p_to = toDateTime64(to); } - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: ` SELECT parent.service_name AS source_service, @@ -1052,7 +1089,6 @@ export class ClickHouseEngine extends StorageEngine { async deleteSpansByTimeRange(params: DeleteSpansByTimeRangeParams): Promise { const start = Date.now(); - const client = this.getClient(); const pids = Array.isArray(params.projectId) ? params.projectId : [params.projectId]; const conditions = [ @@ -1073,7 +1109,7 @@ export class ClickHouseEngine extends StorageEngine { } // ClickHouse mutations are async - await client.command({ + await this.runCommand({ query: `ALTER TABLE spans DELETE WHERE ${conditions.join(' AND ')}`, query_params: queryParams, }); @@ -1089,7 +1125,6 @@ export class ClickHouseEngine extends StorageEngine { if (metrics.length === 0) return { ingested: 0, failed: 0, durationMs: 0 }; const start = Date.now(); - const client = this.getClient(); try { const metricRows: Record[] = []; @@ -1132,10 +1167,10 @@ export class ClickHouseEngine extends StorageEngine { } } - await client.insert({ table: 'metrics', values: metricRows, format: 'JSONEachRow' }); + await this.runInsert({ table: 'metrics', values: metricRows, format: 'JSONEachRow' }, 'ingest-metrics'); if (exemplarRows.length > 0) { - await client.insert({ table: 'metric_exemplars', values: exemplarRows, format: 'JSONEachRow' }); + await this.runInsert({ table: 'metric_exemplars', values: exemplarRows, format: 'JSONEachRow' }, 'ingest-metric-exemplars'); } return { ingested: metrics.length, failed: 0, durationMs: Date.now() - start }; @@ -1151,7 +1186,6 @@ export class ClickHouseEngine extends StorageEngine { async queryMetrics(params: MetricQueryParams): Promise { const start = Date.now(); - const client = this.getClient(); const limit = params.limit ?? 50; const offset = params.offset ?? 0; @@ -1207,7 +1241,7 @@ export class ClickHouseEngine extends StorageEngine { const sortOrder = params.sortOrder ?? 'desc'; // Count total - const countResult = await client.query({ + const countResult = await this.runQuery({ query: `SELECT count() AS count FROM metrics ${where}`, query_params: queryParams, format: 'JSONEachRow', @@ -1215,7 +1249,7 @@ export class ClickHouseEngine extends StorageEngine { const total = Number((await countResult.json<{ count: string }>())[0]?.count ?? 0); // Fetch rows - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT * FROM metrics ${where} ORDER BY time ${sortOrder} LIMIT ${limit} OFFSET ${offset}`, query_params: queryParams, format: 'JSONEachRow', @@ -1228,7 +1262,7 @@ export class ClickHouseEngine extends StorageEngine { if (params.includeExemplars) { const metricIds = metricsResult.filter(m => m.hasExemplars).map(m => m.id); if (metricIds.length > 0) { - const exemplarResult = await client.query({ + const exemplarResult = await this.runQuery({ query: `SELECT * FROM metric_exemplars WHERE metric_id IN {p_mids:Array(String)} ORDER BY time ASC`, query_params: { p_mids: metricIds }, format: 'JSONEachRow', @@ -1275,8 +1309,6 @@ export class ClickHouseEngine extends StorageEngine { return this.aggregateMetricsFromRollup(params, start); } - const client = this.getClient(); - const intervalMap: Record = { '1m': '1 MINUTE', '5m': '5 MINUTE', @@ -1368,7 +1400,7 @@ export class ClickHouseEngine extends StorageEngine { const query = `SELECT ${selectCols} FROM metrics ${where} GROUP BY ${groupByColumns.join(', ')} ORDER BY bucket ASC`; - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query, query_params: queryParams, format: 'JSONEachRow', @@ -1395,7 +1427,7 @@ export class ClickHouseEngine extends StorageEngine { // Determine metricType: use param or query DB let metricType: MetricType = params.metricType ?? 'gauge'; if (!params.metricType) { - const typeResult = await client.query({ + const typeResult = await this.runQuery({ query: `SELECT metric_type FROM metrics WHERE metric_name = {p_name:String} AND project_id IN {p_pids:Array(String)} LIMIT 1`, query_params: { p_name: params.metricName, p_pids: pids }, format: 'JSONEachRow', @@ -1426,8 +1458,6 @@ export class ClickHouseEngine extends StorageEngine { params: MetricAggregateParams, start: number, ): Promise { - const client = this.getClient(); - const rollupTable = params.interval === '1d' ? 'metrics_daily_rollup' : 'metrics_hourly_rollup'; @@ -1473,7 +1503,7 @@ export class ClickHouseEngine extends StorageEngine { ORDER BY bucket ASC `; - const result = await client.query({ + const result = await this.runQuery({ query: sql, query_params: queryParams, format: 'JSONEachRow', @@ -1495,7 +1525,6 @@ export class ClickHouseEngine extends StorageEngine { async getMetricNames(params: MetricNamesParams): Promise { const start = Date.now(); - const client = this.getClient(); const limit = params.limit ?? 1000; const conditions: string[] = []; @@ -1526,7 +1555,7 @@ export class ClickHouseEngine extends StorageEngine { const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT metric_name, metric_type FROM metrics ${where} GROUP BY metric_name, metric_type ORDER BY metric_name ASC LIMIT ${limit}`, query_params: queryParams, format: 'JSONEachRow', @@ -1544,7 +1573,6 @@ export class ClickHouseEngine extends StorageEngine { async getMetricLabelKeys(params: MetricLabelParams): Promise { const start = Date.now(); - const client = this.getClient(); const limit = params.limit ?? 100; const conditions: string[] = []; @@ -1575,7 +1603,7 @@ export class ClickHouseEngine extends StorageEngine { // ClickHouse has no native JSONB keys function, so we sample rows // and extract keys client-side using JSONExtractKeys - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT DISTINCT arrayJoin(JSONExtractKeys(attributes)) AS key FROM metrics ${where} LIMIT ${limit}`, query_params: queryParams, format: 'JSONEachRow', @@ -1590,7 +1618,6 @@ export class ClickHouseEngine extends StorageEngine { async getMetricLabelValues(params: MetricLabelParams, labelKey: string): Promise { const start = Date.now(); - const client = this.getClient(); const limit = params.limit ?? 100; const conditions: string[] = []; @@ -1619,7 +1646,7 @@ export class ClickHouseEngine extends StorageEngine { const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; - const resultSet = await client.query({ + const resultSet = await this.runQuery({ query: `SELECT DISTINCT JSONExtractString(attributes, {p_label_key:String}) AS val FROM metrics ${where} HAVING val != '' LIMIT ${limit}`, query_params: { ...queryParams, p_label_key: labelKey }, format: 'JSONEachRow', @@ -1634,7 +1661,6 @@ export class ClickHouseEngine extends StorageEngine { async deleteMetricsByTimeRange(params: DeleteMetricsByTimeRangeParams): Promise { const start = Date.now(); - const client = this.getClient(); const pids = Array.isArray(params.projectId) ? params.projectId : [params.projectId]; const conditions = [ @@ -1660,7 +1686,7 @@ export class ClickHouseEngine extends StorageEngine { } // Delete from metrics table (async mutation) - await client.command({ + await this.runCommand({ query: `ALTER TABLE metrics DELETE WHERE ${conditions.join(' AND ')}`, query_params: queryParams, }); @@ -1671,7 +1697,7 @@ export class ClickHouseEngine extends StorageEngine { `time >= {p_from:DateTime64(3)}`, `time <= {p_to:DateTime64(3)}`, ]; - await client.command({ + await this.runCommand({ query: `ALTER TABLE metric_exemplars DELETE WHERE ${exemplarConditions.join(' AND ')}`, query_params: { p_pids: pids, @@ -1685,7 +1711,6 @@ export class ClickHouseEngine extends StorageEngine { async getMetricsOverview(params: MetricsOverviewParams): Promise { const start = Date.now(); - const client = this.getClient(); const projectIds = Array.isArray(params.projectId) ? params.projectId : [params.projectId]; const queryParams: Record = { p_pids: projectIds, @@ -1717,7 +1742,7 @@ export class ClickHouseEngine extends StorageEngine { ORDER BY service_name, metric_name `; - const result = await client.query({ + const result = await this.runQuery({ query: sql, query_params: queryParams, format: 'JSONEachRow', diff --git a/packages/reservoir/src/engines/clickhouse/context-comment.test.ts b/packages/reservoir/src/engines/clickhouse/context-comment.test.ts new file mode 100644 index 00000000..9c0dfd8d --- /dev/null +++ b/packages/reservoir/src/engines/clickhouse/context-comment.test.ts @@ -0,0 +1,35 @@ +import { describe, it, expect, vi } from 'vitest'; +import { withContext } from '@logtide/shared/context'; +import { ClickHouseEngine } from './clickhouse-engine.js'; + +describe('ClickHouseEngine context propagation', () => { + it('sets query_id and prepends SQL comment when context is set', async () => { + const seen: any[] = []; + const fakeClient = { + command: vi.fn(async (args: any) => { + seen.push({ kind: 'command', ...args }); + return {}; + }), + query: vi.fn(async (args: any) => { + seen.push({ kind: 'query', ...args }); + return { json: async () => ({ data: [] }) }; + }), + insert: vi.fn(async () => ({})), + ping: vi.fn(async () => ({ success: true })), + close: vi.fn(), + } as any; + + const engine = new ClickHouseEngine( + { host: 'http://x', port: 0, database: 'x', user: 'x', password: 'x' } as any, + { client: fakeClient, skipInitialize: true } + ); + + await withContext({ requestId: 'req-cc', organizationId: 'org-cc' }, async () => { + await engine.healthCheck(); + }); + + const allArgs = seen.flatMap((s) => [s.query, s.query_id].filter(Boolean)); + expect(allArgs.some((s) => typeof s === 'string' && s.startsWith('/* req=req-cc'))).toBe(true); + expect(seen.some((s) => typeof s.query_id === 'string' && s.query_id.startsWith('req-cc'))).toBe(true); + }); +}); diff --git a/packages/reservoir/src/engines/mongodb/context-comment.test.ts b/packages/reservoir/src/engines/mongodb/context-comment.test.ts new file mode 100644 index 00000000..e979457d --- /dev/null +++ b/packages/reservoir/src/engines/mongodb/context-comment.test.ts @@ -0,0 +1,53 @@ +import { describe, it, expect, vi } from 'vitest'; +import { withContext } from '@logtide/shared/context'; +import { MongoDBEngine } from './mongodb-engine.js'; + +describe('MongoDBEngine context propagation', () => { + it('passes a $comment option when context is set', async () => { + const callsWithOptions: any[] = []; + const cursor = { + toArray: async () => [], + project: () => cursor, + sort: () => cursor, + skip: () => cursor, + limit: () => cursor, + }; + const collection = { + find: vi.fn((_filter: any, options?: any) => { + callsWithOptions.push({ kind: 'find', options }); + return cursor; + }), + countDocuments: vi.fn((_filter: any, options?: any) => { + callsWithOptions.push({ kind: 'countDocuments', options }); + return Promise.resolve(0); + }), + aggregate: vi.fn((_pipeline: any, options?: any) => { + callsWithOptions.push({ kind: 'aggregate', options }); + return cursor; + }), + findOne: vi.fn((_filter: any, options?: any) => { + callsWithOptions.push({ kind: 'findOne', options }); + return Promise.resolve(null); + }), + insertMany: vi.fn(() => Promise.resolve({ insertedCount: 0, insertedIds: {} })), + deleteMany: vi.fn(() => Promise.resolve({ deletedCount: 0 })), + } as any; + const db = { + collection: () => collection, + command: vi.fn(() => Promise.resolve({ ok: 1 })), + } as any; + + const engine = new MongoDBEngine( + { uri: 'mongodb://x' } as any, + { db, skipInitialize: true } as any + ); + + await withContext({ requestId: 'req-mm', organizationId: 'org-mm' }, async () => { + // any read-side method works; pick countDocuments + await (engine as any).count({ projectId: 'p1', filters: [] }).catch(() => {}); + }); + + const opts = callsWithOptions.find((c) => c.options?.comment)?.options; + expect(opts?.comment).toMatch(/req=req-mm/); + }); +}); diff --git a/packages/reservoir/src/engines/mongodb/mongodb-engine.ts b/packages/reservoir/src/engines/mongodb/mongodb-engine.ts index baebabf0..bb89ca5c 100644 --- a/packages/reservoir/src/engines/mongodb/mongodb-engine.ts +++ b/packages/reservoir/src/engines/mongodb/mongodb-engine.ts @@ -1,5 +1,6 @@ import { MongoClient, type Db, type Collection, type Document, MongoBulkWriteError, type WriteError } from 'mongodb'; import { randomUUID } from 'crypto'; +import { currentOrNull } from '@logtide/shared/context'; import { StorageEngine } from '../../core/storage-engine.js'; import type { LogRecord, @@ -71,6 +72,8 @@ export interface MongoDBEngineOptions { skipInitialize?: boolean; /** Force time-series collections on/off; undefined = auto-detect */ useTimeSeries?: boolean; + /** Use an existing Db instance (test/injection) */ + db?: Db; } /** Interval to $dateTrunc unit/binSize for MongoDB 5.0+ */ @@ -102,6 +105,31 @@ const PERCENTILE_FRACTIONS: Record = { p99: 0.99, }; +// ============================================================================= +// Context comment helpers +// ============================================================================= + +const MONGO_SAFE_RE = /[^a-zA-Z0-9_:-]/g; +function mongoSafe(v: string | null | undefined): string { + if (!v) return '-'; + const c = v.replace(MONGO_SAFE_RE, ''); + return c.length > 0 ? c : '-'; +} +function mongoCommentValue(): string | undefined { + if (process.env.LOGTIDE_CONTEXT_SQL_COMMENT === 'false') return undefined; + const ctx = currentOrNull(); + if (!ctx) return undefined; + return `req=${mongoSafe(ctx.requestId)} origin=${mongoSafe(ctx.origin)} org=${mongoSafe( + ctx.organizationId + )} actor=${mongoSafe(ctx.actor.type)}:${mongoSafe(ctx.actor.id)}`; +} + +/** Spread into options to inject the $comment field if a context is active. */ +function ctxOpts(): { comment?: string } { + const c = mongoCommentValue(); + return c ? { comment: c } : {}; +} + export class MongoDBEngine extends StorageEngine { private mongoClient: MongoClient | null = null; private ownsClient: boolean; @@ -267,7 +295,7 @@ export class MongoDBEngine extends StorageEngine { try { const docs = logs.map((log) => toMongoLogDoc(log, log.id ?? randomUUID())); - const result = await col.insertMany(docs, { ordered: false }); + const result = await col.insertMany(docs, { ...ctxOpts(), ordered: false }); return { ingested: result.insertedCount, failed: logs.length - result.insertedCount, durationMs: Date.now() - start }; } catch (err) { if (err instanceof MongoBulkWriteError) { @@ -297,7 +325,7 @@ export class MongoDBEngine extends StorageEngine { try { const docs = logsWithIds.map((log) => toMongoLogDoc(log, log.id)); - await col.insertMany(docs, { ordered: false }); + await col.insertMany(docs, { ...ctxOpts(), ordered: false }); const rows: StoredLogRecord[] = logsWithIds.map((log) => ({ id: log.id, @@ -343,7 +371,7 @@ export class MongoDBEngine extends StorageEngine { const { limit, offset, sort } = native.metadata as { limit: number; offset: number; sort: Document }; const docs = await col - .find(filter, { projection: { _id: 0 } }) + .find(filter, { ...ctxOpts(), projection: { _id: 0 } }) .sort(sort) .skip(offset) .limit(limit + 1) @@ -390,7 +418,7 @@ export class MongoDBEngine extends StorageEngine { { $sort: { '_id.bucket': 1 } }, ]; - const rows = await col.aggregate(pipeline, { allowDiskUse: true }).toArray(); + const rows = await col.aggregate(pipeline, { ...ctxOpts(), allowDiskUse: true }).toArray(); const bucketMap = new Map(); for (const row of rows) { @@ -418,7 +446,7 @@ export class MongoDBEngine extends StorageEngine { const col = this.logsCol(); const doc = await col.findOne( { id: params.id, project_id: params.projectId }, - { projection: { _id: 0 } }, + { ...ctxOpts(), projection: { _id: 0 } }, ); return doc ? mapDocToStoredLogRecord(doc) : null; } @@ -429,7 +457,7 @@ export class MongoDBEngine extends StorageEngine { const docs = await col .find( { id: { $in: params.ids }, project_id: params.projectId }, - { projection: { _id: 0 } }, + { ...ctxOpts(), projection: { _id: 0 } }, ) .sort({ time: -1 }) .toArray(); @@ -441,7 +469,7 @@ export class MongoDBEngine extends StorageEngine { const col = this.logsCol(); const native = this.translator.translateCount(params); const filter = native.query as Document; - const count = await col.countDocuments(filter); + const count = await col.countDocuments(filter, { ...ctxOpts() }); return { count, executionTimeMs: Date.now() - start }; } @@ -457,7 +485,7 @@ export class MongoDBEngine extends StorageEngine { // is usually the only way because estimatedDocumentCount doesn't take a filter. // However, we can add a timeout to prevent it from hanging on massive datasets. try { - const count = await col.countDocuments(filter, { maxTimeMS: 2000 }); + const count = await col.countDocuments(filter, { ...ctxOpts(), maxTimeMS: 2000 }); return { count, executionTimeMs: Date.now() - start }; } catch (err) { // If it times out, return a large safe fallback or try explain() @@ -484,7 +512,7 @@ export class MongoDBEngine extends StorageEngine { { $project: { value: '$_id', _id: 0 } }, ]; - const rows = await col.aggregate(pipeline).toArray(); + const rows = await col.aggregate(pipeline, { ...ctxOpts() }).toArray(); return { values: rows.map((r) => String(r.value)).filter((v) => v !== '' && v !== 'null'), executionTimeMs: Date.now() - start, @@ -508,7 +536,7 @@ export class MongoDBEngine extends StorageEngine { { $project: { value: '$_id', count: 1, _id: 0 } }, ]; - const rows = await col.aggregate(pipeline).toArray(); + const rows = await col.aggregate(pipeline, { ...ctxOpts() }).toArray(); return { values: rows .filter((r) => r.value != null && String(r.value) !== '') @@ -522,7 +550,7 @@ export class MongoDBEngine extends StorageEngine { const col = this.logsCol(); const native = this.translator.translateDelete(params); const filter = native.query as Document; - const result = await col.deleteMany(filter); + const result = await col.deleteMany(filter, { ...ctxOpts() }); return { deleted: result.deletedCount, executionTimeMs: Date.now() - start }; } @@ -538,7 +566,7 @@ export class MongoDBEngine extends StorageEngine { try { const docs = spans.map(toMongoSpanDoc); - const result = await col.insertMany(docs, { ordered: false }); + const result = await col.insertMany(docs, { ...ctxOpts(), ordered: false }); return { ingested: result.insertedCount, failed: spans.length - result.insertedCount, durationMs: Date.now() - start }; } catch (err) { if (err instanceof MongoBulkWriteError) { @@ -610,12 +638,12 @@ export class MongoDBEngine extends StorageEngine { const sortOrder = params.sortOrder === 'desc' ? -1 : 1; const [docs, total] = await Promise.all([ - col.find(filter, { projection: { _id: 0 } }) + col.find(filter, { ...ctxOpts(), projection: { _id: 0 } }) .sort({ [sortBy]: sortOrder }) .skip(offset) .limit(limit) .toArray(), - col.countDocuments(filter), + col.countDocuments(filter, { ...ctxOpts() }), ]); return { @@ -631,7 +659,7 @@ export class MongoDBEngine extends StorageEngine { async getSpansByTraceId(traceId: string, projectId: string): Promise { const col = this.spansCol(); const docs = await col - .find({ trace_id: traceId, project_id: projectId }, { projection: { _id: 0 } }) + .find({ trace_id: traceId, project_id: projectId }, { ...ctxOpts(), projection: { _id: 0 } }) .sort({ start_time: 1 }) .toArray(); return docs.map(mapDocToSpanRecord); @@ -646,12 +674,12 @@ export class MongoDBEngine extends StorageEngine { const filter = this.buildTraceFilter(params); const [docs, total] = await Promise.all([ - col.find(filter, { projection: { _id: 0 } }) + col.find(filter, { ...ctxOpts(), projection: { _id: 0 } }) .sort({ start_time: -1 }) .skip(offset) .limit(limit) .toArray(), - col.countDocuments(filter), + col.countDocuments(filter, { ...ctxOpts() }), ]); return { @@ -668,7 +696,7 @@ export class MongoDBEngine extends StorageEngine { const col = this.tracesCol(); const doc = await col.findOne( { trace_id: traceId, project_id: projectId }, - { projection: { _id: 0 } }, + { ...ctxOpts(), projection: { _id: 0 } }, ); return doc ? mapDocToTraceRecord(doc) : null; } @@ -695,6 +723,7 @@ export class MongoDBEngine extends StorageEngine { // Fetch all spans with parent references - minimal projection const allSpans = await col .find(filter, { + ...ctxOpts(), projection: { span_id: 1, parent_span_id: 1, service_name: 1, trace_id: 1, _id: 0 }, }) .toArray(); @@ -708,7 +737,7 @@ export class MongoDBEngine extends StorageEngine { const parentSpans = await col .find( { project_id: projectId, span_id: { $in: parentSpanIds } }, - { projection: { span_id: 1, service_name: 1, _id: 0 } }, + { ...ctxOpts(), projection: { span_id: 1, service_name: 1, _id: 0 } }, ) .toArray(); @@ -761,7 +790,7 @@ export class MongoDBEngine extends StorageEngine { filter.service_name = { $in: svc }; } - const result = await col.deleteMany(filter); + const result = await col.deleteMany(filter, { ...ctxOpts() }); return { deleted: result.deletedCount, executionTimeMs: Date.now() - start }; } @@ -818,10 +847,10 @@ export class MongoDBEngine extends StorageEngine { } const insertOps: Promise[] = [ - db.collection('metrics').insertMany(metricDocs, { ordered: false }), + db.collection('metrics').insertMany(metricDocs, { ...ctxOpts(), ordered: false }), ]; if (exemplarDocs.length > 0) { - insertOps.push(db.collection('metric_exemplars').insertMany(exemplarDocs, { ordered: false })); + insertOps.push(db.collection('metric_exemplars').insertMany(exemplarDocs, { ...ctxOpts(), ordered: false })); } await Promise.all(insertOps); @@ -847,12 +876,12 @@ export class MongoDBEngine extends StorageEngine { const sortOrder = params.sortOrder === 'asc' ? 1 : -1; const [docs, total] = await Promise.all([ - col.find(filter, { projection: { _id: 0 } }) + col.find(filter, { ...ctxOpts(), projection: { _id: 0 } }) .sort({ time: sortOrder }) .skip(offset) .limit(limit) .toArray(), - col.countDocuments(filter), + col.countDocuments(filter, { ...ctxOpts() }), ]); let metricsResult = docs.map(mapDocToStoredMetricRecord); @@ -863,7 +892,7 @@ export class MongoDBEngine extends StorageEngine { if (metricIds.length > 0) { const exemplarDocs = await db .collection('metric_exemplars') - .find({ metric_id: { $in: metricIds } }, { projection: { _id: 0 } }) + .find({ metric_id: { $in: metricIds } }, { ...ctxOpts(), projection: { _id: 0 } }) .sort({ time: 1 }) .toArray(); @@ -955,7 +984,7 @@ export class MongoDBEngine extends StorageEngine { pipeline.push({ $sort: { '_id.bucket': 1 } }); - const rows = await col.aggregate(pipeline, { allowDiskUse: true }).toArray(); + const rows = await col.aggregate(pipeline, { ...ctxOpts(), allowDiskUse: true }).toArray(); const timeseries = rows.map((row) => { const bucket: { bucket: Date; value: number; labels?: Record } = { @@ -980,7 +1009,7 @@ export class MongoDBEngine extends StorageEngine { const pids = Array.isArray(params.projectId) ? params.projectId : [params.projectId]; const sample = await col.findOne( { metric_name: params.metricName, project_id: { $in: pids } }, - { projection: { metric_type: 1, _id: 0 } }, + { ...ctxOpts(), projection: { metric_type: 1, _id: 0 } }, ); if (sample?.metric_type) { metricType = sample.metric_type as MetricType; @@ -1011,7 +1040,7 @@ export class MongoDBEngine extends StorageEngine { { $project: { name: '$_id.name', type: '$_id.type', _id: 0 } }, ]; - const rows = await col.aggregate(pipeline).toArray(); + const rows = await col.aggregate(pipeline, { ...ctxOpts() }).toArray(); return { names: rows.map((row) => ({ @@ -1039,7 +1068,7 @@ export class MongoDBEngine extends StorageEngine { { $limit: limit }, ]; - const rows = await col.aggregate(pipeline).toArray(); + const rows = await col.aggregate(pipeline, { ...ctxOpts() }).toArray(); return { keys: rows.map((r) => String(r._id)), @@ -1063,7 +1092,7 @@ export class MongoDBEngine extends StorageEngine { { $limit: limit }, ]; - const rows = await col.aggregate(pipeline).toArray(); + const rows = await col.aggregate(pipeline, { ...ctxOpts() }).toArray(); return { values: rows.map((r) => String(r._id)), @@ -1091,13 +1120,13 @@ export class MongoDBEngine extends StorageEngine { } // Delete metrics - const result = await db.collection('metrics').deleteMany(filter); + const result = await db.collection('metrics').deleteMany(filter, { ...ctxOpts() }); // Delete exemplars for same time range + project await db.collection('metric_exemplars').deleteMany({ project_id: { $in: pids }, time: { $gte: params.from, $lte: params.to }, - }); + }, { ...ctxOpts() }); return { deleted: result.deletedCount, executionTimeMs: Date.now() - start }; } @@ -1132,7 +1161,7 @@ export class MongoDBEngine extends StorageEngine { { $sort: { '_id.service_name': 1 as const, '_id.metric_name': 1 as const } }, ]; - const cursor = db.collection('metrics').aggregate(pipeline, { allowDiskUse: true }); + const cursor = db.collection('metrics').aggregate(pipeline, { ...ctxOpts(), allowDiskUse: true }); const docs = await cursor.toArray(); const serviceMap = new Map(); @@ -1166,6 +1195,7 @@ export class MongoDBEngine extends StorageEngine { // ========================================================================= private getDb(): Db { + if (this.options?.db) return this.options.db; if (!this.mongoClient) { throw new Error('Not connected. Call connect() first.'); } diff --git a/packages/reservoir/src/engines/timescale/context-comment.test.ts b/packages/reservoir/src/engines/timescale/context-comment.test.ts new file mode 100644 index 00000000..1d84d22f --- /dev/null +++ b/packages/reservoir/src/engines/timescale/context-comment.test.ts @@ -0,0 +1,46 @@ +import { describe, it, expect, vi } from 'vitest'; +import { withContext } from '@logtide/shared/context'; +import { TimescaleEngine } from './timescale-engine.js'; + +describe('TimescaleEngine context comment', () => { + it('prepends /* req=... */ to SQL when context is set', async () => { + const captured: string[] = []; + const fakePool = { + query: vi.fn(async (sql: string) => { + captured.push(sql); + return { rows: [{ now: new Date() }], rowCount: 1 }; + }), + end: vi.fn(), + } as unknown as import('pg').Pool; + + const engine = new TimescaleEngine( + { host: 'x', port: 0, database: 'x', user: 'x', password: 'x', schema: 'public' } as any, + { pool: fakePool, skipInitialize: true } + ); + + await withContext({ requestId: 'req-99', organizationId: 'org-z' }, async () => { + await engine.healthCheck(); + }); + + expect(captured.some((s) => s.startsWith('/* req=req-99'))).toBe(true); + }); + + it('passes SQL unchanged when no context', async () => { + const captured: string[] = []; + const fakePool = { + query: vi.fn(async (sql: string) => { + captured.push(sql); + return { rows: [{ now: new Date() }], rowCount: 1 }; + }), + end: vi.fn(), + } as unknown as import('pg').Pool; + + const engine = new TimescaleEngine( + { host: 'x', port: 0, database: 'x', user: 'x', password: 'x', schema: 'public' } as any, + { pool: fakePool, skipInitialize: true } + ); + + await engine.healthCheck(); + expect(captured.every((s) => !s.startsWith('/* req='))).toBe(true); + }); +}); diff --git a/packages/reservoir/src/engines/timescale/timescale-engine.ts b/packages/reservoir/src/engines/timescale/timescale-engine.ts index a9298eb4..5c72b875 100644 --- a/packages/reservoir/src/engines/timescale/timescale-engine.ts +++ b/packages/reservoir/src/engines/timescale/timescale-engine.ts @@ -1,4 +1,5 @@ import pg from 'pg'; +import { currentOrNull } from '@logtide/shared/context'; import { StorageEngine } from '../../core/storage-engine.js'; import type { LogRecord, @@ -75,6 +76,21 @@ const METRIC_INTERVAL_MAP: Record = { '1w': '1 week', }; +const SAFE_RE = /[^a-zA-Z0-9_:-]/g; +function safe(v: string | null | undefined): string { + if (!v) return '-'; + const c = v.replace(SAFE_RE, ''); + return c.length > 0 ? c : '-'; +} +function ctxComment(): string { + if (process.env.LOGTIDE_CONTEXT_SQL_COMMENT === 'false') return ''; + const ctx = currentOrNull(); + if (!ctx) return ''; + return `/* req=${safe(ctx.requestId)} origin=${safe(ctx.origin)} org=${safe( + ctx.organizationId + )} actor=${safe(ctx.actor.type)}:${safe(ctx.actor.id)} */ `; +} + export interface TimescaleEngineOptions { /** Use an existing pg.Pool instead of creating a new one */ pool?: pg.Pool; @@ -141,7 +157,7 @@ export class TimescaleEngine extends StorageEngine { async healthCheck(): Promise { const start = Date.now(); try { - await this.getPool().query('SELECT 1'); + await this.runQuery('SELECT 1'); const responseTimeMs = Date.now() - start; let status: HealthStatus['status'] = 'healthy'; if (responseTimeMs >= 200) status = 'unhealthy'; @@ -223,11 +239,10 @@ export class TimescaleEngine extends StorageEngine { } const start = Date.now(); - const pool = this.getPool(); const { query, values } = this.buildInsertQuery(logs); try { - await pool.query(query, values); + await this.runQuery(query, values); return { ingested: logs.length, failed: 0, durationMs: Date.now() - start }; } catch (err) { return { @@ -245,11 +260,10 @@ export class TimescaleEngine extends StorageEngine { } const start = Date.now(); - const pool = this.getPool(); const { query, values } = this.buildInsertQuery(logs, true); try { - const result = await pool.query(query, values); + const result = await this.runQuery(query, values); const rows = result.rows.map(mapRowToStoredLogRecord); return { ingested: logs.length, failed: 0, durationMs: Date.now() - start, rows }; } catch (err) { @@ -265,12 +279,11 @@ export class TimescaleEngine extends StorageEngine { async query(params: QueryParams): Promise> { const start = Date.now(); - const pool = this.getPool(); const native = this.translator.translateQuery(params); const limit = (native.metadata?.limit as number) ?? 50; const offset = params.offset ?? 0; - const result = await pool.query(native.query as string, native.parameters); + const result = await this.runQuery(native.query as string, native.parameters); const hasMore = result.rows.length > limit; const rows = hasMore ? result.rows.slice(0, limit) : result.rows; @@ -296,10 +309,9 @@ export class TimescaleEngine extends StorageEngine { async aggregate(params: AggregateParams): Promise { const start = Date.now(); - const pool = this.getPool(); const native = this.translator.translateAggregate(params); - const result = await pool.query(native.query as string, native.parameters); + const result = await this.runQuery(native.query as string, native.parameters); const bucketMap = new Map(); @@ -328,8 +340,7 @@ export class TimescaleEngine extends StorageEngine { } async getById(params: GetByIdParams): Promise { - const pool = this.getPool(); - const result = await pool.query( + const result = await this.runQuery( `SELECT * FROM ${this.schema}.${this.tableName} WHERE id = $1 AND project_id = $2 LIMIT 1`, [params.id, params.projectId], ); @@ -338,8 +349,7 @@ export class TimescaleEngine extends StorageEngine { async getByIds(params: GetByIdsParams): Promise { if (params.ids.length === 0) return []; - const pool = this.getPool(); - const result = await pool.query( + const result = await this.runQuery( `SELECT * FROM ${this.schema}.${this.tableName} WHERE id = ANY($1::uuid[]) AND project_id = $2 ORDER BY time DESC`, [params.ids, params.projectId], ); @@ -348,9 +358,8 @@ export class TimescaleEngine extends StorageEngine { async count(params: CountParams): Promise { const start = Date.now(); - const pool = this.getPool(); const native = this.translator.translateCount(params); - const result = await pool.query(native.query as string, native.parameters); + const result = await this.runQuery(native.query as string, native.parameters); return { count: Number(result.rows[0]?.count ?? 0), executionTimeMs: Date.now() - start, @@ -359,9 +368,8 @@ export class TimescaleEngine extends StorageEngine { async countEstimate(params: CountParams): Promise { const start = Date.now(); - const pool = this.getPool(); const native = this.translator.translateCountEstimate(params); - const result = await pool.query( + const result = await this.runQuery( `EXPLAIN (FORMAT JSON) ${native.query}`, native.parameters, ); @@ -375,10 +383,9 @@ export class TimescaleEngine extends StorageEngine { async distinct(params: DistinctParams): Promise { const start = Date.now(); - const pool = this.getPool(); // Skip-Scan Optimization for indexed fields (service, level) - // This provides massive performance gains (100x+) on large datasets by jumping + // This provides massive performance gains (100x+) on large datasets by jumping // through the index instead of scanning all matching rows. // Skip-scan only when no extra filters are present (service/level/filters would require CTE changes) if ( @@ -412,7 +419,7 @@ export class TimescaleEngine extends StorageEngine { `; const limit = params.limit ?? 1000; - const result = await pool.query(query, [projectIds, params.from, params.to, limit]); + const result = await this.runQuery(query, [projectIds, params.from, params.to, limit]); return { values: result.rows.map((row) => row.value as string).filter((v) => v != null && v !== ''), @@ -425,7 +432,7 @@ export class TimescaleEngine extends StorageEngine { } const native = this.translator.translateDistinct(params); - const result = await pool.query(native.query as string, native.parameters); + const result = await this.runQuery(native.query as string, native.parameters); return { values: result.rows.map((row: Record) => row.value as string).filter((v) => v != null && v !== ''), executionTimeMs: Date.now() - start, @@ -434,9 +441,8 @@ export class TimescaleEngine extends StorageEngine { async topValues(params: TopValuesParams): Promise { const start = Date.now(); - const pool = this.getPool(); const native = this.translator.translateTopValues(params); - const result = await pool.query(native.query as string, native.parameters); + const result = await this.runQuery(native.query as string, native.parameters); return { values: result.rows.map((row: Record) => ({ value: row.value as string, @@ -448,9 +454,8 @@ export class TimescaleEngine extends StorageEngine { async deleteByTimeRange(params: DeleteByTimeRangeParams): Promise { const start = Date.now(); - const pool = this.getPool(); const native = this.translator.translateDelete(params); - const result = await pool.query(native.query as string, native.parameters); + const result = await this.runQuery(native.query as string, native.parameters); return { deleted: Number(result.rowCount ?? 0), executionTimeMs: Date.now() - start, @@ -483,6 +488,12 @@ export class TimescaleEngine extends StorageEngine { return this.pool; } + private async runQuery(sql: string, params?: unknown[]): Promise { + const pool = this.getPool(); + const final = ctxComment() + sql; + return params ? pool.query(final, params as any[]) : pool.query(final); + } + private buildInsertQuery(logs: LogRecord[], returning = false): { query: string; values: unknown[] } { const s = this.schema; const t = this.tableName; @@ -539,7 +550,6 @@ export class TimescaleEngine extends StorageEngine { if (spans.length === 0) return { ingested: 0, failed: 0, durationMs: 0 }; const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const times: Date[] = []; @@ -583,7 +593,7 @@ export class TimescaleEngine extends StorageEngine { } try { - await pool.query( + await this.runQuery( `INSERT INTO ${s}.spans ( time, span_id, trace_id, parent_span_id, organization_id, project_id, service_name, operation_name, start_time, end_time, duration_ms, @@ -610,17 +620,16 @@ export class TimescaleEngine extends StorageEngine { } async upsertTrace(trace: TraceRecord): Promise { - const pool = this.getPool(); const s = this.schema; - const existing = await pool.query( + const existing = await this.runQuery( `SELECT trace_id, start_time, end_time, span_count, error FROM ${s}.traces WHERE trace_id = $1 AND project_id = $2`, [trace.traceId, trace.projectId], ); if (existing.rows.length === 0) { - await pool.query( + await this.runQuery( `INSERT INTO ${s}.traces ( trace_id, organization_id, project_id, service_name, root_service_name, root_operation_name, start_time, end_time, duration_ms, span_count, error @@ -637,7 +646,7 @@ export class TimescaleEngine extends StorageEngine { const newEnd = trace.endTime > existingEnd ? trace.endTime : existingEnd; const newDuration = newEnd.getTime() - newStart.getTime(); - await pool.query( + await this.runQuery( `UPDATE ${s}.traces SET start_time = $1, end_time = $2, duration_ms = $3, span_count = span_count + $4, error = error OR $5, @@ -653,7 +662,6 @@ export class TimescaleEngine extends StorageEngine { async querySpans(params: SpanQueryParams): Promise { const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const limit = params.limit ?? 50; const offset = params.offset ?? 0; @@ -703,13 +711,13 @@ export class TimescaleEngine extends StorageEngine { const sortBy = ALLOWED_SORT_COLUMNS.has(params.sortBy ?? '') ? params.sortBy! : 'start_time'; const sortOrder = ALLOWED_SORT_ORDERS.has((params.sortOrder ?? '').toLowerCase()) ? params.sortOrder!.toUpperCase() : 'ASC'; - const countResult = await pool.query( + const countResult = await this.runQuery( `SELECT COUNT(*)::int AS count FROM ${s}.spans ${where}`, values, ); const total = countResult.rows[0]?.count ?? 0; - const result = await pool.query( + const result = await this.runQuery( `SELECT * FROM ${s}.spans ${where} ORDER BY ${sortBy} ${sortOrder} LIMIT $${idx++} OFFSET $${idx++}`, @@ -727,9 +735,8 @@ export class TimescaleEngine extends StorageEngine { } async getSpansByTraceId(traceId: string, projectId: string): Promise { - const pool = this.getPool(); const s = this.schema; - const result = await pool.query( + const result = await this.runQuery( `SELECT * FROM ${s}.spans WHERE trace_id = $1 AND project_id = $2 ORDER BY start_time ASC`, [traceId, projectId], ); @@ -738,7 +745,6 @@ export class TimescaleEngine extends StorageEngine { async queryTraces(params: TraceQueryParams): Promise { const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const limit = params.limit ?? 50; const offset = params.offset ?? 0; @@ -777,13 +783,13 @@ export class TimescaleEngine extends StorageEngine { const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; - const countResult = await pool.query( + const countResult = await this.runQuery( `SELECT COUNT(*)::int AS count FROM ${s}.traces ${where}`, values, ); const total = countResult.rows[0]?.count ?? 0; - const result = await pool.query( + const result = await this.runQuery( `SELECT * FROM ${s}.traces ${where} ORDER BY start_time DESC LIMIT $${idx++} OFFSET $${idx++}`, @@ -801,9 +807,8 @@ export class TimescaleEngine extends StorageEngine { } async getTraceById(traceId: string, projectId: string): Promise { - const pool = this.getPool(); const s = this.schema; - const result = await pool.query( + const result = await this.runQuery( `SELECT * FROM ${s}.traces WHERE trace_id = $1 AND project_id = $2`, [traceId, projectId], ); @@ -815,7 +820,6 @@ export class TimescaleEngine extends StorageEngine { from?: Date, to?: Date, ): Promise { - const pool = this.getPool(); const s = this.schema; const values: unknown[] = [projectId]; let idx = 2; @@ -830,7 +834,7 @@ export class TimescaleEngine extends StorageEngine { values.push(to); } - const result = await pool.query( + const result = await this.runQuery( `SELECT parent.service_name AS source_service, child.service_name AS target_service, @@ -870,7 +874,6 @@ export class TimescaleEngine extends StorageEngine { async deleteSpansByTimeRange(params: DeleteSpansByTimeRangeParams): Promise { const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const pids = Array.isArray(params.projectId) ? params.projectId : [params.projectId]; @@ -884,13 +887,13 @@ export class TimescaleEngine extends StorageEngine { values.push(svc); } - const result = await pool.query( + const result = await this.runQuery( `DELETE FROM ${s}.spans WHERE ${conditions.join(' AND ')}`, values, ); // Also clean up orphaned traces - await pool.query( + await this.runQuery( `DELETE FROM ${s}.traces WHERE project_id = ANY($1) AND NOT EXISTS (SELECT 1 FROM ${s}.spans WHERE spans.trace_id = traces.trace_id AND spans.project_id = traces.project_id)`, [pids], @@ -912,7 +915,6 @@ export class TimescaleEngine extends StorageEngine { } const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const times: Date[] = []; @@ -945,7 +947,7 @@ export class TimescaleEngine extends StorageEngine { const hasExemplarsFlags: boolean[] = metrics.map(m => (m.exemplars?.length ?? 0) > 0); try { - const insertResult = await pool.query( + const insertResult = await this.runQuery( `INSERT INTO ${s}.metrics ( time, organization_id, project_id, metric_name, metric_type, value, is_monotonic, service_name, attributes, resource_attributes, histogram_data, has_exemplars @@ -992,7 +994,7 @@ export class TimescaleEngine extends StorageEngine { } if (exemplarTimes.length > 0) { - await pool.query( + await this.runQuery( `INSERT INTO ${s}.metric_exemplars ( time, metric_id, organization_id, project_id, exemplar_value, exemplar_time, trace_id, span_id, attributes @@ -1019,7 +1021,6 @@ export class TimescaleEngine extends StorageEngine { async queryMetrics(params: MetricQueryParams): Promise { const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const limit = params.limit ?? 50; const offset = params.offset ?? 0; @@ -1069,14 +1070,14 @@ export class TimescaleEngine extends StorageEngine { const sortOrder = params.sortOrder ?? 'desc'; // Count total - const countResult = await pool.query( + const countResult = await this.runQuery( `SELECT COUNT(*)::int AS count FROM ${s}.metrics m ${where}`, values, ); const total = countResult.rows[0]?.count ?? 0; // Fetch rows - const dataResult = await pool.query( + const dataResult = await this.runQuery( `SELECT m.* FROM ${s}.metrics m ${where} ORDER BY m.time ${sortOrder} LIMIT $${idx++} OFFSET $${idx++}`, @@ -1088,7 +1089,7 @@ export class TimescaleEngine extends StorageEngine { // Optionally load exemplars if (params.includeExemplars && metricsResult.length > 0) { const metricIds = metricsResult.map((m) => m.id); - const exResult = await pool.query( + const exResult = await this.runQuery( `SELECT * FROM ${s}.metric_exemplars WHERE metric_id = ANY($1::uuid[])`, [metricIds], ); @@ -1133,7 +1134,6 @@ export class TimescaleEngine extends StorageEngine { return this.aggregateMetricsFromRollup(params, start); } - const pool = this.getPool(); const s = this.schema; const intervalSql = METRIC_INTERVAL_MAP[params.interval]; @@ -1232,7 +1232,7 @@ export class TimescaleEngine extends StorageEngine { const groupByCols = ['bucket', ...groupByColumns].join(', '); - const result = await pool.query( + const result = await this.runQuery( `SELECT ${selectCols} FROM ${s}.metrics ${where} @@ -1276,7 +1276,6 @@ export class TimescaleEngine extends StorageEngine { params: MetricAggregateParams, start: number, ): Promise { - const pool = this.getPool(); const s = this.schema; const rollupTable = params.interval === '1d' @@ -1312,12 +1311,12 @@ export class TimescaleEngine extends StorageEngine { ORDER BY bucket ASC `; - const { rows } = await pool.query(sql, placeholders); + const { rows } = await this.runQuery(sql, placeholders); // Resolve metric type let metricType = params.metricType; if (!metricType && rows.length > 0) { - const typeRes = await pool.query( + const typeRes = await this.runQuery( `SELECT metric_type FROM ${s}.${rollupTable} WHERE metric_name = $1 LIMIT 1`, [params.metricName], ); @@ -1337,7 +1336,6 @@ export class TimescaleEngine extends StorageEngine { async getMetricNames(params: MetricNamesParams): Promise { const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const conditions: string[] = []; @@ -1373,7 +1371,7 @@ export class TimescaleEngine extends StorageEngine { const limitClause = params.limit ? `LIMIT $${idx++}` : ''; const limitValues = params.limit ? [params.limit] : []; - const result = await pool.query( + const result = await this.runQuery( `SELECT DISTINCT metric_name, metric_type FROM ${s}.metrics ${where} @@ -1393,7 +1391,6 @@ export class TimescaleEngine extends StorageEngine { async getMetricLabelKeys(params: MetricLabelParams): Promise { const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const conditions: string[] = []; @@ -1428,7 +1425,7 @@ export class TimescaleEngine extends StorageEngine { const limitClause = params.limit ? `LIMIT $${idx++}` : ''; const limitValues = params.limit ? [params.limit] : []; - const result = await pool.query( + const result = await this.runQuery( `SELECT DISTINCT jsonb_object_keys(attributes) AS key FROM ${s}.metrics ${where} AND attributes IS NOT NULL @@ -1445,7 +1442,6 @@ export class TimescaleEngine extends StorageEngine { async getMetricLabelValues(params: MetricLabelParams, labelKey: string): Promise { const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const conditions: string[] = []; @@ -1484,7 +1480,7 @@ export class TimescaleEngine extends StorageEngine { const limitClause = params.limit ? `LIMIT $${idx++}` : ''; const limitValues = params.limit ? [params.limit] : []; - const result = await pool.query( + const result = await this.runQuery( `SELECT DISTINCT attributes->>$${idx++} AS value FROM ${s}.metrics ${where} @@ -1503,7 +1499,6 @@ export class TimescaleEngine extends StorageEngine { async deleteMetricsByTimeRange(params: DeleteMetricsByTimeRangeParams): Promise { const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const pids = Array.isArray(params.projectId) ? params.projectId : [params.projectId]; @@ -1525,7 +1520,7 @@ export class TimescaleEngine extends StorageEngine { const where = conditions.join(' AND '); // Delete exemplars first (they reference metrics) - await pool.query( + await this.runQuery( `DELETE FROM ${s}.metric_exemplars WHERE metric_id IN ( SELECT id FROM ${s}.metrics WHERE ${where} )`, @@ -1533,7 +1528,7 @@ export class TimescaleEngine extends StorageEngine { ); // Delete metrics - const result = await pool.query( + const result = await this.runQuery( `DELETE FROM ${s}.metrics WHERE ${where}`, values, ); @@ -1546,7 +1541,6 @@ export class TimescaleEngine extends StorageEngine { async getMetricsOverview(params: MetricsOverviewParams): Promise { const start = Date.now(); - const pool = this.getPool(); const s = this.schema; const projectIds = Array.isArray(params.projectId) ? params.projectId : [params.projectId]; const placeholders: unknown[] = [params.from, params.to, projectIds]; @@ -1577,7 +1571,7 @@ export class TimescaleEngine extends StorageEngine { GROUP BY metric_name, metric_type, service_name ORDER BY service_name, metric_name `; - const result = await pool.query(sql, placeholders); + const result = await this.runQuery(sql, placeholders); rows = result.rows; } catch { // Fallback to raw metrics table @@ -1595,7 +1589,7 @@ export class TimescaleEngine extends StorageEngine { GROUP BY metric_name, metric_type, service_name ORDER BY service_name, metric_name `; - const result = await pool.query(sql, placeholders); + const result = await this.runQuery(sql, placeholders); rows = result.rows; } @@ -1617,7 +1611,7 @@ export class TimescaleEngine extends StorageEngine { ${latestServiceFilter} ORDER BY metric_name, service_name, time DESC `; - const { rows: latestRows } = await pool.query(latestSql, latestPlaceholders); + const { rows: latestRows } = await this.runQuery(latestSql, latestPlaceholders); latestMap = new Map( latestRows.map((r: Record) => [ `${r.metric_name}:${r.service_name}`, diff --git a/packages/shared/package.json b/packages/shared/package.json index 46c7b16c..85a0c401 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -10,6 +10,10 @@ ".": { "types": "./dist/index.d.ts", "import": "./dist/index.js" + }, + "./context": { + "types": "./dist/context/index.d.ts", + "import": "./dist/context/index.js" } }, "scripts": { diff --git a/packages/shared/src/context/fetch.test.ts b/packages/shared/src/context/fetch.test.ts new file mode 100644 index 00000000..19895414 --- /dev/null +++ b/packages/shared/src/context/fetch.test.ts @@ -0,0 +1,45 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { fetchWithContext } from './fetch.js'; +import { withContext } from './test-helpers.js'; + +describe('fetchWithContext', () => { + let originalFetch: typeof globalThis.fetch; + let lastInit: RequestInit | undefined; + + beforeEach(() => { + originalFetch = globalThis.fetch; + globalThis.fetch = vi.fn(async (_input, init) => { + lastInit = init; + return new Response('ok'); + }) as unknown as typeof fetch; + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + lastInit = undefined; + }); + + it('passes through unchanged when no context', async () => { + await fetchWithContext('https://x.test', { method: 'POST' }); + const headers = new Headers(lastInit!.headers); + expect(headers.get('X-Logtide-Request-Id')).toBeNull(); + }); + + it('injects X-Logtide-Request-Id when context is set', async () => { + await withContext({ requestId: 'req-42' }, async () => { + await fetchWithContext('https://x.test'); + }); + const headers = new Headers(lastInit!.headers); + expect(headers.get('X-Logtide-Request-Id')).toBe('req-42'); + }); + + it('does not overwrite an explicit header from caller', async () => { + await withContext({ requestId: 'req-42' }, async () => { + await fetchWithContext('https://x.test', { + headers: { 'X-Logtide-Request-Id': 'override' }, + }); + }); + const headers = new Headers(lastInit!.headers); + expect(headers.get('X-Logtide-Request-Id')).toBe('override'); + }); +}); diff --git a/packages/shared/src/context/fetch.ts b/packages/shared/src/context/fetch.ts new file mode 100644 index 00000000..0af7848b --- /dev/null +++ b/packages/shared/src/context/fetch.ts @@ -0,0 +1,15 @@ +import { currentOrNull } from './storage.js'; + +export async function fetchWithContext( + input: string | URL, + init?: RequestInit +): Promise { + const ctx = currentOrNull(); + if (!ctx) return fetch(input, init); + + const headers = new Headers(init?.headers); + if (!headers.has('X-Logtide-Request-Id')) { + headers.set('X-Logtide-Request-Id', ctx.requestId); + } + return fetch(input, { ...init, headers }); +} diff --git a/packages/shared/src/context/index.ts b/packages/shared/src/context/index.ts new file mode 100644 index 00000000..301a7073 --- /dev/null +++ b/packages/shared/src/context/index.ts @@ -0,0 +1,31 @@ +export * from './types.js'; +export { + contextStorage, + run, + enterWith, + current, + currentOrNull, + withPatch as with_, + runAsSystem, +} from './storage.js'; +export { serializeContext, deserializeContext } from './serialize.js'; +export { fetchWithContext } from './fetch.js'; +export { withContext } from './test-helpers.js'; + +import * as storage from './storage.js'; +import { serializeContext, deserializeContext } from './serialize.js'; + +/** + * Convenience namespace mirroring the design spec API. + * Use `context.current()`, `context.run()`, etc. + */ +export const context = { + run: storage.run, + enterWith: storage.enterWith, + current: storage.current, + currentOrNull: storage.currentOrNull, + with: storage.withPatch, + runAsSystem: storage.runAsSystem, + serialize: serializeContext, + deserialize: deserializeContext, +} as const; diff --git a/packages/shared/src/context/serialize.test.ts b/packages/shared/src/context/serialize.test.ts new file mode 100644 index 00000000..bad11c5f --- /dev/null +++ b/packages/shared/src/context/serialize.test.ts @@ -0,0 +1,43 @@ +import { describe, it, expect } from 'vitest'; +import { serializeContext, deserializeContext } from './serialize.js'; +import type { RequestContext } from './types.js'; + +const httpCtx: RequestContext = { + requestId: 'req-abc', + origin: 'http', + actor: { type: 'user', id: 'u1', email: 'a@b.test' }, + organizationId: 'org-1', + projectId: 'proj-1', + ip: '10.0.0.1', + userAgent: 'curl/8', +}; + +describe('serializeContext', () => { + it('round-trips an http context, flipping origin to job', () => { + const ser = serializeContext(httpCtx); + expect(ser.v).toBe(1); + expect(ser.requestId).toBe('req-abc'); + + const back = deserializeContext(ser); + expect(back).toMatchObject({ + requestId: 'req-abc', + origin: 'job', + actor: httpCtx.actor, + organizationId: 'org-1', + projectId: 'proj-1', + }); + }); + + it('returns undefined for unknown version (graceful fallback)', () => { + expect(deserializeContext({ v: 99, requestId: 'x' } as any)).toBeUndefined(); + }); + + it('returns undefined for null/undefined input', () => { + expect(deserializeContext(null as any)).toBeUndefined(); + expect(deserializeContext(undefined as any)).toBeUndefined(); + }); + + it('returns undefined for malformed payload (zod fails)', () => { + expect(deserializeContext({ v: 1, foo: 'bar' } as any)).toBeUndefined(); + }); +}); diff --git a/packages/shared/src/context/serialize.ts b/packages/shared/src/context/serialize.ts new file mode 100644 index 00000000..aac0be31 --- /dev/null +++ b/packages/shared/src/context/serialize.ts @@ -0,0 +1,60 @@ +import { z } from 'zod'; +import type { RequestContext, SerializedContext } from './types.js'; +import { SERIALIZED_CONTEXT_VERSION } from './types.js'; + +const ActorSchema = z.object({ + type: z.enum(['user', 'apiKey', 'system']), + id: z.string().nullable(), + email: z.string().optional(), + apiKeyType: z.enum(['write', 'full']).optional(), +}); + +const SerializedContextSchema = z.object({ + v: z.literal(1), + requestId: z.string().min(1), + origin: z.enum(['http', 'job', 'system']), + actor: ActorSchema, + organizationId: z.string().nullable(), + projectId: z.string().nullable(), + ip: z.string().optional(), + userAgent: z.string().optional(), + systemReason: z.string().optional(), +}); + +export function serializeContext(ctx: RequestContext): SerializedContext { + return { + v: SERIALIZED_CONTEXT_VERSION, + requestId: ctx.requestId, + origin: ctx.origin, + actor: ctx.actor, + organizationId: ctx.organizationId, + projectId: ctx.projectId, + ip: ctx.ip, + userAgent: ctx.userAgent, + systemReason: ctx.systemReason, + }; +} + +/** + * Returns undefined on any error: unknown version, malformed payload, etc. + * Caller must fall back to runAsSystem in that case (rolling-deploy safe). + */ +export function deserializeContext(input: unknown): RequestContext | undefined { + if (input == null || typeof input !== 'object') return undefined; + if ((input as { v?: number }).v !== SERIALIZED_CONTEXT_VERSION) return undefined; + + const parsed = SerializedContextSchema.safeParse(input); + if (!parsed.success) return undefined; + + const data = parsed.data; + return { + requestId: data.requestId, + origin: 'job', // jobs always run with origin=job, regardless of producer origin + actor: data.actor, + organizationId: data.organizationId, + projectId: data.projectId, + ip: data.ip, + userAgent: data.userAgent, + systemReason: data.systemReason, + }; +} diff --git a/packages/shared/src/context/storage.test.ts b/packages/shared/src/context/storage.test.ts new file mode 100644 index 00000000..a31d624e --- /dev/null +++ b/packages/shared/src/context/storage.test.ts @@ -0,0 +1,104 @@ +import { describe, it, expect } from 'vitest'; +import { + contextStorage, + run, + enterWith, + current, + currentOrNull, + withPatch, + runAsSystem, +} from './storage.js'; +import type { RequestContext } from './types.js'; + +const baseCtx: RequestContext = { + requestId: 'req-1', + origin: 'http', + actor: { type: 'user', id: 'u1', email: 'a@b.test' }, + organizationId: 'org-1', + projectId: 'p-1', +}; + +describe('context.current', () => { + it('throws when no context is established', () => { + expect(() => current()).toThrow(/RequestContext not established/); + }); + + it('returns the current context inside run', async () => { + await run(baseCtx, async () => { + expect(current()).toEqual(baseCtx); + }); + }); + + it('isolates concurrent runs', async () => { + const a = { ...baseCtx, requestId: 'A' }; + const b = { ...baseCtx, requestId: 'B' }; + await Promise.all([ + run(a, async () => { + await new Promise((r) => setTimeout(r, 5)); + expect(current().requestId).toBe('A'); + }), + run(b, async () => { + await new Promise((r) => setTimeout(r, 1)); + expect(current().requestId).toBe('B'); + }), + ]); + }); +}); + +describe('context.currentOrNull', () => { + it('returns undefined when no context', () => { + expect(currentOrNull()).toBeUndefined(); + }); + + it('returns context inside run', async () => { + await run(baseCtx, async () => { + expect(currentOrNull()).toEqual(baseCtx); + }); + }); +}); + +describe('context.enterWith', () => { + it('persists for the rest of the async chain', async () => { + await contextStorage.run(undefined as unknown as RequestContext, async () => { + // simulate Fastify hook: enterWith inside an outer scope + enterWith(baseCtx); + await new Promise((r) => setTimeout(r, 1)); + expect(current()).toEqual(baseCtx); + }); + }); +}); + +describe('context.with', () => { + it('preserves requestId by default', async () => { + await run(baseCtx, async () => { + await withPatch({ projectId: 'p-2' }, async () => { + expect(current().projectId).toBe('p-2'); + expect(current().requestId).toBe('req-1'); + }); + }); + }); + + it('throws if called outside a context', async () => { + await expect(withPatch({}, async () => {})).rejects.toThrow(/RequestContext not established/); + }); +}); + +describe('context.runAsSystem', () => { + it('requires a non-empty reason', async () => { + await expect(runAsSystem('', async () => {})).rejects.toThrow(/non-empty reason/); + }); + + it('establishes a system context with generated requestId', async () => { + let captured: RequestContext | null = null; + await runAsSystem('test-cron', async () => { + captured = current(); + }); + expect(captured).not.toBeNull(); + expect(captured!.origin).toBe('system'); + expect(captured!.actor).toEqual({ type: 'system', id: null }); + expect(captured!.systemReason).toBe('test-cron'); + expect(captured!.organizationId).toBeNull(); + expect(captured!.projectId).toBeNull(); + expect(captured!.requestId).toMatch(/^[0-9a-f-]{36}$/); + }); +}); diff --git a/packages/shared/src/context/storage.ts b/packages/shared/src/context/storage.ts new file mode 100644 index 00000000..6529def3 --- /dev/null +++ b/packages/shared/src/context/storage.ts @@ -0,0 +1,49 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; +import { randomUUID } from 'node:crypto'; +import type { RequestContext } from './types.js'; + +export const contextStorage = new AsyncLocalStorage(); + +export async function run(ctx: RequestContext, fn: () => Promise | T): Promise { + return contextStorage.run(ctx, fn); +} + +export function enterWith(ctx: RequestContext): void { + contextStorage.enterWith(ctx); +} + +export function currentOrNull(): RequestContext | undefined { + return contextStorage.getStore(); +} + +export function current(): RequestContext { + const ctx = contextStorage.getStore(); + if (!ctx) { + throw new Error('RequestContext not established (call context.run / context.enterWith first)'); + } + return ctx; +} + +export async function withPatch( + patch: Partial, + fn: () => Promise | T +): Promise { + const base = current(); + const next: RequestContext = { ...base, ...patch }; + return contextStorage.run(next, fn); +} + +export async function runAsSystem(reason: string, fn: () => Promise | T): Promise { + if (!reason || typeof reason !== 'string' || reason.trim() === '') { + throw new Error('runAsSystem requires a non-empty reason'); + } + const ctx: RequestContext = { + requestId: randomUUID(), + origin: 'system', + actor: { type: 'system', id: null }, + organizationId: null, + projectId: null, + systemReason: reason.trim(), + }; + return contextStorage.run(ctx, fn); +} diff --git a/packages/shared/src/context/test-helpers.ts b/packages/shared/src/context/test-helpers.ts new file mode 100644 index 00000000..dc363e9c --- /dev/null +++ b/packages/shared/src/context/test-helpers.ts @@ -0,0 +1,21 @@ +import { run } from './storage.js'; +import type { RequestContext } from './types.js'; + +/** + * Test helper: runs `fn` inside a synthetic context. + * Sensible defaults; merge in `partial` to override. + */ +export async function withContext( + partial: Partial, + fn: () => Promise | T +): Promise { + const ctx: RequestContext = { + requestId: 'test-req-id', + origin: 'http', + actor: { type: 'system', id: null }, + organizationId: null, + projectId: null, + ...partial, + }; + return run(ctx, fn); +} diff --git a/packages/shared/src/context/types.ts b/packages/shared/src/context/types.ts new file mode 100644 index 00000000..d6874e31 --- /dev/null +++ b/packages/shared/src/context/types.ts @@ -0,0 +1,37 @@ +import type { ApiKeyType } from '../constants/log-constants.js'; + +export type ActorType = 'user' | 'apiKey' | 'system'; + +export interface Actor { + readonly type: ActorType; + readonly id: string | null; + readonly email?: string; + readonly apiKeyType?: ApiKeyType; +} + +export type Origin = 'http' | 'job' | 'system'; + +export interface RequestContext { + readonly requestId: string; + readonly origin: Origin; + readonly actor: Actor; + readonly organizationId: string | null; + readonly projectId: string | null; + readonly ip?: string; + readonly userAgent?: string; + readonly systemReason?: string; +} + +export interface SerializedContext { + v: 1; + requestId: string; + origin: Origin; + actor: Actor; + organizationId: string | null; + projectId: string | null; + ip?: string; + userAgent?: string; + systemReason?: string; +} + +export const SERIALIZED_CONTEXT_VERSION = 1 as const; diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 392a5095..2681c940 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -9,3 +9,6 @@ export * from './schemas/index.js'; // Export utilities export * from './utils/index.js'; + +// NOTE: context module is NOT re-exported here — it imports node:async_hooks +// and is server-only. Import it explicitly from `@logtide/shared/context`.