Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
edf2ca4
context: add RequestContext types
Polliog May 7, 2026
7985cb3
context: add async storage with run/enterWith/with/runAsSystem
Polliog May 7, 2026
f51b448
context: add serialize/deserialize for queue payloads
Polliog May 7, 2026
83f264f
context: add withContext test helper
Polliog May 7, 2026
264d91e
context: add fetchWithContext helper
Polliog May 7, 2026
29329a5
context: export public API from @logtide/shared
Polliog May 7, 2026
abf951e
context: register Fastify plugin after authPlugin
Polliog May 7, 2026
fd8134e
context: align Actor.apiKeyType with ApiKeyType
Polliog May 7, 2026
dd84b23
context: piggy-back _ctx on BullMQ producer payloads
Polliog May 7, 2026
a9a13a0
context: piggy-back _ctx on graphile-worker payloads
Polliog May 7, 2026
4894bcc
context: wrap worker processors with context.run/runAsSystem
Polliog May 7, 2026
5ed0e09
context: add SQL comment formatter (driver-wrap pending)
Polliog May 7, 2026
6da9d4b
context: prepend SQL comment via pg.Pool wrapper
Polliog May 7, 2026
3cb0c68
context: wrap cron callbacks in runAsSystem
Polliog May 7, 2026
8c0d86b
context: timescale engine prepends SQL comment from context
Polliog May 7, 2026
01fb0c6
context: clickhouse engine sets query_id and SQL comment
Polliog May 7, 2026
5138b82
context: mongodb engine injects $comment from context
Polliog May 7, 2026
8f692be
context: add propagation integration test
Polliog May 7, 2026
7bc16db
context: also patch pool.connect to cover kysely client.query
Polliog May 7, 2026
a1ec3fe
context: clickhouse insert() carries query_id + log_comment
Polliog May 7, 2026
b5e9a18
context: cover user and apiKey actor cases in fastify plugin test
Polliog May 7, 2026
882a34e
context: guard against null client in patched pool.connect
Polliog May 7, 2026
faff62b
context: fix clickhouse runQuery typing and remove unused client refs
Polliog May 7, 2026
efaafa3
load-test: seed script creates full-scope API key
Polliog May 7, 2026
8eea72e
apikeys: debounce last_used updates 60s per key
Polliog May 8, 2026
1645b44
merge develop into feature/213
Polliog May 8, 2026
ddd1c5d
context: split context module into @logtide/shared/context subpath
Polliog May 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions packages/backend/src/context/bullmq-context.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { describe, it, expect, vi } from 'vitest';
import { context, withContext } from '@logtide/shared/context';
import { attachContextToPayload, wrapProcessorWithContext, CTX_KEY } from './bullmq-context.js';

describe('attachContextToPayload', () => {
it('returns input unchanged when no context', () => {
const data = { foo: 1 };
expect(attachContextToPayload(data)).toBe(data);
});

it('adds _ctx with v=1 when a context is active', async () => {
await withContext({ requestId: 'req-1', organizationId: 'org-1' }, async () => {
const out = attachContextToPayload({ foo: 1 }) as any;
expect(out._ctx.v).toBe(1);
expect(out._ctx.requestId).toBe('req-1');
expect(out._ctx.organizationId).toBe('org-1');
expect(out.foo).toBe(1);
});
});

it('handles non-object payloads gracefully', () => {
expect(attachContextToPayload('hello' as unknown as object)).toBe('hello');
});
});

describe('wrapProcessorWithContext', () => {
it('strips _ctx and runs processor inside context.run', async () => {
const processor = vi.fn(async (job) => {
// expect data does not contain _ctx
expect((job.data as any)._ctx).toBeUndefined();
// expect context is established
const ctx = context.current();
expect(ctx.requestId).toBe('req-7');
expect(ctx.origin).toBe('job');
});

const wrapped = wrapProcessorWithContext('test-job', processor);
await wrapped({
id: 'j1',
name: 'test-job',
data: {
payload: 'x',
[CTX_KEY]: {
v: 1,
requestId: 'req-7',
origin: 'http',
actor: { type: 'system', id: null },
organizationId: null,
projectId: null,
},
} as any,
});
expect(processor).toHaveBeenCalledOnce();
});

it('falls back to runAsSystem when _ctx is missing', async () => {
const processor = vi.fn(async () => {
const ctx = context.current();
expect(ctx.origin).toBe('system');
expect(ctx.systemReason).toBe('bullmq-job:no-ctx-job');
});

const wrapped = wrapProcessorWithContext('no-ctx-job', processor);
await wrapped({ id: 'j2', name: 'no-ctx-job', data: { payload: 'y' } as any });
expect(processor).toHaveBeenCalledOnce();
});

it('falls back to runAsSystem when _ctx has unknown version', async () => {
const processor = vi.fn(async () => {
const ctx = context.current();
expect(ctx.origin).toBe('system');
});

const wrapped = wrapProcessorWithContext('bad-ctx-job', processor);
await wrapped({
id: 'j3',
name: 'bad-ctx-job',
data: { [CTX_KEY]: { v: 99 } } as any,
});
expect(processor).toHaveBeenCalledOnce();
});
});
45 changes: 45 additions & 0 deletions packages/backend/src/context/bullmq-context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { currentOrNull, deserializeContext, run, runAsSystem, serializeContext } from '@logtide/shared/context';
import type { JobProcessor } from '../queue/abstractions/types.js';

export const CTX_KEY = '_ctx';

/**
* Adds `_ctx` to a job payload if a context is currently active.
* Returns the (possibly modified) payload as `unknown` to keep adapter generics sane.
*/
export function attachContextToPayload<T>(data: T): T {
const ctx = currentOrNull();
if (!ctx) return data;
if (typeof data !== 'object' || data === null) return data;
return { ...(data as object), [CTX_KEY]: serializeContext(ctx) } as T;
}

/**
* Wraps a JobProcessor: detaches `_ctx` from the payload (so the user processor sees clean data)
* and runs the processor inside the appropriate context.run / runAsSystem.
*/
export function wrapProcessorWithContext<T>(
jobName: string,
processor: JobProcessor<T>
): JobProcessor<T> {
return async (job) => {
const raw = job.data as T & { [CTX_KEY]?: unknown };
const serialized = (raw as Record<string, unknown> | null)?.[CTX_KEY];
const cleanData =
raw && typeof raw === 'object'
? (() => {
const { [CTX_KEY]: _omit, ...rest } = raw as Record<string, unknown>;
return rest as T;
})()
: raw;

const cleanJob = { ...job, data: cleanData };

const ctx = serialized != null ? deserializeContext(serialized) : undefined;
if (ctx) {
await run(ctx, () => processor(cleanJob));
} else {
await runAsSystem(`bullmq-job:${jobName}`, () => processor(cleanJob));
}
};
}
36 changes: 36 additions & 0 deletions packages/backend/src/context/database-comment.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { describe, it, expect } from 'vitest';
import { withContext, currentOrNull } from '@logtide/shared/context';
import { formatContextComment } from './kysely-plugin.js';

describe('SQL comment injection helpers', () => {
it('formatContextComment can be invoked deterministically', () => {
const out = formatContextComment({
requestId: 'r1',
origin: 'http',
organizationId: 'o1',
actor: { type: 'user', id: 'u1' },
});
expect(out).toMatch(/^\/\* req=r1 origin=http org=o1 actor=user:u1 \*\/ $/);
});

it('manually-built patched query function prepends comment', async () => {
const captured: string[] = [];
const fakeOriginal = (sql: string) => {
captured.push(sql);
return Promise.resolve({ rows: [], rowCount: 0 });
};
const patched = (...args: any[]) => {
const ctx = currentOrNull();
if (!ctx) return fakeOriginal(...(args as [string]));
const comment = formatContextComment(ctx);
args[0] = comment + args[0];
return fakeOriginal(...(args as [string]));
};

await withContext({ requestId: 'rZ', organizationId: 'oZ' }, async () => {
await patched('SELECT 1');
});

expect(captured[0]).toMatch(/^\/\* req=rZ /);
});
});
67 changes: 67 additions & 0 deletions packages/backend/src/context/fastify-plugin.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { describe, it, expect } from 'vitest';
import Fastify from 'fastify';
import { context } from '@logtide/shared/context';
import { contextPlugin } from './fastify-plugin.js';

describe('contextPlugin', () => {
async function buildApp(authStub?: (req: any) => void) {
const app = Fastify({ logger: false });
if (authStub) {
// Simulate the auth plugin running BEFORE contextPlugin (registration order)
app.addHook('onRequest', async (req) => {
authStub(req);
});
}
await app.register(contextPlugin);
app.get('/echo', async () => {
const ctx = context.current();
return {
requestId: ctx.requestId,
origin: ctx.origin,
actorType: ctx.actor.type,
actorId: ctx.actor.id,
organizationId: ctx.organizationId,
};
});
return app;
}

it('establishes a context for every request (anonymous)', async () => {
const app = await buildApp();
const res = await app.inject({ method: 'GET', url: '/echo' });
expect(res.statusCode).toBe(200);
const body = res.json();
expect(body.origin).toBe('http');
expect(body.actorType).toBe('system');
expect(body.requestId).toBeTruthy();
await app.close();
});

it('captures user actor when auth populated request.user', async () => {
const app = await buildApp((req) => {
req.user = { id: 'u-abc', email: 'alice@test' };
req.organizationId = 'org-1';
});
const res = await app.inject({ method: 'GET', url: '/echo' });
const body = res.json();
expect(body.actorType).toBe('user');
expect(body.actorId).toBe('u-abc');
expect(body.organizationId).toBe('org-1');
await app.close();
});

it('captures apiKey actor when auth populated request.apiKeyId', async () => {
const app = await buildApp((req) => {
req.apiKeyId = 'key-xyz';
req.apiKeyType = 'write';
req.organizationId = 'org-2';
req.projectId = 'proj-2';
});
const res = await app.inject({ method: 'GET', url: '/echo' });
const body = res.json();
expect(body.actorType).toBe('apiKey');
expect(body.actorId).toBe('key-xyz');
expect(body.organizationId).toBe('org-2');
await app.close();
});
});
44 changes: 44 additions & 0 deletions packages/backend/src/context/fastify-plugin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import type { FastifyPluginAsync, FastifyRequest } from 'fastify';
import fp from 'fastify-plugin';
import type { ApiKeyType } from '@logtide/shared';
import { context, type Actor, type RequestContext } from '@logtide/shared/context';

type AuthDecoratedRequest = FastifyRequest & {
organizationId?: string;
projectId?: string;
apiKeyId?: string;
apiKeyType?: ApiKeyType;
user?: { id: string; email: string };
};

const plugin: FastifyPluginAsync = async (fastify) => {
fastify.addHook('onRequest', async (request) => {
const r = request as AuthDecoratedRequest;

const actor: Actor = r.user
? { type: 'user', id: r.user.id, email: r.user.email }
: r.apiKeyId
? { type: 'apiKey', id: r.apiKeyId, apiKeyType: r.apiKeyType }
: { type: 'system', id: null };

const userAgentHeader = r.headers['user-agent'];
const userAgent =
typeof userAgentHeader === 'string' ? userAgentHeader : undefined;

const ctx: RequestContext = {
requestId: r.id,
origin: 'http',
actor,
organizationId: r.organizationId ?? null,
projectId: r.projectId ?? null,
ip: r.ip,
userAgent,
};
context.enterWith(ctx);
});
};

export const contextPlugin = fp(plugin, {
name: 'context',
fastify: '5.x',
});
2 changes: 2 additions & 0 deletions packages/backend/src/context/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { contextPlugin } from './fastify-plugin.js';
export { context } from '@logtide/shared/context';
34 changes: 34 additions & 0 deletions packages/backend/src/context/kysely-plugin.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { describe, it, expect } from 'vitest';
import { formatContextComment, safeForComment } from './kysely-plugin.js';

describe('formatContextComment', () => {
it('formats basic context', () => {
const out = formatContextComment({
requestId: 'req-1',
origin: 'http',
organizationId: 'org-1',
actor: { type: 'user', id: 'u1' },
});
expect(out).toBe('/* req=req-1 origin=http org=org-1 actor=user:u1 */ ');
});

it('uses dash placeholder for null fields', () => {
const out = formatContextComment({
requestId: 'req-1',
origin: 'system',
organizationId: null,
actor: { type: 'system', id: null },
});
expect(out).toBe('/* req=req-1 origin=system org=- actor=system:- */ ');
});
});

describe('safeForComment', () => {
it('strips dangerous characters', () => {
expect(safeForComment('a*/b\nc')).toBe('abc');
});
it('returns dash for empty result', () => {
expect(safeForComment('***')).toBe('-');
expect(safeForComment(null)).toBe('-');
});
});
45 changes: 45 additions & 0 deletions packages/backend/src/context/kysely-plugin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import type {
KyselyPlugin,
PluginTransformQueryArgs,
PluginTransformResultArgs,
QueryResult,
RootOperationNode,
UnknownRow,
} from 'kysely';

/**
* v1 placeholder. The actual SQL comment injection happens at the pg.Pool level
* (see database/connection.ts) so the comment lands in front of the wire query
* without depending on Kysely AST internals (which differ per query kind).
*
* Kept as a stub so future work can move comment injection back into the plugin
* if Kysely exposes a stable RawNode-prefix API.
*/
export class ContextSqlCommentPlugin implements KyselyPlugin {
transformQuery(args: PluginTransformQueryArgs): RootOperationNode {
return args.node;
}
transformResult(args: PluginTransformResultArgs): Promise<QueryResult<UnknownRow>> {
return Promise.resolve(args.result);
}
}

const SAFE_RE = /[^a-zA-Z0-9_:-]/g;

export function safeForComment(value: string | null | undefined): string {
if (!value) return '-';
const cleaned = value.replace(SAFE_RE, '');
return cleaned.length > 0 ? cleaned : '-';
}

export function formatContextComment(ctx: {
requestId: string;
origin: string;
organizationId: string | null;
actor: { type: string; id: string | null };
}): string {
const actor = `${safeForComment(ctx.actor.type)}:${safeForComment(ctx.actor.id)}`;
return `/* req=${safeForComment(ctx.requestId)} origin=${safeForComment(
ctx.origin
)} org=${safeForComment(ctx.organizationId)} actor=${actor} */ `;
}
Loading
Loading