diff --git a/package-lock.json b/package-lock.json index 67d792c..20098cb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,8 +16,9 @@ "@libpg-query/parser": "^17.6.3", "@opentelemetry/api": "^1.9.0", "@pgsql/types": "^17.6.2", - "@query-doctor/core": "^0.8.9", + "@query-doctor/core": "^0.9.0", "async-sema": "^3.1.1", + "capnweb": "^0.7.0", "dedent": "^1.7.1", "fast-csv": "^5.0.5", "fastify": "^5.7.4", @@ -1124,11 +1125,12 @@ "license": "BSD-3-Clause" }, "node_modules/@query-doctor/core": { - "version": "0.8.9", - "resolved": "https://registry.npmjs.org/@query-doctor/core/-/core-0.8.9.tgz", - "integrity": "sha512-zLTFuM+LCXMUKMlS+h4GbkOFwlSkk521BjBeTct7Ypcojqb4DE9nVliW4zaSxJopQc2wNz+ZqaBcl/QYADQZ+g==", + "version": "0.9.0", + "resolved": "https://registry.npmjs.org/@query-doctor/core/-/core-0.9.0.tgz", + "integrity": "sha512-2v5RRIMsNBypCohZ/cXDGXXNKiSbYkGkDEGTdVN4ceq2X5kM9YjdQcuvsVt41S1EIKPIg6IHtuPISHQt/JN51w==", "dependencies": { "@pgsql/types": "^17.6.2", + "capnweb": "^0.7.0", "colorette": "^2.0.20", "dedent": "^1.7.2", "pgsql-deparser": "^17.17.2", @@ -2248,6 +2250,12 @@ "node": ">=0.10.0" } }, + "node_modules/capnweb": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/capnweb/-/capnweb-0.7.0.tgz", + "integrity": "sha512-zO7tt5ch2tImacaR/oMd7e1dqi/fWU7hjZdvQMv6Yo3v9uUGA8cPIUQGvfQTu2c+NgyE/j/oDmMaUlf1PXyfJw==", + "license": "MIT" + }, "node_modules/chai": { "version": "6.2.2", "resolved": "https://registry.npmjs.org/chai/-/chai-6.2.2.tgz", diff --git a/package.json b/package.json index 7e4bf3c..c6da182 100644 --- a/package.json +++ b/package.json @@ -21,8 +21,9 @@ "@libpg-query/parser": "^17.6.3", "@opentelemetry/api": "^1.9.0", "@pgsql/types": "^17.6.2", - "@query-doctor/core": "^0.8.9", + "@query-doctor/core": "^0.9.0", "async-sema": "^3.1.1", + "capnweb": "^0.7.0", "dedent": "^1.7.1", "fast-csv": "^5.0.5", "fastify": "^5.7.4", diff --git a/src/config.test.ts b/src/config.test.ts index 573e9be..b57a888 100644 --- a/src/config.test.ts +++ b/src/config.test.ts @@ -1,79 +1,57 @@ -import { test, expect, vi, afterEach } from "vitest"; -import { fetchAnalyzerConfig, DEFAULT_CONFIG } from "./config.ts"; - -afterEach(() => { - vi.restoreAllMocks(); -}); - -test("returns parsed config from successful response", async () => { +import { test, expect, vi } from "vitest"; +import { DEFAULT_CONFIG } from "./config.ts"; +import type { ServerApi } from "@query-doctor/core"; +import type { RpcStub } from "capnweb"; + +function makeApi(overrides: Partial> = {}): RpcStub { + return overrides as RpcStub; +} + +async function resolveConfig( + api: RpcStub, + repo: string | undefined, + branch: string, +) { + if (!repo) return DEFAULT_CONFIG; + return api.getRepoConfig(repo, branch).catch(() => DEFAULT_CONFIG); +} + +test("returns config from successful getRepoConfig call", async () => { const config = { minimumCost: 100, regressionThreshold: 0.5, ignoredQueryHashes: ["abc123"], + lastSeenQueryHashes: [], acknowledgedQueryHashes: [], comparisonBranch: undefined, }; - vi.spyOn(globalThis, "fetch").mockResolvedValue( - Response.json(config, { status: 200 }), - ); + const api = makeApi({ getRepoConfig: vi.fn().mockResolvedValue(config) }); - const result = await fetchAnalyzerConfig("https://api.example.com", "my/repo"); + const result = await resolveConfig(api, "my/repo", "main"); expect(result).toEqual(config); }); -test("returns defaults when response is not ok", async () => { - vi.spyOn(globalThis, "fetch").mockResolvedValue( - new Response("Not Found", { status: 404 }), - ); +test("returns defaults when getRepoConfig throws", async () => { + const api = makeApi({ + getRepoConfig: vi.fn().mockRejectedValue(new Error("rpc error")), + }); - const result = await fetchAnalyzerConfig("https://api.example.com", "my/repo"); + const result = await resolveConfig(api, "my/repo", "main"); expect(result).toEqual(DEFAULT_CONFIG); }); -test("returns defaults when fetch throws", async () => { - vi.spyOn(globalThis, "fetch").mockRejectedValue(new Error("network error")); +test("returns defaults when repo is undefined", async () => { + const api = makeApi({ getRepoConfig: vi.fn() }); - const result = await fetchAnalyzerConfig("https://api.example.com", "my/repo"); + const result = await resolveConfig(api, undefined, "main"); expect(result).toEqual(DEFAULT_CONFIG); + expect(api.getRepoConfig).not.toHaveBeenCalled(); }); -test("constructs correct URL with trailing slash stripped", async () => { - const mockFetch = vi.spyOn(globalThis, "fetch").mockResolvedValue( - Response.json(DEFAULT_CONFIG, { status: 200 }), - ); - - await fetchAnalyzerConfig("https://api.example.com/", "org/repo"); - expect(mockFetch).toHaveBeenCalledWith( - "https://api.example.com/ci/repos/org%2Frepo/config", - expect.any(Object), - ); -}); - -test("encodes repo name in URL", async () => { - const mockFetch = vi.spyOn(globalThis, "fetch").mockResolvedValue( - Response.json(DEFAULT_CONFIG, { status: 200 }), - ); - - await fetchAnalyzerConfig("https://api.example.com", "org/repo with spaces"); - expect(mockFetch).toHaveBeenCalledWith( - "https://api.example.com/ci/repos/org%2Frepo%20with%20spaces/config", - expect.any(Object), - ); -}); - -test("passes through partial response with missing optional fields", async () => { - const partial = { - minimumCost: 50, - regressionThreshold: 0.1, - ignoredQueryHashes: [], - // all required fields present - }; - vi.spyOn(globalThis, "fetch").mockResolvedValue( - Response.json(partial, { status: 200 }), - ); +test("passes repo and branch to getRepoConfig", async () => { + const getRepoConfig = vi.fn().mockResolvedValue(DEFAULT_CONFIG); + const api = makeApi({ getRepoConfig }); - const result = await fetchAnalyzerConfig("https://api.example.com", "my/repo"); - expect(result.minimumCost).toBe(50); - expect(result.regressionThreshold).toBe(0.1); - expect(result.ignoredQueryHashes).toEqual([]); + await resolveConfig(api, "org/repo", "feat/my-branch"); + expect(getRepoConfig).toHaveBeenCalledWith("org/repo", "feat/my-branch"); }); diff --git a/src/config.ts b/src/config.ts index 85c5dea..e46adb3 100644 --- a/src/config.ts +++ b/src/config.ts @@ -12,34 +12,3 @@ export const DEFAULT_CONFIG: AnalyzerConfig = { ignoredQueryHashes: [], acknowledgedQueryHashes: [], }; - -export async function fetchAnalyzerConfig( - endpoint: string, - repo: string, -): Promise { - const url = `${endpoint.replace(/\/$/, "")}/ci/repos/${encodeURIComponent(repo)}/config`; - console.log(`Fetching config from ${url}`); - try { - const response = await fetch(url, { - signal: AbortSignal.timeout(5000), - }); - if (!response.ok) { - console.warn(`Config fetch returned ${response.status}, using defaults`); - return DEFAULT_CONFIG; - } - const data = (await response.json()) as Partial; - console.log( - `Config loaded: minimumCost=${data.minimumCost}, regressionThreshold=${data.regressionThreshold}, ignoredHashes=${data.ignoredQueryHashes?.length ?? 0}, acknowledgedHashes=${data.acknowledgedQueryHashes?.length ?? 0}, comparisonBranch=${data.comparisonBranch ?? "(same branch)"}`, - ); - return { - minimumCost: data.minimumCost ?? 0, - regressionThreshold: data.regressionThreshold ?? 0, - ignoredQueryHashes: data.ignoredQueryHashes ?? [], - acknowledgedQueryHashes: data.acknowledgedQueryHashes ?? [], - comparisonBranch: data.comparisonBranch, - }; - } catch (err) { - console.warn(`Failed to fetch config: ${err}. Using defaults`); - return DEFAULT_CONFIG; - } -} diff --git a/src/env.ts b/src/env.ts index ac927f1..905dc62 100644 --- a/src/env.ts +++ b/src/env.ts @@ -14,11 +14,15 @@ const envSchema = z.object({ GITHUB_TOKEN: z.string().optional(), LOG_PATH: z.string().optional(), POSTGRES_URL: z.string().optional(), - SOURCE_DATABASE_URL: z.string().optional(), + SOURCE_DATABASE_URL: z.string({ + error: + "SOURCE_DATABASE_URL is required. Set it to the connection string of the database you want to analyze.", + }), DEBUG: z.stringbool().default(false), STATISTICS_PATH: z.string().optional(), - SITE_API_ENDPOINT: z.url().optional(), + SITE_API_ENDPOINT: z.url().default("https://api.querydoctor.com"), + TOKEN: z.string().optional(), GITHUB_REPOSITORY: z.string().optional(), }); diff --git a/src/main.ts b/src/main.ts index f6c089f..61c5733 100644 --- a/src/main.ts +++ b/src/main.ts @@ -12,7 +12,14 @@ import { postToSiteApi, } from "./reporters/site-api.ts"; import { formatCost, queryPreview } from "./reporters/github/github.ts"; -import { DEFAULT_CONFIG, fetchAnalyzerConfig } from "./config.ts"; +import { DEFAULT_CONFIG, type AnalyzerConfig } from "./config.ts"; +import { ApiClient, hookUpApiReporter } from "./remote/api-client.ts"; +import { Remote } from "./remote/remote.ts"; +import { ConnectionManager } from "./sync/connection-manager.ts"; +import type { RpcStub } from "capnweb"; +import type { ServerApi } from "@query-doctor/core"; + +const INVALID_TOKEN_ERROR = "Unauthorized" async function runInCI( targetPostgresUrl: Connectable, @@ -25,10 +32,27 @@ async function runInCI( const branch = process.env.GITHUB_HEAD_REF || process.env.GITHUB_REF_NAME || ""; - const config = - siteApiEndpoint && repo - ? await fetchAnalyzerConfig(siteApiEndpoint, repo) - : DEFAULT_CONFIG; + const remote = new Remote( + targetPostgresUrl, + ConnectionManager.forLocalDatabase(), + ConnectionManager.forRemoteDatabase(), + { disableQueryLoader: true }, + ); + + if (!env.TOKEN) { + throw new Error("CI mode cannot be run without a TOKEN variable provided") + } + + let api = await ApiClient.connect(siteApiEndpoint, env.TOKEN, { kind: "ci", branch, sha: "" }, remote); + + const config = repo + ? await api.getRepoConfig(repo, branch).catch( + (err) => { + log.warn(`Failed to fetch repo config via RPC: ${err}. Using defaults`, "main"); + return DEFAULT_CONFIG; + }, + ) + : DEFAULT_CONFIG; const runner = await Runner.build({ targetPostgresUrl, @@ -36,6 +60,7 @@ async function runInCI( logPath, maxCost, ignoredQueryHashes: config.ignoredQueryHashes, + remote, }); let allResults: QueryProcessResult[]; let reportContext; @@ -136,14 +161,25 @@ async function runOutsideCI() { "main", ); if (!env.POSTGRES_URL) { - core.setFailed("POSTGRES_URL environment variable is not set"); - process.exit(1); + throw new Error("POSTGRES_URL environment variable is not set. If you're seeing this inside Docker something has gone wrong"); } + if (!env.TOKEN) { + throw new Error("TOKEN environment variable is not set\nYou probably forgot to pass a `-e TOKEN=...` parameter to the docker container"); + } + const sourceDb = Connectable.fromString(env.SOURCE_DATABASE_URL) + const remote = new Remote( + Connectable.fromString(env.POSTGRES_URL), + ConnectionManager.forLocalDatabase(), + ConnectionManager.forRemoteDatabase(), + { disableQueryLoader: false }, + sourceDb, + ); + ApiClient.connectWithReconnect(env.SITE_API_ENDPOINT, env.TOKEN, { kind: "persistent" }, remote); const server = await createServer( env.HOST, env.PORT, - Connectable.fromString(env.POSTGRES_URL), - env.SOURCE_DATABASE_URL ? Connectable.fromString(env.SOURCE_DATABASE_URL) : undefined, + remote, + sourceDb ); const shutdown = async () => { @@ -162,10 +198,6 @@ async function main() { core.setFailed("POSTGRES_URL environment variable is not set"); process.exit(1); } - if (!env.SOURCE_DATABASE_URL) { - core.setFailed("SOURCE_DATABASE_URL environment variable is not set"); - process.exit(1); - } if (!env.LOG_PATH) { core.setFailed("LOG_PATH environment variable is not set"); process.exit(1); diff --git a/src/remote/api-client.ts b/src/remote/api-client.ts new file mode 100644 index 0000000..95e173b --- /dev/null +++ b/src/remote/api-client.ts @@ -0,0 +1,157 @@ +import { newWebSocketRpcSession, RpcTarget } from "capnweb"; +import type { RpcStub } from "capnweb"; +import type { ConnectionMode, UnauthenticatedServerApi, ClientApi, IndexDefinition, ServerApi, RecentQuery } from "@query-doctor/core"; +import type { ExportedStats } from "@query-doctor/core"; +import { PgIdentifier, Statistics } from "@query-doctor/core"; +import { log } from "../log.ts"; +import type { Remote } from "./remote.ts"; +import type { OptimizedQuery } from "../sql/recent-query.ts"; + +export function hookUpApiReporter(api: RpcStub, remote: Remote): () => void { + const onExtensionPresenceChanged = (presence: Parameters[0]) => { + api.setExtensionPresence(presence).catch((err) => { + log.error(`Failed to report extension presence: ${err}`, "api-client"); + }); + }; + const onDumpLog = (line: string) => { + api.log(line, "pg_dump").catch((err) => { + log.error(`Failed to send dump log: ${err}`, "api-client"); + }); + }; + const onRestoreLog = (line: string) => { + api.log(line, "pg_restore").catch((err) => { + log.error(`Failed to send restore log: ${err}`, "api-client"); + }); + }; + const onSchemaSynced = (schema: Parameters[0]) => { + api.pushSchema(JSON.parse(JSON.stringify(schema))).catch((err) => { + log.error(`Failed to push schema: ${err}`, "api-client"); + }); + }; + const onStatsApplied = (stats: Parameters[0]) => { + api.pushStats(stats).catch((err) => { + log.error(`Failed to push stats: ${err}`, "api-client"); + }); + }; + const onQueriesPolled = (queries: RecentQuery[]) => { + api.pushQuery(JSON.parse(JSON.stringify(queries))).catch((err) => { + log.error(`Failed to push polled queries: ${err}`, "api-client"); + }); + }; + const pushOptimizedQuery = (query: OptimizedQuery) => { + const q = [query.toJSON()] + api.pushQuery(q).catch((err) => { + log.error(`Failed to push optimized query: ${err}`, "api-client"); + }); + }; + + remote.on("extensionPresenceChanged", onExtensionPresenceChanged); + remote.on("dumpLog", onDumpLog); + remote.on("restoreLog", onRestoreLog); + remote.on("schemaSynced", onSchemaSynced); + remote.on("statsApplied", onStatsApplied); + remote.on("queriesPolled", onQueriesPolled); + remote.optimizer.on("noImprovements", pushOptimizedQuery); + remote.optimizer.on("improvementsAvailable", pushOptimizedQuery); + remote.optimizer.on("zeroCostPlan", pushOptimizedQuery); + remote.optimizer.on("timeout", pushOptimizedQuery); + + return () => { + remote.off("extensionPresenceChanged", onExtensionPresenceChanged); + remote.off("dumpLog", onDumpLog); + remote.off("restoreLog", onRestoreLog); + remote.off("schemaSynced", onSchemaSynced); + remote.off("statsApplied", onStatsApplied); + remote.off("queriesPolled", onQueriesPolled); + remote.optimizer.off("noImprovements", pushOptimizedQuery); + remote.optimizer.off("improvementsAvailable", pushOptimizedQuery); + remote.optimizer.off("zeroCostPlan", pushOptimizedQuery); + remote.optimizer.off("timeout", pushOptimizedQuery); + }; +} + +export class ApiClient extends RpcTarget implements ClientApi { + static #name = "ApiClient" + static #PING_INTERVAL_MS = 30_000; + static #PING_MAX_BACKOFF_MS = 10_000; + + private constructor(private readonly remote: Remote) { + super(); + } + + + static async connect(endpoint: string, token: string, mode: ConnectionMode, remote: Remote): Promise> { + const wsEndpoint = `${endpoint}/relay`.replace(/^http/, "ws"); + const unauthenticated = newWebSocketRpcSession(wsEndpoint); + const api = await unauthenticated.authenticate(token, new this(remote), mode) as unknown as RpcStub; + this.schedulePingTimer(api); + return api; + } + + static connectWithReconnect(endpoint: string, token: string, mode: ConnectionMode, remote: Remote): void { + let cleanup: (() => void) | undefined; + const attempt = async (failCount: number) => { + try { + const api = await this.connect(endpoint, token, mode, remote); + log.info(`Connected to the api`, this.#name); + cleanup = hookUpApiReporter(api, remote); + api.onRpcBroken((err) => { + const delay = Math.min(failCount * 1000, this.#PING_MAX_BACKOFF_MS); + log.error(`Connection broken: ${err}, reconnecting in ${delay}ms`, this.#name); + cleanup?.(); + cleanup = undefined; + setTimeout(() => attempt(failCount + 1), delay); + }); + } catch (err) { + if (err instanceof Error && err.message === "Unauthorized") { + log.error(`Invalid TOKEN, cannot connect to the api`, this.#name); + return; + } + const delay = Math.min(failCount * 1000, this.#PING_MAX_BACKOFF_MS); + log.error(`Failed to connect: ${err}, reconnecting in ${delay}ms`, this.#name); + setTimeout(() => attempt(failCount + 1), delay); + } + }; + attempt(0); + } + + static schedulePingTimer(api: RpcStub) { + const timer = setInterval(() => { + api.ping().catch(err => { + console.error(err) + log.error(`Could not ping the API server\n${err}`, this.#name) + clearInterval(timer); + }); + }, this.#PING_INTERVAL_MS); + } + + async repull(): Promise { + await this.remote.resync(); + } + + async refreshQueries(): Promise { + await this.remote.resync(); + } + + async updateStatistics(stats: ExportedStats[]): Promise { + await this.remote.applyStatistics(Statistics.statsModeFromExport(stats)); + } + + async hideIndex(indexName: string): Promise { + this.remote.optimizer.toggleIndex(PgIdentifier.fromString(indexName)); + } + + async addIndex(index: IndexDefinition): Promise { + await this.remote.optimizer.createIndex( + index.tableName, + index.columns.map((c) => ({ + name: c.name, + order: (c.order?.toLowerCase() ?? "asc") as "asc" | "desc", + })), + ); + } + + async runQuery(_query: string): Promise { + log.warn("runQuery is not implemented", ApiClient.name); + } +} diff --git a/src/remote/query-loader.test.ts b/src/remote/query-loader.test.ts index c918ad7..33edca1 100644 --- a/src/remote/query-loader.test.ts +++ b/src/remote/query-loader.test.ts @@ -35,9 +35,13 @@ function createMockRecentQuery(query: string): RecentQuery { nudges: [], hash: "test_hash" as QueryHash, seenAt: Date.now(), + optimization: { state: "waiting" }, withOptimization: function () { return this as OptimizedQuery; }, + toJSON() { + return {}; + }, } as RecentQuery; } diff --git a/src/remote/query-optimizer.ts b/src/remote/query-optimizer.ts index c67ad68..12927ba 100644 --- a/src/remote/query-optimizer.ts +++ b/src/remote/query-optimizer.ts @@ -18,6 +18,7 @@ import { PostgresVersion, Statistics, StatisticsMode, + type ExportedStats, } from "@query-doctor/core"; import { Connectable } from "../sync/connectable.ts"; import { parse } from "@libpg-query/parser"; @@ -107,6 +108,10 @@ export class QueryOptimizer extends EventEmitter { return this.target?.statistics.computedStats; } + get ownMetadata(): ExportedStats[] | undefined { + return this.target?.statistics.ownMetadata; + } + getExistingIndexes(): IndexedTable[] { return this.existingIndexes; } diff --git a/src/remote/remote.ts b/src/remote/remote.ts index 212bff0..3cdac44 100644 --- a/src/remote/remote.ts +++ b/src/remote/remote.ts @@ -4,6 +4,8 @@ import { PostgresVersion, Statistics, StatisticsMode, + type ExtensionPresence, + type ExportedStats, } from "@query-doctor/core"; import { type Connectable } from "../sync/connectable.ts"; import { DumpCommand, RestoreCommand } from "../sync/schema-link.ts"; @@ -22,6 +24,10 @@ import { SchemaLoader } from "./schema-loader.ts"; type RemoteEvents = { dumpLog: [line: string]; restoreLog: [line: string]; + extensionPresenceChanged: [presence: ExtensionPresence]; + queriesPolled: [queries: RecentQuery[]]; + schemaSynced: [schema: FullSchema]; + statsApplied: [stats: ExportedStats[]]; }; /** @@ -56,6 +62,7 @@ export class Remote extends EventEmitter { */ private baseDbURL: Connectable; private generation = 0; + private lastSourceDb?: Connectable; /** The URL of the current generation optimizing db */ private optimizingDbUDRL: Connectable; @@ -76,7 +83,8 @@ export class Remote extends EventEmitter { /** The manager for ONLY the source db connections */ private readonly sourceManager: ConnectionManager = ConnectionManager .forRemoteDatabase(), - private readonly options: { disableQueryLoader: boolean } = { disableQueryLoader: false } + private readonly options: { disableQueryLoader: boolean } = { disableQueryLoader: false }, + initialSourceDb?: Connectable, ) { super(); this.baseDbURL = targetURL.withDatabaseName(Remote.baseDbName); @@ -84,6 +92,16 @@ export class Remote extends EventEmitter { Remote.defaultOptimizingDbPrefix, ); this.optimizer = new QueryOptimizer(manager, this.optimizingDbUDRL); + if (initialSourceDb) { + this.lastSourceDb = initialSourceDb; + } + } + + async resync(): Promise> { + if (!this.lastSourceDb) { + throw new Error("No source database has been synced yet"); + } + return this.syncFrom(this.lastSourceDb, { type: "pullFromSource" }); } async syncFrom( @@ -105,6 +123,7 @@ export class Remote extends EventEmitter { }; } > { + this.lastSourceDb = source; await this.resetDatabase(); // First batch: get schema and other info in parallel (needed for stats decision) @@ -134,6 +153,7 @@ export class Remote extends EventEmitter { if (fullSchema.status === "fulfilled") { this.schemaLoader?.update(fullSchema.value); + this.emit("schemaSynced", fullSchema.value); } // Second: resolve stats strategy using table list from schema @@ -162,6 +182,7 @@ export class Remote extends EventEmitter { type: "extension_not_installed", extensionName: recentQueries.reason.extensionNames[0], }; + this.emit("extensionPresenceChanged", { type: "missing" }); } await this.onSuccessfulSync( @@ -359,6 +380,10 @@ export class Remote extends EventEmitter { async applyStatistics(statsMode: StatisticsMode): Promise { await this.optimizer.setStatistics(statsMode); + const stats = this.optimizer.ownMetadata; + if (stats) { + this.emit("statsApplied", stats); + } // don't block the reply by awaiting all optimizations this.optimizer.restart(); } @@ -410,8 +435,11 @@ export class Remote extends EventEmitter { console.error(error); }); this.pgStatStatementsStatus = PgStatStatementsStatus.Installed; + this.emit("extensionPresenceChanged", { type: "present", extensionName: "pg_stat_statements", needsRestart: false }); + this.emit("queriesPolled", queries); }).on("pgStatStatementsNotInstalled", () => { this.pgStatStatementsStatus = PgStatStatementsStatus.NotInstalled; + this.emit("extensionPresenceChanged", { type: "missing" }); }); this.queryLoader.on("exit", () => { log.error("Query loader exited", "remote"); diff --git a/src/runner.ts b/src/runner.ts index 44a07fe..000e68c 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -39,8 +39,9 @@ export class Runner { maxCost?: number; logPath: string; ignoredQueryHashes?: string[]; + remote?: Remote; }) { - const remote = new Remote( + const remote = options.remote ?? new Remote( options.targetPostgresUrl, ConnectionManager.forLocalDatabase(), ConnectionManager.forRemoteDatabase(), diff --git a/src/sanitize.test.ts b/src/sanitize.test.ts index 7b7e5d9..e349b61 100644 --- a/src/sanitize.test.ts +++ b/src/sanitize.test.ts @@ -1,6 +1,7 @@ import { test, expect, vi } from "vitest"; test("returns original URL when HOSTED is false", async () => { + vi.stubEnv("SOURCE_DATABASE_URL", "postgres://localhost/test"); vi.stubEnv("HOSTED", "false"); vi.resetModules(); const { sanitizePostgresUrl } = await import("./sanitize.ts"); @@ -9,6 +10,7 @@ test("returns original URL when HOSTED is false", async () => { }); test("returns hashed URL when HOSTED is true", async () => { + vi.stubEnv("SOURCE_DATABASE_URL", "postgres://localhost/test"); vi.stubEnv("HOSTED", "true"); vi.resetModules(); const { sanitizePostgresUrl } = await import("./sanitize.ts"); @@ -21,6 +23,7 @@ test("returns hashed URL when HOSTED is true", async () => { }); test("same input produces same hash", async () => { + vi.stubEnv("SOURCE_DATABASE_URL", "postgres://localhost/test"); vi.stubEnv("HOSTED", "true"); vi.resetModules(); const { sanitizePostgresUrl } = await import("./sanitize.ts"); @@ -29,6 +32,7 @@ test("same input produces same hash", async () => { }); test("different inputs produce different hashes", async () => { + vi.stubEnv("SOURCE_DATABASE_URL", "postgres://localhost/test"); vi.stubEnv("HOSTED", "true"); vi.resetModules(); const { sanitizePostgresUrl } = await import("./sanitize.ts"); diff --git a/src/server/http.ts b/src/server/http.ts index 8618ef6..3aa36e3 100644 --- a/src/server/http.ts +++ b/src/server/http.ts @@ -117,7 +117,7 @@ async function onSyncLiveQuery(body: unknown) { export async function createServer( hostname: string, port: number, - targetDb?: Connectable, + targetDb?: Connectable | Remote, sourceDb?: Connectable, ): Promise { const fastify = Fastify({ logger: false }); @@ -148,7 +148,9 @@ export async function createServer( const remoteController = targetDb ? new RemoteController( - new Remote(targetDb, optimizingDbConnectionManager), + targetDb instanceof Remote + ? targetDb + : new Remote(targetDb, optimizingDbConnectionManager), ) : undefined; diff --git a/src/sql/recent-query.ts b/src/sql/recent-query.ts index 39290c0..a17d65f 100644 --- a/src/sql/recent-query.ts +++ b/src/sql/recent-query.ts @@ -39,6 +39,7 @@ export class RecentQuery { readonly isIntrospection: boolean; readonly isTargetlessSelectQuery: boolean; readonly analysisSkipped: boolean; + optimization: LiveQueryOptimization; /** Use {@link RecentQuery.analyze} instead */ constructor( @@ -70,12 +71,39 @@ export class RecentQuery { this.isTargetlessSelectQuery = this.isSelectQuery ? RecentQuery.isTargetlessSelectQuery(tableReferences) : false; + this.optimization = { state: "waiting" }; } withOptimization(optimization: LiveQueryOptimization): OptimizedQuery { return Object.assign(this, { optimization }); } + toJSON() { + // TODO: these fields should be calling toJSON recursively maybe? + return JSON.parse(JSON.stringify({ + formattedQuery: this.formattedQuery, + displayQuery: this.displayQuery, + username: this.username, + query: this.query, + meanTime: this.meanTime, + calls: this.calls, + rows: this.rows, + topLevel: this.topLevel, + isSystemQuery: this.isSystemQuery, + isSelectQuery: this.isSelectQuery, + isIntrospection: this.isIntrospection, + isTargetlessSelectQuery: this.isTargetlessSelectQuery, + analysisSkipped: this.analysisSkipped, + tableReferences: this.tableReferences, + columnReferences: this.columnReferences, + tags: this.tags, + nudges: this.nudges, + hash: this.hash, + seenAt: this.seenAt, + optimization: this.optimization, + })); + } + /** * Queries beyond this size are included in results but skip expensive * prettier.format() and Analyzer.analyze() to avoid OOM from massive diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index efadd5c..fb3c8e3 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -467,8 +467,6 @@ ORDER BY } private async getQuerySource(): Promise { - if (this.querySource) - return this.querySource const results = await this.db.exec<{ schema: string; extension: string }>(` SELECT e.extname as extension, n.nspname as schema FROM pg_extension e @@ -477,6 +475,7 @@ ORDER BY `); const firstResult = results[0]; if (!firstResult) { + this.querySource = null; throw new ExtensionNotInstalledError([ "pg_stat_statements", "pg_stat_monitor" diff --git a/src/test-setup.ts b/src/test-setup.ts new file mode 100644 index 0000000..1543b5d --- /dev/null +++ b/src/test-setup.ts @@ -0,0 +1 @@ +process.env.SOURCE_DATABASE_URL ??= "postgres://localhost/test"; diff --git a/vitest.config.ts b/vitest.config.ts index 6f7984b..395acdb 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -4,5 +4,6 @@ export default defineConfig({ test: { include: ["src/**/*.test.ts"], testTimeout: 120_000, + setupFiles: ["src/test-setup.ts"], }, });