From b33629b82b7201366fe071e5f881907c23b1fc5f Mon Sep 17 00:00:00 2001 From: Xetera Date: Tue, 12 May 2026 12:42:54 +0300 Subject: [PATCH 1/6] wip: rpc --- package-lock.json | 7 ++ package.json | 1 + src/env.ts | 8 +- src/main.ts | 39 ++++++-- src/remote/api-client.ts | 162 ++++++++++++++++++++++++++++++++++ src/remote/query-optimizer.ts | 5 ++ src/remote/remote.ts | 24 +++++ 7 files changed, 237 insertions(+), 9 deletions(-) create mode 100644 src/remote/api-client.ts diff --git a/package-lock.json b/package-lock.json index 67d792ca..dd0b3fe8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,6 +18,7 @@ "@pgsql/types": "^17.6.2", "@query-doctor/core": "^0.8.9", "async-sema": "^3.1.1", + "capnweb": "^0.7.0", "dedent": "^1.7.1", "fast-csv": "^5.0.5", "fastify": "^5.7.4", @@ -2248,6 +2249,12 @@ "node": ">=0.10.0" } }, + "node_modules/capnweb": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/capnweb/-/capnweb-0.7.0.tgz", + "integrity": "sha512-zO7tt5ch2tImacaR/oMd7e1dqi/fWU7hjZdvQMv6Yo3v9uUGA8cPIUQGvfQTu2c+NgyE/j/oDmMaUlf1PXyfJw==", + "license": "MIT" + }, "node_modules/chai": { "version": "6.2.2", "resolved": "https://registry.npmjs.org/chai/-/chai-6.2.2.tgz", diff --git a/package.json b/package.json index 7e4bf3ca..0a9e8892 100644 --- a/package.json +++ b/package.json @@ -23,6 +23,7 @@ "@pgsql/types": "^17.6.2", "@query-doctor/core": "^0.8.9", "async-sema": "^3.1.1", + "capnweb": "^0.7.0", "dedent": "^1.7.1", "fast-csv": "^5.0.5", "fastify": "^5.7.4", diff --git a/src/env.ts b/src/env.ts index ac927f12..905dc629 100644 --- a/src/env.ts +++ b/src/env.ts @@ -14,11 +14,15 @@ const envSchema = z.object({ GITHUB_TOKEN: z.string().optional(), LOG_PATH: z.string().optional(), POSTGRES_URL: z.string().optional(), - SOURCE_DATABASE_URL: z.string().optional(), + SOURCE_DATABASE_URL: z.string({ + error: + "SOURCE_DATABASE_URL is required. Set it to the connection string of the database you want to analyze.", + }), DEBUG: z.stringbool().default(false), STATISTICS_PATH: z.string().optional(), - SITE_API_ENDPOINT: z.url().optional(), + SITE_API_ENDPOINT: z.url().default("https://api.querydoctor.com"), + TOKEN: z.string().optional(), GITHUB_REPOSITORY: z.string().optional(), }); diff --git a/src/main.ts b/src/main.ts index f6c089f9..059afa73 100644 --- a/src/main.ts +++ b/src/main.ts @@ -13,6 +13,11 @@ import { } from "./reporters/site-api.ts"; import { formatCost, queryPreview } from "./reporters/github/github.ts"; import { DEFAULT_CONFIG, fetchAnalyzerConfig } from "./config.ts"; +import { ApiClient, hookUpApiReporter } from "./remote/api-client.ts"; +import { Remote } from "./remote/remote.ts"; +import { ConnectionManager } from "./sync/connection-manager.ts"; + +const INVALID_TOKEN_ERROR = "Unauthorized" async function runInCI( targetPostgresUrl: Connectable, @@ -136,14 +141,38 @@ async function runOutsideCI() { "main", ); if (!env.POSTGRES_URL) { - core.setFailed("POSTGRES_URL environment variable is not set"); - process.exit(1); + throw new Error("POSTGRES_URL environment variable is not set. If you're seeing this inside Docker something has gone wrong"); + } + if (!env.TOKEN) { + throw new Error("TOKEN environment variable is not set\nYou probably forgot to pass a `-e TOKEN=...` parameter to the docker container"); + } + const remote = new Remote( + Connectable.fromString(env.POSTGRES_URL), + ConnectionManager.forLocalDatabase(), + ); + try { + const client = await ApiClient.connect(env.SITE_API_ENDPOINT, env.TOKEN, { kind: "persistent" }, remote); + await client.ping(); + log.info("Connected to the api", "main") + hookUpApiReporter(client, remote); + const endpoint = env.SITE_API_ENDPOINT; + const token = env.TOKEN; + client.onRpcBroken(() => { + ApiClient.connectWithReconnect(endpoint, token, { kind: "persistent" }, remote); + }); + } catch (err) { + if (err instanceof Error) { + if (err.message === INVALID_TOKEN_ERROR) { + throw new Error("Your TOKEN was invalid. Make sure to go to your project settings to copy the right TOKEN value.") + } + } + throw new Error(`Failed to connect to ${env.SITE_API_ENDPOINT}\nIf you're using a custom SITE_API_ENDPOINT you need to double check your db\n${err}`); } const server = await createServer( env.HOST, env.PORT, Connectable.fromString(env.POSTGRES_URL), - env.SOURCE_DATABASE_URL ? Connectable.fromString(env.SOURCE_DATABASE_URL) : undefined, + Connectable.fromString(env.SOURCE_DATABASE_URL), ); const shutdown = async () => { @@ -162,10 +191,6 @@ async function main() { core.setFailed("POSTGRES_URL environment variable is not set"); process.exit(1); } - if (!env.SOURCE_DATABASE_URL) { - core.setFailed("SOURCE_DATABASE_URL environment variable is not set"); - process.exit(1); - } if (!env.LOG_PATH) { core.setFailed("LOG_PATH environment variable is not set"); process.exit(1); diff --git a/src/remote/api-client.ts b/src/remote/api-client.ts new file mode 100644 index 00000000..d2800539 --- /dev/null +++ b/src/remote/api-client.ts @@ -0,0 +1,162 @@ +import { newWebSocketRpcSession, RpcTarget } from "capnweb"; +import type { RpcStub } from "capnweb"; +import type { ConnectionMode, UnauthenticatedServerApi, ClientApi, IndexDefinition, ServerApi } from "@query-doctor/core"; +import type { ExportedStats } from "@query-doctor/core"; +import { PgIdentifier, Statistics } from "@query-doctor/core"; +import { log } from "../log.ts"; +import type { Remote } from "./remote.ts"; +import type { OptimizedQuery } from "../sql/recent-query.ts"; + +export function hookUpApiReporter(api: RpcStub, remote: Remote): () => void { + const onExtensionPresenceChanged = (presence: Parameters[0]) => { + api.setExtensionPresence(presence).catch((err) => { + log.error(`Failed to report extension presence: ${err}`, "api-client"); + }); + }; + const onDumpLog = (line: string) => { + api.log(line, "pg_dump").catch((err) => { + log.error(`Failed to send dump log: ${err}`, "api-client"); + }); + }; + const onRestoreLog = (line: string) => { + api.log(line, "pg_restore").catch((err) => { + log.error(`Failed to send restore log: ${err}`, "api-client"); + }); + }; + const onSchemaSynced = (schema: Parameters[0]) => { + api.pushSchema(schema).catch((err) => { + log.error(`Failed to push schema: ${err}`, "api-client"); + }); + }; + const onStatsApplied = (stats: Parameters[0]) => { + api.pushStats(stats).catch((err) => { + log.error(`Failed to push stats: ${err}`, "api-client"); + }); + }; + const onQueriesPolled = (queries: Parameters[0]) => { + api.pushQuery(queries).catch((err) => { + log.error(`Failed to push polled queries: ${err}`, "api-client"); + }); + }; + const pushOptimizedQuery = (query: OptimizedQuery) => { + api.pushQuery([query]).catch((err) => { + log.error(`Failed to push optimized query: ${err}`, "api-client"); + }); + }; + + remote.on("extensionPresenceChanged", onExtensionPresenceChanged); + remote.on("dumpLog", onDumpLog); + remote.on("restoreLog", onRestoreLog); + remote.on("schemaSynced", onSchemaSynced); + remote.on("statsApplied", onStatsApplied); + remote.on("queriesPolled", onQueriesPolled); + remote.optimizer.on("noImprovements", pushOptimizedQuery); + remote.optimizer.on("improvementsAvailable", pushOptimizedQuery); + remote.optimizer.on("zeroCostPlan", pushOptimizedQuery); + remote.optimizer.on("timeout", pushOptimizedQuery); + + return () => { + remote.off("extensionPresenceChanged", onExtensionPresenceChanged); + remote.off("dumpLog", onDumpLog); + remote.off("restoreLog", onRestoreLog); + remote.off("schemaSynced", onSchemaSynced); + remote.off("statsApplied", onStatsApplied); + remote.off("queriesPolled", onQueriesPolled); + remote.optimizer.off("noImprovements", pushOptimizedQuery); + remote.optimizer.off("improvementsAvailable", pushOptimizedQuery); + remote.optimizer.off("zeroCostPlan", pushOptimizedQuery); + remote.optimizer.off("timeout", pushOptimizedQuery); + }; +} + +export class ApiClient extends RpcTarget implements ClientApi { + static #name = "ApiClient" + static #PING_INTERVAL_MS = 30_000; + static #PING_MAX_BACKOFF_MS = 300_000; + + private constructor(private readonly remote: Remote) { + super(); + } + + static async connect(endpoint: string, token: string, mode: ConnectionMode, remote: Remote): Promise> { + const wsEndpoint = `${endpoint}/relay`.replace(/^http/, "ws"); + const unauthenticated = newWebSocketRpcSession(wsEndpoint); + const api = await unauthenticated.authenticate(token, new this(remote), mode) as unknown as RpcStub; + this.schedulePingTimer(api); + return api; + } + + static connectWithReconnect(endpoint: string, token: string, mode: ConnectionMode, remote: Remote): void { + let cleanup: (() => void) | undefined; + const attempt = async (failCount: number) => { + try { + const api = await this.connect(endpoint, token, mode, remote); + log.info(`Connected to the api`, this.#name); + cleanup = hookUpApiReporter(api, remote); + api.onRpcBroken((err) => { + log.error(`Connection broken: ${err}`, this.#name); + cleanup?.(); + cleanup = undefined; + const delay = Math.min(2 ** secs(failCount), this.#PING_MAX_BACKOFF_MS); + setTimeout(() => attempt(failCount + 1), delay); + }); + } catch (err) { + log.error(`Failed to connect: ${err}`, this.#name); + const delay = Math.min(2 ** secs(failCount), this.#PING_MAX_BACKOFF_MS); + setTimeout(() => attempt(failCount + 1), delay); + } + }; + attempt(0); + } + + static schedulePingTimer(api: RpcStub, failCount = 0) { + const timer = setInterval(() => { + api.ping().then(() => { + if (failCount > 0) { + log.info(`Reached the server again`, this.#name) + failCount = 0 + } + }, err => { + log.error(`Could not ping the API server\n${err}`, this.#name) + clearInterval(timer); + const delay = Math.min(2 ** secs(failCount), this.#PING_MAX_BACKOFF_MS); + setTimeout(() => { + ApiClient.schedulePingTimer(api, failCount + 1) + }, delay) + }); + }, this.#PING_INTERVAL_MS); + } + + async repull(): Promise { + await this.remote.resync(); + return {}; + } + + async refreshQueries(): Promise { + await this.remote.resync(); + } + + async updateStatistics(stats: ExportedStats[]): Promise { + await this.remote.applyStatistics(Statistics.statsModeFromExport(stats)); + } + + async hideIndex(indexName: string): Promise { + this.remote.optimizer.toggleIndex(PgIdentifier.fromString(indexName)); + } + + async addIndex(index: IndexDefinition): Promise { + await this.remote.optimizer.createIndex( + index.tableName, + index.columns.map((c) => ({ + name: c.name, + order: (c.order?.toLowerCase() ?? "asc") as "asc" | "desc", + })), + ); + } + + async runQuery(_query: string): Promise { + log.warn("runQuery is not implemented", ApiClient.name); + } +} + +const secs = (ms: number) => ms * 1000; diff --git a/src/remote/query-optimizer.ts b/src/remote/query-optimizer.ts index c67ad68e..12927ba2 100644 --- a/src/remote/query-optimizer.ts +++ b/src/remote/query-optimizer.ts @@ -18,6 +18,7 @@ import { PostgresVersion, Statistics, StatisticsMode, + type ExportedStats, } from "@query-doctor/core"; import { Connectable } from "../sync/connectable.ts"; import { parse } from "@libpg-query/parser"; @@ -107,6 +108,10 @@ export class QueryOptimizer extends EventEmitter { return this.target?.statistics.computedStats; } + get ownMetadata(): ExportedStats[] | undefined { + return this.target?.statistics.ownMetadata; + } + getExistingIndexes(): IndexedTable[] { return this.existingIndexes; } diff --git a/src/remote/remote.ts b/src/remote/remote.ts index 212bff07..f6ec53d1 100644 --- a/src/remote/remote.ts +++ b/src/remote/remote.ts @@ -4,6 +4,8 @@ import { PostgresVersion, Statistics, StatisticsMode, + type ExtensionPresence, + type ExportedStats, } from "@query-doctor/core"; import { type Connectable } from "../sync/connectable.ts"; import { DumpCommand, RestoreCommand } from "../sync/schema-link.ts"; @@ -22,6 +24,10 @@ import { SchemaLoader } from "./schema-loader.ts"; type RemoteEvents = { dumpLog: [line: string]; restoreLog: [line: string]; + extensionPresenceChanged: [presence: ExtensionPresence]; + queriesPolled: [queries: RecentQuery[]]; + schemaSynced: [schema: FullSchema]; + statsApplied: [stats: ExportedStats[]]; }; /** @@ -56,6 +62,7 @@ export class Remote extends EventEmitter { */ private baseDbURL: Connectable; private generation = 0; + private lastSourceDb?: Connectable; /** The URL of the current generation optimizing db */ private optimizingDbUDRL: Connectable; @@ -86,6 +93,13 @@ export class Remote extends EventEmitter { this.optimizer = new QueryOptimizer(manager, this.optimizingDbUDRL); } + async resync(): Promise> { + if (!this.lastSourceDb) { + throw new Error("No source database has been synced yet"); + } + return this.syncFrom(this.lastSourceDb, { type: "pullFromSource" }); + } + async syncFrom( source: Connectable, statsStrategy: StatisticsStrategy = { type: "pullFromSource" }, @@ -105,6 +119,7 @@ export class Remote extends EventEmitter { }; } > { + this.lastSourceDb = source; await this.resetDatabase(); // First batch: get schema and other info in parallel (needed for stats decision) @@ -134,6 +149,7 @@ export class Remote extends EventEmitter { if (fullSchema.status === "fulfilled") { this.schemaLoader?.update(fullSchema.value); + this.emit("schemaSynced", fullSchema.value); } // Second: resolve stats strategy using table list from schema @@ -162,6 +178,7 @@ export class Remote extends EventEmitter { type: "extension_not_installed", extensionName: recentQueries.reason.extensionNames[0], }; + this.emit("extensionPresenceChanged", { type: "missing" }); } await this.onSuccessfulSync( @@ -359,6 +376,10 @@ export class Remote extends EventEmitter { async applyStatistics(statsMode: StatisticsMode): Promise { await this.optimizer.setStatistics(statsMode); + const stats = this.optimizer.ownMetadata; + if (stats) { + this.emit("statsApplied", stats); + } // don't block the reply by awaiting all optimizations this.optimizer.restart(); } @@ -410,8 +431,11 @@ export class Remote extends EventEmitter { console.error(error); }); this.pgStatStatementsStatus = PgStatStatementsStatus.Installed; + this.emit("extensionPresenceChanged", { type: "present", extensionName: "pg_stat_statements", needsRestart: false }); + this.emit("queriesPolled", queries); }).on("pgStatStatementsNotInstalled", () => { this.pgStatStatementsStatus = PgStatStatementsStatus.NotInstalled; + this.emit("extensionPresenceChanged", { type: "missing" }); }); this.queryLoader.on("exit", () => { log.error("Query loader exited", "remote"); From 848dd7898dd9a310377158b07c3f81ce22e59dd9 Mon Sep 17 00:00:00 2001 From: Xetera Date: Wed, 13 May 2026 18:04:23 +0300 Subject: [PATCH 2/6] feat: emit queries --- src/main.ts | 27 +++++++-------------------- src/remote/api-client.ts | 40 ++++++++++++++++++---------------------- src/remote/remote.ts | 6 +++++- src/server/http.ts | 6 ++++-- src/sql/recent-query.ts | 28 ++++++++++++++++++++++++++++ src/sync/pg-connector.ts | 3 +-- 6 files changed, 63 insertions(+), 47 deletions(-) diff --git a/src/main.ts b/src/main.ts index 059afa73..542f1d14 100644 --- a/src/main.ts +++ b/src/main.ts @@ -146,33 +146,20 @@ async function runOutsideCI() { if (!env.TOKEN) { throw new Error("TOKEN environment variable is not set\nYou probably forgot to pass a `-e TOKEN=...` parameter to the docker container"); } + const sourceDb = Connectable.fromString(env.SOURCE_DATABASE_URL) const remote = new Remote( Connectable.fromString(env.POSTGRES_URL), ConnectionManager.forLocalDatabase(), + ConnectionManager.forRemoteDatabase(), + { disableQueryLoader: false }, + sourceDb, ); - try { - const client = await ApiClient.connect(env.SITE_API_ENDPOINT, env.TOKEN, { kind: "persistent" }, remote); - await client.ping(); - log.info("Connected to the api", "main") - hookUpApiReporter(client, remote); - const endpoint = env.SITE_API_ENDPOINT; - const token = env.TOKEN; - client.onRpcBroken(() => { - ApiClient.connectWithReconnect(endpoint, token, { kind: "persistent" }, remote); - }); - } catch (err) { - if (err instanceof Error) { - if (err.message === INVALID_TOKEN_ERROR) { - throw new Error("Your TOKEN was invalid. Make sure to go to your project settings to copy the right TOKEN value.") - } - } - throw new Error(`Failed to connect to ${env.SITE_API_ENDPOINT}\nIf you're using a custom SITE_API_ENDPOINT you need to double check your db\n${err}`); - } + ApiClient.connectWithReconnect(env.SITE_API_ENDPOINT, env.TOKEN, { kind: "persistent" }, remote); const server = await createServer( env.HOST, env.PORT, - Connectable.fromString(env.POSTGRES_URL), - Connectable.fromString(env.SOURCE_DATABASE_URL), + remote, + sourceDb ); const shutdown = async () => { diff --git a/src/remote/api-client.ts b/src/remote/api-client.ts index d2800539..200d272f 100644 --- a/src/remote/api-client.ts +++ b/src/remote/api-client.ts @@ -24,7 +24,7 @@ export function hookUpApiReporter(api: RpcStub, remote: Remote): () = }); }; const onSchemaSynced = (schema: Parameters[0]) => { - api.pushSchema(schema).catch((err) => { + api.pushSchema(JSON.parse(JSON.stringify(schema))).catch((err) => { log.error(`Failed to push schema: ${err}`, "api-client"); }); }; @@ -34,12 +34,15 @@ export function hookUpApiReporter(api: RpcStub, remote: Remote): () = }); }; const onQueriesPolled = (queries: Parameters[0]) => { - api.pushQuery(queries).catch((err) => { + api.pushQuery(JSON.parse(JSON.stringify(queries))).catch((err) => { log.error(`Failed to push polled queries: ${err}`, "api-client"); }); }; const pushOptimizedQuery = (query: OptimizedQuery) => { - api.pushQuery([query]).catch((err) => { + console.log('pushing optimization 2', query) + const q = [query.toJSON()] + console.log('pushing optimization', q) + api.pushQuery(q).catch((err) => { log.error(`Failed to push optimized query: ${err}`, "api-client"); }); }; @@ -94,42 +97,37 @@ export class ApiClient extends RpcTarget implements ClientApi { log.info(`Connected to the api`, this.#name); cleanup = hookUpApiReporter(api, remote); api.onRpcBroken((err) => { - log.error(`Connection broken: ${err}`, this.#name); + const delay = Math.min(failCount * 1000, this.#PING_MAX_BACKOFF_MS); + log.error(`Connection broken: ${err}, reconnecting in ${delay}ms`, this.#name); cleanup?.(); cleanup = undefined; - const delay = Math.min(2 ** secs(failCount), this.#PING_MAX_BACKOFF_MS); setTimeout(() => attempt(failCount + 1), delay); }); } catch (err) { - log.error(`Failed to connect: ${err}`, this.#name); - const delay = Math.min(2 ** secs(failCount), this.#PING_MAX_BACKOFF_MS); + if (err instanceof Error && err.message === "Unauthorized") { + log.error(`Invalid TOKEN, cannot connect to the api`, this.#name); + return; + } + const delay = Math.min(failCount * 1000, this.#PING_MAX_BACKOFF_MS); + log.error(`Failed to connect: ${err}, reconnecting in ${delay}ms`, this.#name); setTimeout(() => attempt(failCount + 1), delay); } }; attempt(0); } - static schedulePingTimer(api: RpcStub, failCount = 0) { + static schedulePingTimer(api: RpcStub) { const timer = setInterval(() => { - api.ping().then(() => { - if (failCount > 0) { - log.info(`Reached the server again`, this.#name) - failCount = 0 - } - }, err => { + api.ping().catch(err => { + console.error(err) log.error(`Could not ping the API server\n${err}`, this.#name) clearInterval(timer); - const delay = Math.min(2 ** secs(failCount), this.#PING_MAX_BACKOFF_MS); - setTimeout(() => { - ApiClient.schedulePingTimer(api, failCount + 1) - }, delay) }); }, this.#PING_INTERVAL_MS); } - async repull(): Promise { + async repull(): Promise { await this.remote.resync(); - return {}; } async refreshQueries(): Promise { @@ -158,5 +156,3 @@ export class ApiClient extends RpcTarget implements ClientApi { log.warn("runQuery is not implemented", ApiClient.name); } } - -const secs = (ms: number) => ms * 1000; diff --git a/src/remote/remote.ts b/src/remote/remote.ts index f6ec53d1..3cdac44e 100644 --- a/src/remote/remote.ts +++ b/src/remote/remote.ts @@ -83,7 +83,8 @@ export class Remote extends EventEmitter { /** The manager for ONLY the source db connections */ private readonly sourceManager: ConnectionManager = ConnectionManager .forRemoteDatabase(), - private readonly options: { disableQueryLoader: boolean } = { disableQueryLoader: false } + private readonly options: { disableQueryLoader: boolean } = { disableQueryLoader: false }, + initialSourceDb?: Connectable, ) { super(); this.baseDbURL = targetURL.withDatabaseName(Remote.baseDbName); @@ -91,6 +92,9 @@ export class Remote extends EventEmitter { Remote.defaultOptimizingDbPrefix, ); this.optimizer = new QueryOptimizer(manager, this.optimizingDbUDRL); + if (initialSourceDb) { + this.lastSourceDb = initialSourceDb; + } } async resync(): Promise> { diff --git a/src/server/http.ts b/src/server/http.ts index 8618ef68..3aa36e3c 100644 --- a/src/server/http.ts +++ b/src/server/http.ts @@ -117,7 +117,7 @@ async function onSyncLiveQuery(body: unknown) { export async function createServer( hostname: string, port: number, - targetDb?: Connectable, + targetDb?: Connectable | Remote, sourceDb?: Connectable, ): Promise { const fastify = Fastify({ logger: false }); @@ -148,7 +148,9 @@ export async function createServer( const remoteController = targetDb ? new RemoteController( - new Remote(targetDb, optimizingDbConnectionManager), + targetDb instanceof Remote + ? targetDb + : new Remote(targetDb, optimizingDbConnectionManager), ) : undefined; diff --git a/src/sql/recent-query.ts b/src/sql/recent-query.ts index 39290c0e..a17d65fc 100644 --- a/src/sql/recent-query.ts +++ b/src/sql/recent-query.ts @@ -39,6 +39,7 @@ export class RecentQuery { readonly isIntrospection: boolean; readonly isTargetlessSelectQuery: boolean; readonly analysisSkipped: boolean; + optimization: LiveQueryOptimization; /** Use {@link RecentQuery.analyze} instead */ constructor( @@ -70,12 +71,39 @@ export class RecentQuery { this.isTargetlessSelectQuery = this.isSelectQuery ? RecentQuery.isTargetlessSelectQuery(tableReferences) : false; + this.optimization = { state: "waiting" }; } withOptimization(optimization: LiveQueryOptimization): OptimizedQuery { return Object.assign(this, { optimization }); } + toJSON() { + // TODO: these fields should be calling toJSON recursively maybe? + return JSON.parse(JSON.stringify({ + formattedQuery: this.formattedQuery, + displayQuery: this.displayQuery, + username: this.username, + query: this.query, + meanTime: this.meanTime, + calls: this.calls, + rows: this.rows, + topLevel: this.topLevel, + isSystemQuery: this.isSystemQuery, + isSelectQuery: this.isSelectQuery, + isIntrospection: this.isIntrospection, + isTargetlessSelectQuery: this.isTargetlessSelectQuery, + analysisSkipped: this.analysisSkipped, + tableReferences: this.tableReferences, + columnReferences: this.columnReferences, + tags: this.tags, + nudges: this.nudges, + hash: this.hash, + seenAt: this.seenAt, + optimization: this.optimization, + })); + } + /** * Queries beyond this size are included in results but skip expensive * prettier.format() and Analyzer.analyze() to avoid OOM from massive diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index efadd5cb..b5a39395 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -467,8 +467,6 @@ ORDER BY } private async getQuerySource(): Promise { - if (this.querySource) - return this.querySource const results = await this.db.exec<{ schema: string; extension: string }>(` SELECT e.extname as extension, n.nspname as schema FROM pg_extension e @@ -477,6 +475,7 @@ ORDER BY `); const firstResult = results[0]; if (!firstResult) { + this.querySource = undefined; throw new ExtensionNotInstalledError([ "pg_stat_statements", "pg_stat_monitor" From 6bfac7608d4934807769abdd77df8f9d755cb034 Mon Sep 17 00:00:00 2001 From: Xetera Date: Thu, 14 May 2026 18:19:09 +0300 Subject: [PATCH 3/6] fix: implement CI workflow using rpc --- src/config.test.ts | 94 +++++++++++++++------------------------- src/config.ts | 31 ------------- src/main.ts | 30 ++++++++++--- src/remote/api-client.ts | 9 ++-- src/runner.ts | 3 +- src/sanitize.test.ts | 4 ++ src/test-setup.ts | 1 + vitest.config.ts | 1 + 8 files changed, 73 insertions(+), 100 deletions(-) create mode 100644 src/test-setup.ts diff --git a/src/config.test.ts b/src/config.test.ts index 573e9bed..b57a8883 100644 --- a/src/config.test.ts +++ b/src/config.test.ts @@ -1,79 +1,57 @@ -import { test, expect, vi, afterEach } from "vitest"; -import { fetchAnalyzerConfig, DEFAULT_CONFIG } from "./config.ts"; - -afterEach(() => { - vi.restoreAllMocks(); -}); - -test("returns parsed config from successful response", async () => { +import { test, expect, vi } from "vitest"; +import { DEFAULT_CONFIG } from "./config.ts"; +import type { ServerApi } from "@query-doctor/core"; +import type { RpcStub } from "capnweb"; + +function makeApi(overrides: Partial> = {}): RpcStub { + return overrides as RpcStub; +} + +async function resolveConfig( + api: RpcStub, + repo: string | undefined, + branch: string, +) { + if (!repo) return DEFAULT_CONFIG; + return api.getRepoConfig(repo, branch).catch(() => DEFAULT_CONFIG); +} + +test("returns config from successful getRepoConfig call", async () => { const config = { minimumCost: 100, regressionThreshold: 0.5, ignoredQueryHashes: ["abc123"], + lastSeenQueryHashes: [], acknowledgedQueryHashes: [], comparisonBranch: undefined, }; - vi.spyOn(globalThis, "fetch").mockResolvedValue( - Response.json(config, { status: 200 }), - ); + const api = makeApi({ getRepoConfig: vi.fn().mockResolvedValue(config) }); - const result = await fetchAnalyzerConfig("https://api.example.com", "my/repo"); + const result = await resolveConfig(api, "my/repo", "main"); expect(result).toEqual(config); }); -test("returns defaults when response is not ok", async () => { - vi.spyOn(globalThis, "fetch").mockResolvedValue( - new Response("Not Found", { status: 404 }), - ); +test("returns defaults when getRepoConfig throws", async () => { + const api = makeApi({ + getRepoConfig: vi.fn().mockRejectedValue(new Error("rpc error")), + }); - const result = await fetchAnalyzerConfig("https://api.example.com", "my/repo"); + const result = await resolveConfig(api, "my/repo", "main"); expect(result).toEqual(DEFAULT_CONFIG); }); -test("returns defaults when fetch throws", async () => { - vi.spyOn(globalThis, "fetch").mockRejectedValue(new Error("network error")); +test("returns defaults when repo is undefined", async () => { + const api = makeApi({ getRepoConfig: vi.fn() }); - const result = await fetchAnalyzerConfig("https://api.example.com", "my/repo"); + const result = await resolveConfig(api, undefined, "main"); expect(result).toEqual(DEFAULT_CONFIG); + expect(api.getRepoConfig).not.toHaveBeenCalled(); }); -test("constructs correct URL with trailing slash stripped", async () => { - const mockFetch = vi.spyOn(globalThis, "fetch").mockResolvedValue( - Response.json(DEFAULT_CONFIG, { status: 200 }), - ); - - await fetchAnalyzerConfig("https://api.example.com/", "org/repo"); - expect(mockFetch).toHaveBeenCalledWith( - "https://api.example.com/ci/repos/org%2Frepo/config", - expect.any(Object), - ); -}); - -test("encodes repo name in URL", async () => { - const mockFetch = vi.spyOn(globalThis, "fetch").mockResolvedValue( - Response.json(DEFAULT_CONFIG, { status: 200 }), - ); - - await fetchAnalyzerConfig("https://api.example.com", "org/repo with spaces"); - expect(mockFetch).toHaveBeenCalledWith( - "https://api.example.com/ci/repos/org%2Frepo%20with%20spaces/config", - expect.any(Object), - ); -}); - -test("passes through partial response with missing optional fields", async () => { - const partial = { - minimumCost: 50, - regressionThreshold: 0.1, - ignoredQueryHashes: [], - // all required fields present - }; - vi.spyOn(globalThis, "fetch").mockResolvedValue( - Response.json(partial, { status: 200 }), - ); +test("passes repo and branch to getRepoConfig", async () => { + const getRepoConfig = vi.fn().mockResolvedValue(DEFAULT_CONFIG); + const api = makeApi({ getRepoConfig }); - const result = await fetchAnalyzerConfig("https://api.example.com", "my/repo"); - expect(result.minimumCost).toBe(50); - expect(result.regressionThreshold).toBe(0.1); - expect(result.ignoredQueryHashes).toEqual([]); + await resolveConfig(api, "org/repo", "feat/my-branch"); + expect(getRepoConfig).toHaveBeenCalledWith("org/repo", "feat/my-branch"); }); diff --git a/src/config.ts b/src/config.ts index 85c5dea7..e46adb3f 100644 --- a/src/config.ts +++ b/src/config.ts @@ -12,34 +12,3 @@ export const DEFAULT_CONFIG: AnalyzerConfig = { ignoredQueryHashes: [], acknowledgedQueryHashes: [], }; - -export async function fetchAnalyzerConfig( - endpoint: string, - repo: string, -): Promise { - const url = `${endpoint.replace(/\/$/, "")}/ci/repos/${encodeURIComponent(repo)}/config`; - console.log(`Fetching config from ${url}`); - try { - const response = await fetch(url, { - signal: AbortSignal.timeout(5000), - }); - if (!response.ok) { - console.warn(`Config fetch returned ${response.status}, using defaults`); - return DEFAULT_CONFIG; - } - const data = (await response.json()) as Partial; - console.log( - `Config loaded: minimumCost=${data.minimumCost}, regressionThreshold=${data.regressionThreshold}, ignoredHashes=${data.ignoredQueryHashes?.length ?? 0}, acknowledgedHashes=${data.acknowledgedQueryHashes?.length ?? 0}, comparisonBranch=${data.comparisonBranch ?? "(same branch)"}`, - ); - return { - minimumCost: data.minimumCost ?? 0, - regressionThreshold: data.regressionThreshold ?? 0, - ignoredQueryHashes: data.ignoredQueryHashes ?? [], - acknowledgedQueryHashes: data.acknowledgedQueryHashes ?? [], - comparisonBranch: data.comparisonBranch, - }; - } catch (err) { - console.warn(`Failed to fetch config: ${err}. Using defaults`); - return DEFAULT_CONFIG; - } -} diff --git a/src/main.ts b/src/main.ts index 542f1d14..61c57338 100644 --- a/src/main.ts +++ b/src/main.ts @@ -12,10 +12,12 @@ import { postToSiteApi, } from "./reporters/site-api.ts"; import { formatCost, queryPreview } from "./reporters/github/github.ts"; -import { DEFAULT_CONFIG, fetchAnalyzerConfig } from "./config.ts"; +import { DEFAULT_CONFIG, type AnalyzerConfig } from "./config.ts"; import { ApiClient, hookUpApiReporter } from "./remote/api-client.ts"; import { Remote } from "./remote/remote.ts"; import { ConnectionManager } from "./sync/connection-manager.ts"; +import type { RpcStub } from "capnweb"; +import type { ServerApi } from "@query-doctor/core"; const INVALID_TOKEN_ERROR = "Unauthorized" @@ -30,10 +32,27 @@ async function runInCI( const branch = process.env.GITHUB_HEAD_REF || process.env.GITHUB_REF_NAME || ""; - const config = - siteApiEndpoint && repo - ? await fetchAnalyzerConfig(siteApiEndpoint, repo) - : DEFAULT_CONFIG; + const remote = new Remote( + targetPostgresUrl, + ConnectionManager.forLocalDatabase(), + ConnectionManager.forRemoteDatabase(), + { disableQueryLoader: true }, + ); + + if (!env.TOKEN) { + throw new Error("CI mode cannot be run without a TOKEN variable provided") + } + + let api = await ApiClient.connect(siteApiEndpoint, env.TOKEN, { kind: "ci", branch, sha: "" }, remote); + + const config = repo + ? await api.getRepoConfig(repo, branch).catch( + (err) => { + log.warn(`Failed to fetch repo config via RPC: ${err}. Using defaults`, "main"); + return DEFAULT_CONFIG; + }, + ) + : DEFAULT_CONFIG; const runner = await Runner.build({ targetPostgresUrl, @@ -41,6 +60,7 @@ async function runInCI( logPath, maxCost, ignoredQueryHashes: config.ignoredQueryHashes, + remote, }); let allResults: QueryProcessResult[]; let reportContext; diff --git a/src/remote/api-client.ts b/src/remote/api-client.ts index 200d272f..95e173bf 100644 --- a/src/remote/api-client.ts +++ b/src/remote/api-client.ts @@ -1,6 +1,6 @@ import { newWebSocketRpcSession, RpcTarget } from "capnweb"; import type { RpcStub } from "capnweb"; -import type { ConnectionMode, UnauthenticatedServerApi, ClientApi, IndexDefinition, ServerApi } from "@query-doctor/core"; +import type { ConnectionMode, UnauthenticatedServerApi, ClientApi, IndexDefinition, ServerApi, RecentQuery } from "@query-doctor/core"; import type { ExportedStats } from "@query-doctor/core"; import { PgIdentifier, Statistics } from "@query-doctor/core"; import { log } from "../log.ts"; @@ -33,15 +33,13 @@ export function hookUpApiReporter(api: RpcStub, remote: Remote): () = log.error(`Failed to push stats: ${err}`, "api-client"); }); }; - const onQueriesPolled = (queries: Parameters[0]) => { + const onQueriesPolled = (queries: RecentQuery[]) => { api.pushQuery(JSON.parse(JSON.stringify(queries))).catch((err) => { log.error(`Failed to push polled queries: ${err}`, "api-client"); }); }; const pushOptimizedQuery = (query: OptimizedQuery) => { - console.log('pushing optimization 2', query) const q = [query.toJSON()] - console.log('pushing optimization', q) api.pushQuery(q).catch((err) => { log.error(`Failed to push optimized query: ${err}`, "api-client"); }); @@ -75,12 +73,13 @@ export function hookUpApiReporter(api: RpcStub, remote: Remote): () = export class ApiClient extends RpcTarget implements ClientApi { static #name = "ApiClient" static #PING_INTERVAL_MS = 30_000; - static #PING_MAX_BACKOFF_MS = 300_000; + static #PING_MAX_BACKOFF_MS = 10_000; private constructor(private readonly remote: Remote) { super(); } + static async connect(endpoint: string, token: string, mode: ConnectionMode, remote: Remote): Promise> { const wsEndpoint = `${endpoint}/relay`.replace(/^http/, "ws"); const unauthenticated = newWebSocketRpcSession(wsEndpoint); diff --git a/src/runner.ts b/src/runner.ts index 44a07fee..000e68cb 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -39,8 +39,9 @@ export class Runner { maxCost?: number; logPath: string; ignoredQueryHashes?: string[]; + remote?: Remote; }) { - const remote = new Remote( + const remote = options.remote ?? new Remote( options.targetPostgresUrl, ConnectionManager.forLocalDatabase(), ConnectionManager.forRemoteDatabase(), diff --git a/src/sanitize.test.ts b/src/sanitize.test.ts index 7b7e5d91..e349b611 100644 --- a/src/sanitize.test.ts +++ b/src/sanitize.test.ts @@ -1,6 +1,7 @@ import { test, expect, vi } from "vitest"; test("returns original URL when HOSTED is false", async () => { + vi.stubEnv("SOURCE_DATABASE_URL", "postgres://localhost/test"); vi.stubEnv("HOSTED", "false"); vi.resetModules(); const { sanitizePostgresUrl } = await import("./sanitize.ts"); @@ -9,6 +10,7 @@ test("returns original URL when HOSTED is false", async () => { }); test("returns hashed URL when HOSTED is true", async () => { + vi.stubEnv("SOURCE_DATABASE_URL", "postgres://localhost/test"); vi.stubEnv("HOSTED", "true"); vi.resetModules(); const { sanitizePostgresUrl } = await import("./sanitize.ts"); @@ -21,6 +23,7 @@ test("returns hashed URL when HOSTED is true", async () => { }); test("same input produces same hash", async () => { + vi.stubEnv("SOURCE_DATABASE_URL", "postgres://localhost/test"); vi.stubEnv("HOSTED", "true"); vi.resetModules(); const { sanitizePostgresUrl } = await import("./sanitize.ts"); @@ -29,6 +32,7 @@ test("same input produces same hash", async () => { }); test("different inputs produce different hashes", async () => { + vi.stubEnv("SOURCE_DATABASE_URL", "postgres://localhost/test"); vi.stubEnv("HOSTED", "true"); vi.resetModules(); const { sanitizePostgresUrl } = await import("./sanitize.ts"); diff --git a/src/test-setup.ts b/src/test-setup.ts new file mode 100644 index 00000000..1543b5db --- /dev/null +++ b/src/test-setup.ts @@ -0,0 +1 @@ +process.env.SOURCE_DATABASE_URL ??= "postgres://localhost/test"; diff --git a/vitest.config.ts b/vitest.config.ts index 6f7984b9..395acdb5 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -4,5 +4,6 @@ export default defineConfig({ test: { include: ["src/**/*.test.ts"], testTimeout: 120_000, + setupFiles: ["src/test-setup.ts"], }, }); From a3e5ae9817d3a072ec0a9ba2962ca80467faf171 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Sirois Date: Fri, 15 May 2026 12:28:26 +0400 Subject: [PATCH 4/6] chore: bump chore to 0.9.0 --- package-lock.json | 9 +++++---- package.json | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/package-lock.json b/package-lock.json index dd0b3fe8..20098cb7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,7 @@ "@libpg-query/parser": "^17.6.3", "@opentelemetry/api": "^1.9.0", "@pgsql/types": "^17.6.2", - "@query-doctor/core": "^0.8.9", + "@query-doctor/core": "^0.9.0", "async-sema": "^3.1.1", "capnweb": "^0.7.0", "dedent": "^1.7.1", @@ -1125,11 +1125,12 @@ "license": "BSD-3-Clause" }, "node_modules/@query-doctor/core": { - "version": "0.8.9", - "resolved": "https://registry.npmjs.org/@query-doctor/core/-/core-0.8.9.tgz", - "integrity": "sha512-zLTFuM+LCXMUKMlS+h4GbkOFwlSkk521BjBeTct7Ypcojqb4DE9nVliW4zaSxJopQc2wNz+ZqaBcl/QYADQZ+g==", + "version": "0.9.0", + "resolved": "https://registry.npmjs.org/@query-doctor/core/-/core-0.9.0.tgz", + "integrity": "sha512-2v5RRIMsNBypCohZ/cXDGXXNKiSbYkGkDEGTdVN4ceq2X5kM9YjdQcuvsVt41S1EIKPIg6IHtuPISHQt/JN51w==", "dependencies": { "@pgsql/types": "^17.6.2", + "capnweb": "^0.7.0", "colorette": "^2.0.20", "dedent": "^1.7.2", "pgsql-deparser": "^17.17.2", diff --git a/package.json b/package.json index 0a9e8892..c6da182d 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,7 @@ "@libpg-query/parser": "^17.6.3", "@opentelemetry/api": "^1.9.0", "@pgsql/types": "^17.6.2", - "@query-doctor/core": "^0.8.9", + "@query-doctor/core": "^0.9.0", "async-sema": "^3.1.1", "capnweb": "^0.7.0", "dedent": "^1.7.1", From 65c2643db5f269e9cd0a11222ecf2052c131add7 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Sirois Date: Fri, 15 May 2026 13:41:15 +0400 Subject: [PATCH 5/6] fix: assign null to querySource when extension not found --- src/sync/pg-connector.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index b5a39395..fb3c8e3e 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -475,7 +475,7 @@ ORDER BY `); const firstResult = results[0]; if (!firstResult) { - this.querySource = undefined; + this.querySource = null; throw new ExtensionNotInstalledError([ "pg_stat_statements", "pg_stat_monitor" From 248478efc39d6fbe2bc0a0b4de18c10d32b72c5e Mon Sep 17 00:00:00 2001 From: Jean-Philippe Sirois Date: Fri, 15 May 2026 13:41:15 +0400 Subject: [PATCH 6/6] test: add optimization and toJSON to RecentQuery test fixture --- src/remote/query-loader.test.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/remote/query-loader.test.ts b/src/remote/query-loader.test.ts index c918ad7c..33edca12 100644 --- a/src/remote/query-loader.test.ts +++ b/src/remote/query-loader.test.ts @@ -35,9 +35,13 @@ function createMockRecentQuery(query: string): RecentQuery { nudges: [], hash: "test_hash" as QueryHash, seenAt: Date.now(), + optimization: { state: "waiting" }, withOptimization: function () { return this as OptimizedQuery; }, + toJSON() { + return {}; + }, } as RecentQuery; }