Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 51 additions & 56 deletions src/cli/proxy.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
import net from 'node:net';
import { Buffer } from 'node:buffer';
import { mkdtempSync, readdirSync, readFileSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
Expand Down Expand Up @@ -28,28 +27,20 @@ import { ClickHouseFormat } from '../core/types/formats';
const IMAGE = 'clickhouse/clickhouse-server:latest';

/**
* Reserve `n` *distinct* free ports by binding them all simultaneously, then
* releasing. Binding at once guarantees uniqueness (sequential bind+close can
* hand back the same ephemeral port twice). Each test then claims its own port,
* so a lingering socket from one test can't EADDRINUSE the next.
* Fixed candidate listen ports, chosen *below* the OS ephemeral range
* (typically 32768+). Docker publishes container ports from the ephemeral
* range, so picking from there (as OS-assigned free ports do) races with
* testcontainers' own port mappings / forwarder — which is exactly what made
* this suite flaky in CI. Low, explicit ports the OS won't auto-assign avoid
* that; the retry in `startProxy` covers the rare case one is occupied anyway.
*/
async function freePorts(n: number): Promise<number[]> {
const servers = await Promise.all(
Array.from(
{ length: n },
() =>
new Promise<net.Server>((resolve) => {
const s = net.createServer();
s.listen(0, '127.0.0.1', () => resolve(s));
}),
),
);
const ports = servers.map((s) => (s.address() as net.AddressInfo).port);
await Promise.all(servers.map((s) => new Promise<void>((r) => s.close(() => r()))));
return ports;
const CANDIDATE_PORTS = Array.from({ length: 24 }, (_, i) => 14100 + i);

function isAddrInUse(err: unknown): boolean {
return /EADDRINUSE/.test(err instanceof Error ? err.message : String(err));
}

async function waitFor(pred: () => boolean, label: string, timeoutMs = 45000): Promise<void> {
async function waitFor(pred: () => boolean, label: string, timeoutMs = 20000): Promise<void> {
const start = Date.now();
while (!pred()) {
if (Date.now() - start > timeoutMs) throw new Error(`timeout waiting for: ${label}`);
Expand All @@ -63,19 +54,13 @@ describe('chfx proxy — integration (real ClickHouse + real native client)', ()
let container: StartedClickHouseContainer;
let targetEndpoint: string;
let tmp: string;
let portPool: number[] = [];
/** Claim a fresh listen port for a test (distinct, pre-exposed to containers). */
const nextPort = (): number => {
const port = portPool.shift();
if (port === undefined) throw new Error('port pool exhausted — increase the count in beforeAll');
return port;
};
let portCursor = 0;

beforeAll(async () => {
// Reserve a distinct port per test and expose them ALL to containers BEFORE
// starting ClickHouse, so each can resolve host.testcontainers.internal:<port>.
portPool = await freePorts(12);
await TestContainers.exposeHostPorts(...portPool);
// Expose every candidate port to containers BEFORE starting ClickHouse, so
// each can resolve host.testcontainers.internal:<port> and so a retry can
// fall back to any of them.
await TestContainers.exposeHostPorts(...CANDIDATE_PORTS);
container = await new ClickHouseContainer(IMAGE).start();
targetEndpoint = `${container.getHost()}:${container.getPort()}`;
tmp = mkdtempSync(join(tmpdir(), 'chfx-proxy-'));
Expand Down Expand Up @@ -128,15 +113,43 @@ describe('chfx proxy — integration (real ClickHouse + real native client)', ()
return { deps, text, diag, shutdown };
}

/**
* Start `proxyCommand` and wait until it's actually listening. If the chosen
* port is taken (EADDRINUSE), fall back to the next candidate and retry — so a
* transient port clash can't fail the test (and never hangs the full waitFor,
* since a bind failure is detected immediately). `listenForm` controls whether
* --listen is passed as a bare port or `host:port`.
*/
async function startProxy(args: string[], listenForm: 'bare' | 'hostport' = 'bare') {
let lastErr: unknown;
for (let i = 0; i < CANDIDATE_PORTS.length; i++) {
const port = CANDIDATE_PORTS[portCursor++ % CANDIDATE_PORTS.length];
const harness = makeDeps();
const listenArg = listenForm === 'hostport' ? `127.0.0.1:${port}` : String(port);
const pending = proxyCommand(['--listen', listenArg, '--target', targetEndpoint, ...args], harness.deps);
let earlyErr: unknown;
// Capture an early bind rejection so it neither becomes an unhandled
// rejection nor blocks the listening wait below.
pending.catch((err) => {
earlyErr = err;
});
await waitFor(
() => earlyErr !== undefined || harness.diag.some((d) => /listening on/.test(d)),
'proxy listening',
);
if (earlyErr === undefined) return { port, pending, ...harness };
if (!isAddrInUse(earlyErr)) throw earlyErr;
lastErr = earlyErr;
}
throw new Error(`no free candidate port for proxy (last: ${String(lastErr)})`);
}

/**
* Drive a single-shot proxy run with `args`, firing the client once the proxy
* is listening. Resolves with the command's output + captured diagnostics.
*/
async function runOnce(args: string[], sql: string) {
const port = nextPort();
const { deps, text, diag } = makeDeps();
const pending = proxyCommand(['--listen', String(port), '--target', targetEndpoint, ...args], deps);
await waitFor(() => diag.some((d) => /listening on/.test(d)), 'proxy listening');
const { port, pending, text, diag } = await startProxy(args);
await clientQuery(sql, port);
const out = await pending;
return { out, text, diag };
Expand Down Expand Up @@ -198,28 +211,16 @@ describe('chfx proxy — integration (real ClickHouse + real native client)', ()
}, 90000);

it('accepts a host:port form for --listen', async () => {
const port = nextPort();
const { deps, diag } = makeDeps();
const pending = proxyCommand(
['--listen', `127.0.0.1:${port}`, '--target', targetEndpoint, '--decode'],
deps,
);
await waitFor(() => diag.some((d) => /listening on/.test(d)), 'proxy listening');
const { port, pending, diag } = await startProxy(['--decode'], 'hostport');
await clientQuery('SELECT 1', port);
const out = await pending;
expect(out.stdout).toBe('json');
expect(diag.some((d) => d.includes(`127.0.0.1:${port}`))).toBe(true);
}, 90000);

it('persistent --save-dir: writes one dump per connection until stopped', async () => {
const port = nextPort();
const dir = join(tmp, 'persist-dumps');
const { deps, diag, shutdown } = makeDeps();
const pending = proxyCommand(
['--listen', String(port), '--target', targetEndpoint, '--persistent', '--save-dir', dir],
deps,
);
await waitFor(() => diag.some((d) => /listening on/.test(d)) && typeof shutdown.fn === 'function', 'listening');
const { port, pending, diag, shutdown } = await startProxy(['--persistent', '--save-dir', dir]);

await clientQuery('SELECT 1 AS a', port);
await clientQuery('SELECT 2 AS b', port);
Expand All @@ -239,13 +240,7 @@ describe('chfx proxy — integration (real ClickHouse + real native client)', ()
}, 120000);

it('persistent --decode: streams one JSON envelope per connection', async () => {
const port = nextPort();
const { deps, text, diag, shutdown } = makeDeps();
const pending = proxyCommand(
['--listen', String(port), '--target', targetEndpoint, '--persistent', '--decode', '--compact'],
deps,
);
await waitFor(() => diag.some((d) => /listening on/.test(d)) && typeof shutdown.fn === 'function', 'listening');
const { port, pending, text, shutdown } = await startProxy(['--persistent', '--decode', '--compact']);

await clientQuery('SELECT 10 AS x', port);
await clientQuery('SELECT 20 AS y', port);
Expand Down