diff --git a/src/cli/proxy.integration.test.ts b/src/cli/proxy.integration.test.ts index 0d6b5c3..1dab229 100644 --- a/src/cli/proxy.integration.test.ts +++ b/src/cli/proxy.integration.test.ts @@ -1,5 +1,4 @@ import { afterAll, beforeAll, describe, expect, it } from 'vitest'; -import net from 'node:net'; import { Buffer } from 'node:buffer'; import { mkdtempSync, readdirSync, readFileSync, rmSync } from 'node:fs'; import { tmpdir } from 'node:os'; @@ -28,28 +27,20 @@ import { ClickHouseFormat } from '../core/types/formats'; const IMAGE = 'clickhouse/clickhouse-server:latest'; /** - * Reserve `n` *distinct* free ports by binding them all simultaneously, then - * releasing. Binding at once guarantees uniqueness (sequential bind+close can - * hand back the same ephemeral port twice). Each test then claims its own port, - * so a lingering socket from one test can't EADDRINUSE the next. + * Fixed candidate listen ports, chosen *below* the OS ephemeral range + * (typically 32768+). Docker publishes container ports from the ephemeral + * range, so picking from there (as OS-assigned free ports do) races with + * testcontainers' own port mappings / forwarder — which is exactly what made + * this suite flaky in CI. Low, explicit ports the OS won't auto-assign avoid + * that; the retry in `startProxy` covers the rare case one is occupied anyway. */ -async function freePorts(n: number): Promise { - const servers = await Promise.all( - Array.from( - { length: n }, - () => - new Promise((resolve) => { - const s = net.createServer(); - s.listen(0, '127.0.0.1', () => resolve(s)); - }), - ), - ); - const ports = servers.map((s) => (s.address() as net.AddressInfo).port); - await Promise.all(servers.map((s) => new Promise((r) => s.close(() => r())))); - return ports; +const CANDIDATE_PORTS = Array.from({ length: 24 }, (_, i) => 14100 + i); + +function isAddrInUse(err: unknown): boolean { + return /EADDRINUSE/.test(err instanceof Error ? err.message : String(err)); } -async function waitFor(pred: () => boolean, label: string, timeoutMs = 45000): Promise { +async function waitFor(pred: () => boolean, label: string, timeoutMs = 20000): Promise { const start = Date.now(); while (!pred()) { if (Date.now() - start > timeoutMs) throw new Error(`timeout waiting for: ${label}`); @@ -63,19 +54,13 @@ describe('chfx proxy — integration (real ClickHouse + real native client)', () let container: StartedClickHouseContainer; let targetEndpoint: string; let tmp: string; - let portPool: number[] = []; - /** Claim a fresh listen port for a test (distinct, pre-exposed to containers). */ - const nextPort = (): number => { - const port = portPool.shift(); - if (port === undefined) throw new Error('port pool exhausted — increase the count in beforeAll'); - return port; - }; + let portCursor = 0; beforeAll(async () => { - // Reserve a distinct port per test and expose them ALL to containers BEFORE - // starting ClickHouse, so each can resolve host.testcontainers.internal:. - portPool = await freePorts(12); - await TestContainers.exposeHostPorts(...portPool); + // Expose every candidate port to containers BEFORE starting ClickHouse, so + // each can resolve host.testcontainers.internal: and so a retry can + // fall back to any of them. + await TestContainers.exposeHostPorts(...CANDIDATE_PORTS); container = await new ClickHouseContainer(IMAGE).start(); targetEndpoint = `${container.getHost()}:${container.getPort()}`; tmp = mkdtempSync(join(tmpdir(), 'chfx-proxy-')); @@ -128,15 +113,43 @@ describe('chfx proxy — integration (real ClickHouse + real native client)', () return { deps, text, diag, shutdown }; } + /** + * Start `proxyCommand` and wait until it's actually listening. If the chosen + * port is taken (EADDRINUSE), fall back to the next candidate and retry — so a + * transient port clash can't fail the test (and never hangs the full waitFor, + * since a bind failure is detected immediately). `listenForm` controls whether + * --listen is passed as a bare port or `host:port`. + */ + async function startProxy(args: string[], listenForm: 'bare' | 'hostport' = 'bare') { + let lastErr: unknown; + for (let i = 0; i < CANDIDATE_PORTS.length; i++) { + const port = CANDIDATE_PORTS[portCursor++ % CANDIDATE_PORTS.length]; + const harness = makeDeps(); + const listenArg = listenForm === 'hostport' ? `127.0.0.1:${port}` : String(port); + const pending = proxyCommand(['--listen', listenArg, '--target', targetEndpoint, ...args], harness.deps); + let earlyErr: unknown; + // Capture an early bind rejection so it neither becomes an unhandled + // rejection nor blocks the listening wait below. + pending.catch((err) => { + earlyErr = err; + }); + await waitFor( + () => earlyErr !== undefined || harness.diag.some((d) => /listening on/.test(d)), + 'proxy listening', + ); + if (earlyErr === undefined) return { port, pending, ...harness }; + if (!isAddrInUse(earlyErr)) throw earlyErr; + lastErr = earlyErr; + } + throw new Error(`no free candidate port for proxy (last: ${String(lastErr)})`); + } + /** * Drive a single-shot proxy run with `args`, firing the client once the proxy * is listening. Resolves with the command's output + captured diagnostics. */ async function runOnce(args: string[], sql: string) { - const port = nextPort(); - const { deps, text, diag } = makeDeps(); - const pending = proxyCommand(['--listen', String(port), '--target', targetEndpoint, ...args], deps); - await waitFor(() => diag.some((d) => /listening on/.test(d)), 'proxy listening'); + const { port, pending, text, diag } = await startProxy(args); await clientQuery(sql, port); const out = await pending; return { out, text, diag }; @@ -198,13 +211,7 @@ describe('chfx proxy — integration (real ClickHouse + real native client)', () }, 90000); it('accepts a host:port form for --listen', async () => { - const port = nextPort(); - const { deps, diag } = makeDeps(); - const pending = proxyCommand( - ['--listen', `127.0.0.1:${port}`, '--target', targetEndpoint, '--decode'], - deps, - ); - await waitFor(() => diag.some((d) => /listening on/.test(d)), 'proxy listening'); + const { port, pending, diag } = await startProxy(['--decode'], 'hostport'); await clientQuery('SELECT 1', port); const out = await pending; expect(out.stdout).toBe('json'); @@ -212,14 +219,8 @@ describe('chfx proxy — integration (real ClickHouse + real native client)', () }, 90000); it('persistent --save-dir: writes one dump per connection until stopped', async () => { - const port = nextPort(); const dir = join(tmp, 'persist-dumps'); - const { deps, diag, shutdown } = makeDeps(); - const pending = proxyCommand( - ['--listen', String(port), '--target', targetEndpoint, '--persistent', '--save-dir', dir], - deps, - ); - await waitFor(() => diag.some((d) => /listening on/.test(d)) && typeof shutdown.fn === 'function', 'listening'); + const { port, pending, diag, shutdown } = await startProxy(['--persistent', '--save-dir', dir]); await clientQuery('SELECT 1 AS a', port); await clientQuery('SELECT 2 AS b', port); @@ -239,13 +240,7 @@ describe('chfx proxy — integration (real ClickHouse + real native client)', () }, 120000); it('persistent --decode: streams one JSON envelope per connection', async () => { - const port = nextPort(); - const { deps, text, diag, shutdown } = makeDeps(); - const pending = proxyCommand( - ['--listen', String(port), '--target', targetEndpoint, '--persistent', '--decode', '--compact'], - deps, - ); - await waitFor(() => diag.some((d) => /listening on/.test(d)) && typeof shutdown.fn === 'function', 'listening'); + const { port, pending, text, shutdown } = await startProxy(['--persistent', '--decode', '--compact']); await clientQuery('SELECT 10 AS x', port); await clientQuery('SELECT 20 AS y', port);