diff --git a/AGENTS.md b/AGENTS.md index 11bf3b6..f9a7b97 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -89,13 +89,15 @@ src/ │ ├── commands/decode.ts# `decode` — decodeBuffer/decodeCaptureStreams + shared buildDecodeEnvelope │ ├── commands/query.ts # `query` — capture (proxy) + decode in one step; --save keeps the dump │ ├── commands/capture.ts# `capture` — capture to .chproto (file or raw stdout); `npm run capture` alias -│ ├── connection.ts # Shared --host/port/user/... resolution + env fallbacks + experimental settings +│ ├── commands/proxy.ts # `proxy` — standalone capturing TCP proxy any native client connects through (single-shot / --persistent) +│ ├── connection.ts # Shared --host/port/user/... resolution + env fallbacks + experimental settings + parseHostPort │ ├── args.ts # Dependency-free arg parser (value + repeatable flags) │ ├── output.ts # CliError, JSON-safe serializer (bigint→string, bytes→hex), CommandOutput │ ├── registry.ts # Command metadata for --help │ ├── version.ts # Build-injected version (esbuild define) │ └── cli.test.ts # Vitest unit + tsx e2e tests (uses fixtures/protocol/*.chproto) -│ # query/capture reuse scripts/native-proxy.mjs (+ native-proxy.d.mts for types) +│ # query/capture/proxy reuse scripts/native-proxy.mjs (+ native-proxy.d.mts for types). +│ # proxy uses startCaptureProxy (fixed listen port, many connections); query/capture use startProxy (one-shot, ephemeral) └── styles/ # CSS files electron/ ├── main.ts # Electron main process (window, IPC handlers) diff --git a/README.md b/README.md index 8412583..2ce72f1 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,7 @@ decode error. |---------|-------------| | `chfx query --query ""` | Run a query **and decode it** in one step (no file). `--protocol tcp` (default) captures the native packet stream via `clickhouse-client`; `--protocol http` POSTs to ClickHouse HTTP and decodes the `--format` body. `--save ` keeps the `.chproto` dump (tcp). 
| | `chfx capture --query ""` | Capture a query to a `.chproto` dump only (native protocol). Writes `--out `, or streams raw bytes to stdout (so `chfx capture … \| chfx decode` works). `npm run capture` is an alias. | +| `chfx proxy --listen --target ` | Listen as a capturing TCP proxy that **any** native client connects through (clickhouse-client, Go/JDBC/Python drivers, …). Single-shot by default; `--persistent` serves many connections. See below. | | `chfx decode [file]` | Decode a `.chproto`, Native, or RowBinary dump to JSON. Reads stdin when no file (or `-`) is given. | | `chfx --help` / `chfx --help` | Human-readable help. | | `chfx --version` | Print the version. | @@ -102,6 +103,51 @@ decode error. | `--client ` | Path to `clickhouse-client` (tcp only). Env: `CLICKHOUSE_CLIENT`. | | `--out ` (`capture`) | Where to write the `.chproto` dump. | +### `proxy` — capture any native client + +Unlike `query`/`capture` (which drive `clickhouse-client` for you), `proxy` +just **listens**: it forwards every connection to `--target` and tees the native +packet stream into a capture. Point any native client at the listen address — +the proxy never spawns one itself. Plaintext/uncompressed connections only (TLS +and compressed streams are unsupported, the same constraint as the other native +paths). + +```bash +# Single-shot: capture the next connection, write a dump, exit. +chfx proxy --listen 9100 --target 127.0.0.1:9000 --out cap.chproto +clickhouse-client --port 9100 --query "SELECT 1" # in another shell + +# Single-shot, decoded straight to JSON (no file): +chfx proxy --listen 9100 --target 127.0.0.1:9000 --decode + +# Persistent: serve many connections, one dump per connection, until Ctrl-C. +chfx proxy --listen 9100 --target 127.0.0.1:9000 --persistent --save-dir ./caps +``` + +| Option | Description | +|--------|-------------| +| `--listen <[host:]port>` | Address to listen on (host defaults to `127.0.0.1`). Required. 
| +| `--target ` | Upstream ClickHouse native endpoint (default port `9000`). Required. | +| `--out ` (`-o`) | **Single-shot** — write the `.chproto` dump here; omit (and no `--decode`) to stream the raw dump to stdout. | +| `--decode` | Decode each capture to a JSON envelope on stdout (instead of writing/streaming the raw dump). | +| `--save-dir ` | **Persistent** — write one `conn-NNNN.chproto` per connection into this directory. | +| `--persistent` / `--once` | Serve until Ctrl-C, or stop after the first connection (default `--once`). | +| `--no-node-bytes` / `--compact` | Same output controls as `decode` (apply when `--decode` is set). | + +Diagnostics (the listen address, per-connection notices) go to **stderr**, so +stdout stays a clean dump or JSON stream. + +Notes: +- A capture completes when the **client closes its connection** (`clickhouse-client` + does after each `--query`). For long-lived/pooled driver connections that stay + open, press **Ctrl-C** — single-shot finalizes the partial capture and exits; + persistent stops and flushes any still-open connection. +- In `--persistent --save-dir`, files are named `conn-0001.chproto`, `conn-0002…` + per run; the counter resets each run, so re-running against the same directory + **overwrites** earlier files. Use a fresh `--save-dir` per run to keep history. +- For `--persistent --decode`, add `--compact` to emit one JSON document per line + (newline-delimited JSON), which is what stream consumers expect. + ### `decode` options | Option | Description | diff --git a/docs/cli-spec.md b/docs/cli-spec.md index c103b32..13286b9 100644 --- a/docs/cli-spec.md +++ b/docs/cli-spec.md @@ -64,20 +64,27 @@ writes the dump; omitted, it streams the raw dump bytes to stdout so **`npm run capture` is a thin alias** to `chfx capture` (the standalone `scripts/capture-native.mjs` was folded in and removed). 
-#### `chfx proxy` (item 5 — standalone capture proxy) +#### `chfx proxy` (item 5 — standalone capture proxy, implemented) A listener that forwards to a target server and captures the native TCP stream. **Any** native client (clickhouse-client, Go/JDBC/Python drivers, …) connects through it — the proxy does not spawn the client itself. - Configurable lifecycle: - **Default: single-shot** — accept one connection, capture, write a - `.chproto` (and decode if asked), then exit. - - Flags opt into **persistent** mode (long-running, one `.chproto` per - connection to an output dir) and **live decode** (`--decode` streams decoded - JSON per connection to stdout). -- Flags: `--listen`, `--target host:port`, `--out`/`--save-dir`, `--decode`, - `--persistent`/`--once`, plus remote `--user/--password`/TLS where applicable. + `.chproto` (`--out`), decode to JSON (`--decode`), or stream the raw dump to + stdout, then exit. + - `--persistent` opts into long-running mode: one `.chproto` per connection to + `--save-dir`, and/or **live decode** (`--decode` streams a decoded JSON doc + per connection to stdout). Stops on Ctrl-C. +- Flags: `--listen [host:]port`, `--target host[:port]`, `--out`/`--save-dir`, + `--decode`, `--persistent`/`--once`, `--no-node-bytes`/`--compact`. +- No `--user/--password`: the proxy is transparent, so the client authenticates + end-to-end against the target through the forwarded handshake. - **Plaintext/uncompressed only** (same constraint as today). TLS/compressed - streams are unsupported — error clearly and document it. + streams are unsupported — out of scope (clickhouse-client disables compression + on localhost, which is the intended setup). +- Backed by `startCaptureProxy` in `scripts/native-proxy.mjs` (fixed listen port, + many connections), distinct from `startProxy` (one-shot, ephemeral) used by + `query`/`capture`. #### `--help` Human-readable help (`chfx --help`, `chfx <command> --help`). 
A standalone diff --git a/scripts/native-proxy.d.mts b/scripts/native-proxy.d.mts index 7270ead..39b7bbc 100644 --- a/scripts/native-proxy.d.mts +++ b/scripts/native-proxy.d.mts @@ -36,6 +36,23 @@ export function startProxy(opts: { listenHost?: string; }): Promise<{ port: number; done: Promise; close: () => void }>; +export interface StartCaptureProxyOptions { + targetHost: string; + targetPort: number; + listenHost?: string; + listenPort?: number; + once?: boolean; + onCapture?: (capture: Capture) => void; + onError?: (err: Error) => void; +} + +export function startCaptureProxy(opts: StartCaptureProxyOptions): Promise<{ + host: string; + port: number; + done: Promise; + close: () => void; +}>; + export function splitStreams(segments: Segment[]): { c2s: Buffer; s2c: Buffer }; export function captureQuery(opts: CaptureQueryOptions): Promise; export function encodeDump(capture: Capture): Buffer; diff --git a/scripts/native-proxy.mjs b/scripts/native-proxy.mjs index cd5b6db..a96d971 100644 --- a/scripts/native-proxy.mjs +++ b/scripts/native-proxy.mjs @@ -132,6 +132,146 @@ export function startProxy({ targetHost, targetPort, listenHost = '127.0.0.1' }) }); } +/** + * Start a capturing proxy that forwards every accepted connection to + * (targetHost, targetPort) and invokes onCapture(capture) once that connection + * closes. Unlike startProxy (single-shot, ephemeral port, resolves with the raw + * segment log), this binds a caller-chosen address and can serve many + * connections — it backs `chfx proxy`, where an external native client connects + * through it. In `once` mode it stops itself after the first completed + * connection; otherwise it runs until close() is called. close() (and `once`'s + * self-stop) flush a partial capture for any connection still open, so a client + * that holds its connection alive (pooled drivers, idle sessions) is captured + * up to the point of shutdown rather than dropped. 
+ * + * @param {object} opts + * @param {string} opts.targetHost + * @param {number} opts.targetPort + * @param {string} [opts.listenHost] + * @param {number} [opts.listenPort] default 0 (ephemeral) + * @param {boolean} [opts.once] stop after the first completed connection + * @param {(capture: Capture) => void} [opts.onCapture] + * @param {(err: Error) => void} [opts.onError] + * @returns {Promise<{ host: string, port: number, done: Promise, close: () => void }>} + */ +export function startCaptureProxy({ + targetHost, + targetPort, + listenHost = '127.0.0.1', + listenPort = 0, + once = false, + onCapture, + onError, +}) { + return new Promise((resolve, reject) => { + /** @type {() => void} */ + let resolveDone; + const done = new Promise((res) => { resolveDone = res; }); + let settled = false; + let connectionCount = 0; + /** @type {Set} */ + const sockets = new Set(); + // report() callbacks for connections that haven't emitted a capture yet, so + // close() can flush a partial capture (e.g. Ctrl-C while a connection is + // still open, or a long-lived pooled client) instead of dropping it. + /** @type {Set<() => void>} */ + const reporters = new Set(); + + const finish = () => { + if (settled) return; + settled = true; + try { server.close(); } catch { /* ignore */ } + // Flush whatever each still-open connection captured before tearing down. 
+ for (const report of [...reporters]) report(); + for (const s of sockets) s.destroy(); + resolveDone(); + }; + + const server = net.createServer((client) => { + const id = ++connectionCount; + sockets.add(client); + const upstream = net.connect(targetPort, targetHost); + sockets.add(upstream); + + /** @type {Segment[]} */ + const segments = []; + let openEnds = 2; + let reported = false; + + const report = () => { + if (reported) return; + reported = true; + reporters.delete(report); + const { c2s, s2c } = splitStreams(segments); + /** @type {Capture} */ + const capture = { + c2s, + s2c, + segments, + meta: { source: 'proxy', target: `${targetHost}:${targetPort}`, connection: id }, + }; + try { + onCapture?.(capture); + } catch (err) { + onError?.(/** @type {Error} */ (err)); + } + if (once) finish(); + }; + reporters.add(report); + + const closeOne = () => { + openEnds -= 1; + if (openEnds === 0) report(); + }; + + // Forward with backpressure so a slow consumer can't make us buffer the + // whole stream in the socket layer (we already retain it in `segments`). + client.on('data', (chunk) => { + segments.push({ dir: DIR_C2S, data: Buffer.from(chunk) }); + if (upstream.write(chunk) === false) client.pause(); + }); + upstream.on('drain', () => client.resume()); + upstream.on('data', (chunk) => { + segments.push({ dir: DIR_S2C, data: Buffer.from(chunk) }); + if (client.write(chunk) === false) upstream.pause(); + }); + client.on('drain', () => upstream.resume()); + client.on('end', () => upstream.end()); + upstream.on('end', () => client.end()); + client.on('close', () => { sockets.delete(client); closeOne(); }); + upstream.on('close', () => { sockets.delete(upstream); closeOne(); }); + + const fail = (/** @type {Error} */ err) => { + onError?.(err); + // Tear both ends down; their 'close' events drive report()/closeOne(), + // so a failed upstream connect can't leave `once` mode hanging. 
+ client.destroy(); + upstream.destroy(); + }; + client.on('error', fail); + upstream.on('error', fail); + }); + + // Until we're listening, an error is a startup/bind failure → reject the + // returned promise. Once listening (promise already resolved), reject is a + // no-op, so route later server errors to onError instead of dropping them. + let listening = false; + server.on('error', (err) => { + if (!listening) reject(err); + else onError?.(/** @type {Error} */ (err)); + }); + server.listen(listenPort, listenHost, () => { + const addr = server.address(); + if (addr === null || typeof addr === 'string') { + reject(new Error('proxy failed to bind a port')); + return; + } + listening = true; + resolve({ host: listenHost, port: addr.port, done, close: finish }); + }); + }); +} + /** * Split an ordered segment log into the two concatenated per-direction streams. * @param {Segment[]} segments diff --git a/src/cli/cli.test.ts b/src/cli/cli.test.ts index 2bd4550..6aa7f6e 100644 --- a/src/cli/cli.test.ts +++ b/src/cli/cli.test.ts @@ -4,16 +4,18 @@ import { fileURLToPath } from 'node:url'; import { execFileSync, execSync } from 'node:child_process'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; +import net from 'node:net'; import { decodeBuffer, decodeCommand } from './commands/decode'; import { queryCommand } from './commands/query'; import { captureCommand } from './commands/capture'; -import { resolveCaptureOptions, resolveHttpConnection } from './connection'; +import { proxyCommand, type ProxyDeps } from './commands/proxy'; +import { resolveCaptureOptions, resolveHttpConnection, parseHostPort } from './connection'; import { parseArgs, stringOption, boolOption, arrayOption } from './args'; import { stringify, CliError } from './output'; import { ClickHouseFormat } from '../core/types/formats'; import { parseChprotoDump } from '../core/decoder/protocol-dump'; -import { captureQuery } from '../../scripts/native-proxy.mjs'; +import { captureQuery, 
startCaptureProxy, encodeDump, type Capture } from '../../scripts/native-proxy.mjs'; /** Run `fn` with env vars temporarily set, restoring prior values after. */ function withEnv(vars: Record, fn: () => void): void { @@ -620,3 +622,252 @@ describe('end-to-end via tsx (entry, stdin, exit codes)', () => { } }, 30000); }); + +describe('parseHostPort', () => { + it('parses bare port, host:port, and bare host (with default port)', () => { + expect(parseHostPort('9000', { flag: 'listen' })).toEqual({ host: '127.0.0.1', port: 9000 }); + expect(parseHostPort('0.0.0.0:9100', { flag: 'listen' })).toEqual({ host: '0.0.0.0', port: 9100 }); + expect(parseHostPort('db.internal', { flag: 'target', defaultPort: 9000 })).toEqual({ host: 'db.internal', port: 9000 }); + expect(parseHostPort(':9000', { flag: 'listen' })).toEqual({ host: '127.0.0.1', port: 9000 }); + }); + + it('requires a port when no default and rejects out-of-range ports', () => { + expect(() => parseHostPort('myhost', { flag: 'listen' })).toThrow(/needs a port/); + expect(() => parseHostPort('host:0', { flag: 'listen' })).toThrow(/between 1 and 65535/); + expect(() => parseHostPort('99999', { flag: 'listen' })).toThrow(/between 1 and 65535/); + expect(() => parseHostPort('host:nope', { flag: 'target', defaultPort: 9000 })).toThrow(/between 1 and 65535/); + }); + + it('parses bracketed and bare IPv6 addresses', () => { + expect(parseHostPort('[::1]:9000', { flag: 'listen' })).toEqual({ host: '::1', port: 9000 }); + expect(parseHostPort('[2001:db8::1]:9100', { flag: 'target', defaultPort: 9000 })).toEqual({ host: '2001:db8::1', port: 9100 }); + expect(parseHostPort('::1', { flag: 'target', defaultPort: 9000 })).toEqual({ host: '::1', port: 9000 }); + expect(() => parseHostPort('::1', { flag: 'listen' })).toThrow(/needs a port/); + expect(() => parseHostPort('[::1', { flag: 'listen' })).toThrow(/unterminated IPv6/); + }); +}); + +/** A proxy capture from a real fixture, stamped with proxy meta (connection id). 
*/ +function fakeProxyCapture(name: string, connection: number): Capture { + const c = fakeCaptureOf(name); + return { ...c, meta: { ...c.meta, source: 'proxy', target: '127.0.0.1:9000', connection } }; +} + +/** Injectable ProxyDeps that fire the given captures then resolve done. */ +function proxyHarness(captures: Capture[]) { + const text: string[] = []; + const diag: string[] = []; + const files: Record = {}; + const dirs: string[] = []; + const state: { listenOpts?: Record; shutdown?: () => void } = {}; + const deps: Partial = { + startCaptureProxy: async (opts) => { + state.listenOpts = opts as unknown as Record; + let resolveDone!: () => void; + const done = new Promise((r) => { + resolveDone = r; + }); + // Fire captures on a microtask, then resolve done (simulates the server + // closing — in `once` mode after the first, in persistent mode on Ctrl-C). + queueMicrotask(() => { + for (const c of captures) opts.onCapture?.(c); + resolveDone(); + }); + return { host: opts.listenHost ?? '127.0.0.1', port: opts.listenPort || 0, done, close: () => resolveDone() }; + }, + writeText: (t) => text.push(t), + writeDiag: (t) => diag.push(t), + writeFile: async (f, b) => { + files[f] = b; + }, + ensureDir: async (dir) => { + dirs.push(dir); + }, + registerShutdown: (h) => { + state.shutdown = h; + }, + }; + return { deps, text, diag, files, dirs, state }; +} + +describe('proxy — single-shot (injected server)', () => { + it('streams the raw .chproto dump to stdout when neither --out nor --decode is given', async () => { + const cap = fakeProxyCapture(fixtures[0], 1); + const h = proxyHarness([cap]); + const out = await proxyCommand(['--listen', '9000', '--target', '127.0.0.1:9000'], h.deps); + expect(out.stdout).toBe('raw'); + // The raw dump is returned as the command output (index.ts writes it). 
+ const bytes = (out as { bytes: Uint8Array }).bytes; + const parsed = parseChprotoDump(bytes); + expect(parsed.meta?.connection).toBe(1); + expect(h.state.listenOpts).toMatchObject({ listenPort: 9000, targetHost: '127.0.0.1', targetPort: 9000, once: true }); + }); + + it('--decode emits the shared decode envelope with proxy source', async () => { + const cap = fakeProxyCapture(fixtures[0], 1); + const h = proxyHarness([cap]); + const out = await proxyCommand(['--listen', '9000', '--target', '9000', '--decode'], h.deps); + expect(out.stdout).toBe('json'); + const data = (out as { data: Record }).data; + expect(data.format).toBe(ClickHouseFormat.NativeProtocol); + expect((data.chfx as Record).command).toBe('proxy'); + expect(data.source).toMatchObject({ kind: 'proxy', connection: 1 }); + }); + + it('--out writes the dump file and returns a JSON summary', async () => { + const cap = fakeProxyCapture(fixtures[0], 1); + const h = proxyHarness([cap]); + const out = await proxyCommand(['--listen', '9000', '--target', '9000', '-o', '/tmp/cap.chproto'], h.deps); + expect(out.stdout).toBe('json'); + expect(h.files['/tmp/cap.chproto']).toBeInstanceOf(Uint8Array); + const data = (out as { data: Record }).data; + expect(data.saved).toBe('/tmp/cap.chproto'); + expect(data.segments).toBe(cap.segments.length); + }); + + it('errors when no connection was captured', async () => { + const h = proxyHarness([]); // fires nothing + await expect(proxyCommand(['--listen', '9000', '--target', '9000'], h.deps)).rejects.toThrow(/no connection captured/); + }); +}); + +describe('proxy — persistent (injected server)', () => { + it('--persistent --save-dir writes one dump per connection and a stop summary', async () => { + const caps = [fakeProxyCapture(fixtures[0], 1), fakeProxyCapture(fixtures[0], 2)]; + const h = proxyHarness(caps); + const out = await proxyCommand(['--listen', '9000', '--target', '9000', '--persistent', '--save-dir', '/tmp/caps'], h.deps); + 
expect(out.stdout).toBe('none'); + expect(h.dirs).toContain('/tmp/caps'); + expect(Object.keys(h.files).sort()).toEqual(['/tmp/caps/conn-0001.chproto', '/tmp/caps/conn-0002.chproto']); + expect(h.diag.some((d) => /stopped after 2 connection/.test(d))).toBe(true); + expect(typeof h.state.shutdown).toBe('function'); + }); + + it('--persistent --decode streams a JSON doc per connection to stdout', async () => { + const caps = [fakeProxyCapture(fixtures[0], 1), fakeProxyCapture(fixtures[0], 2)]; + const h = proxyHarness(caps); + const out = await proxyCommand(['--listen', '9000', '--target', '9000', '--persistent', '--decode', '--compact'], h.deps); + expect(out.stdout).toBe('none'); + expect(h.text).toHaveLength(2); + for (const doc of h.text) { + const parsed = JSON.parse(doc) as Record; + expect((parsed.chfx as Record).command).toBe('proxy'); + } + }); +}); + +describe('proxy — usage errors', () => { + const cases: Array<[string, string[], RegExp]> = [ + ['missing --listen', ['--target', '9000'], /--listen is required/], + ['missing --target', ['--listen', '9000'], /--target is required/], + ['--persistent + --once', ['--listen', '9000', '--target', '9000', '--persistent', '--once'], /mutually exclusive/], + ['--persistent without a sink', ['--listen', '9000', '--target', '9000', '--persistent'], /needs an output sink/], + ['--out in persistent mode', ['--listen', '9000', '--target', '9000', '--persistent', '--save-dir', 'd', '-o', 'f'], /single-shot only/], + ['--save-dir in once mode', ['--listen', '9000', '--target', '9000', '--save-dir', 'd'], /for --persistent mode/], + ['unknown flag', ['--listen', '9000', '--target', '9000', '--frob'], /unknown option/], + ]; + for (const [name, argv, re] of cases) { + it(`rejects ${name}`, async () => { + await expect(proxyCommand(argv, proxyHarness([]).deps)).rejects.toThrow(re); + }); + } +}); + +describe('startCaptureProxy — real sockets', () => { + it('forwards both directions and captures the stream', async () => { + // 
Upstream sends a greeting on connect, then echoes whatever it receives. + const upstream = net.createServer((sock) => { + sock.write(Buffer.from([0xaa, 0xbb])); + sock.on('data', (d) => sock.write(d)); + sock.on('end', () => sock.end()); + }); + await new Promise((resolve) => upstream.listen(0, '127.0.0.1', resolve)); + const upPort = (upstream.address() as net.AddressInfo).port; + + let captured: Capture | undefined; + const proxy = await startCaptureProxy({ + targetHost: '127.0.0.1', + targetPort: upPort, + once: true, + onCapture: (c) => { + captured = c; + }, + }); + + await new Promise((resolve, reject) => { + const client = net.connect(proxy.port, proxy.host, () => client.write(Buffer.from([0x01, 0x02, 0x03]))); + let received = 0; + client.on('data', (d) => { + received += d.length; + if (received >= 5) client.end(); // greeting(2) + echo(3) + }); + client.on('error', reject); + client.on('close', () => resolve()); + }); + + await proxy.done; + upstream.close(); + + expect(captured).toBeDefined(); + expect([...captured!.c2s]).toEqual([0x01, 0x02, 0x03]); + expect([...captured!.s2c]).toEqual([0xaa, 0xbb, 0x01, 0x02, 0x03]); + expect(captured!.meta).toMatchObject({ source: 'proxy', connection: 1, target: `127.0.0.1:${upPort}` }); + // A round-tripped dump preserves the captured streams. + const round = parseChprotoDump(new Uint8Array(encodeDump(captured!))); + expect([...round.c2s]).toEqual([0x01, 0x02, 0x03]); + }, 15000); + + it('does not hang in `once` mode when the upstream refuses the connection', async () => { + // Bind+close a port to get one that is (almost certainly) closed. 
+ const tmp = net.createServer(); + await new Promise((resolve) => tmp.listen(0, '127.0.0.1', resolve)); + const deadPort = (tmp.address() as net.AddressInfo).port; + await new Promise((resolve) => tmp.close(() => resolve())); + + const errors: Error[] = []; + const proxy = await startCaptureProxy({ + targetHost: '127.0.0.1', + targetPort: deadPort, + once: true, + onError: (e) => errors.push(e), + }); + + const client = net.connect(proxy.port, proxy.host, () => client.write(Buffer.from([0x01]))); + client.on('error', () => {}); + + // `done` must resolve even though the upstream connect failed. + await proxy.done; + proxy.close(); + expect(errors.length).toBeGreaterThan(0); + }, 15000); + + it('flushes a partial capture when closed while a connection is still open', async () => { + // Upstream that accepts and stays open (never closes the connection). + const upstream = net.createServer((sock) => sock.on('data', () => {})); + await new Promise((resolve) => upstream.listen(0, '127.0.0.1', resolve)); + const upPort = (upstream.address() as net.AddressInfo).port; + + let captured: Capture | undefined; + const proxy = await startCaptureProxy({ + targetHost: '127.0.0.1', + targetPort: upPort, + once: true, + onCapture: (c) => { + captured = c; + }, + }); + + const client = net.connect(proxy.port, proxy.host, () => client.write(Buffer.from([0x09, 0x08, 0x07]))); + client.on('error', () => {}); + // Wait until the bytes have traversed the proxy, with the connection still open. + await new Promise((resolve) => setTimeout(resolve, 150)); + + proxy.close(); // simulate Ctrl-C: must flush the in-flight capture, not drop it. 
+ await proxy.done; + client.destroy(); + upstream.close(); + + expect(captured).toBeDefined(); + expect([...captured!.c2s]).toEqual([0x09, 0x08, 0x07]); + }, 15000); +}); diff --git a/src/cli/commands/proxy.ts b/src/cli/commands/proxy.ts new file mode 100644 index 0000000..0890d3d --- /dev/null +++ b/src/cli/commands/proxy.ts @@ -0,0 +1,263 @@ +import process from 'node:process'; +import { mkdir, writeFile } from 'node:fs/promises'; +import path from 'node:path'; + +import { + startCaptureProxy as defaultStartCaptureProxy, + encodeDump, + type Capture, + type StartCaptureProxyOptions, +} from '../../../scripts/native-proxy.mjs'; + +import { parseArgs, stringOption, boolOption, rejectUnknownArgs } from '../args'; +import { CliError, stringify, writeStderr, type CommandOutput } from '../output'; +import { CHFX_VERSION, CLI_SCHEMA_VERSION } from '../version'; +import { parseHostPort } from '../connection'; +import { decodeCaptureStreams, buildDecodeEnvelope, type DecodeResult } from './decode'; + +export interface ProxyServerHandle { + host: string; + port: number; + done: Promise; + close: () => void; +} + +export interface ProxyDeps { + startCaptureProxy: (opts: StartCaptureProxyOptions) => Promise; + /** Write one decoded JSON document to stdout (persistent --decode mode). */ + writeText: (text: string) => void; + /** Write a diagnostic line to stderr (keeps stdout clean for dumps/JSON). */ + writeDiag: (text: string) => void; + writeFile: (file: string, bytes: Uint8Array) => Promise; + ensureDir: (dir: string) => Promise; + /** Register a shutdown handler (SIGINT/SIGTERM) for persistent mode. */ + registerShutdown: (handler: () => void) => void; +} + +const DEFAULT_DEPS: ProxyDeps = { + startCaptureProxy: defaultStartCaptureProxy, + writeText: (text) => process.stdout.write(text.endsWith('\n') ? 
text : `${text}\n`), + writeDiag: writeStderr, + writeFile: async (file, bytes) => { + await writeFile(file, bytes); + }, + ensureDir: async (dir) => { + await mkdir(dir, { recursive: true }); + }, + registerShutdown: (handler) => { + process.once('SIGINT', handler); + process.once('SIGTERM', handler); + }, +}; + +interface ProxyConfig { + listen: { host: string; port: number }; + target: { host: string; port: number }; + decode: boolean; + includeNodeBytes: boolean; + compact: boolean; + out?: string; + saveDir?: string; +} + +const ALLOWED = ['listen', 'target', 'out', 'save-dir', 'decode', 'persistent', 'once', 'compact', 'no-node-bytes']; + +/** + * Run a standalone capturing proxy. It listens on `--listen`, forwards every + * connection to `--target`, and records the native packet stream — any native + * client (clickhouse-client, Go/JDBC/Python drivers, …) connects through it; the + * proxy never spawns a client itself. + * + * - Default **single-shot**: capture the first connection, then write a + * `.chproto` (`--out`), or decode it to JSON (`--decode`), or stream the raw + * dump to stdout, and exit. + * - **`--persistent`**: keep serving; write one `.chproto` per connection to + * `--save-dir` and/or stream decoded JSON per connection (`--decode`). Stops + * on Ctrl-C. + * + * Plaintext/uncompressed streams only (same constraint as `capture`/`query`); + * TLS and compressed connections are unsupported. + */ +export async function proxyCommand(rest: string[], deps: Partial = {}): Promise { + const d: ProxyDeps = { ...DEFAULT_DEPS, ...deps }; + const args = parseArgs(rest, { + valueFlags: ['listen', 'target', 'out', 'save-dir'], + aliases: { o: 'out' }, + }); + rejectUnknownArgs(args, ALLOWED); + + const listenRaw = stringOption(args, 'listen'); + if (!listenRaw) throw new CliError('usage', '--listen is required, e.g. 
--listen 9000 or --listen 0.0.0.0:9000'); + const targetRaw = stringOption(args, 'target'); + if (!targetRaw) throw new CliError('usage', '--target is required, e.g. --target 127.0.0.1:9000'); + + const persistent = boolOption(args, 'persistent'); + const once = boolOption(args, 'once'); + if (persistent && once) throw new CliError('usage', '--persistent and --once are mutually exclusive'); + + const out = stringOption(args, 'out'); + const saveDir = stringOption(args, 'save-dir'); + const config: ProxyConfig = { + listen: parseHostPort(listenRaw, { flag: 'listen' }), + target: parseHostPort(targetRaw, { flag: 'target', defaultPort: 9000 }), + decode: boolOption(args, 'decode'), + includeNodeBytes: !boolOption(args, 'no-node-bytes'), + compact: boolOption(args, 'compact'), + out, + saveDir, + }; + + if (persistent) { + if (out) throw new CliError('usage', '--out is single-shot only; use --save-dir in --persistent mode'); + if (!saveDir && !config.decode) { + throw new CliError('usage', '--persistent needs an output sink: pass --save-dir and/or --decode'); + } + return runPersistent(d, config); + } + + if (saveDir) throw new CliError('usage', '--save-dir is for --persistent mode; use --out for single-shot'); + return runOnce(d, config); +} + +/** Decode a captured connection into the shared decode envelope. 
*/
+function captureToEnvelope(capture: Capture, config: ProxyConfig): Record<string, unknown> {
+  const result: DecodeResult = {
+    ...decodeCaptureStreams(capture.c2s, capture.s2c, capture.meta),
+    formatDetected: true,
+  };
+  const source: Record<string, unknown> = {
+    kind: 'proxy',
+    target: `${config.target.host}:${config.target.port}`,
+    connection: capture.meta.connection,
+  };
+  return buildDecodeEnvelope(result, source, { command: 'proxy', includeNodeBytes: config.includeNodeBytes });
+}
+
+async function startServer(
+  d: ProxyDeps,
+  config: ProxyConfig,
+  handlers: Pick<Parameters<ProxyDeps['startCaptureProxy']>[0], 'once' | 'onCapture' | 'onError'>,
+): Promise<Awaited<ReturnType<ProxyDeps['startCaptureProxy']>>> {
+  try {
+    return await d.startCaptureProxy({
+      targetHost: config.target.host,
+      targetPort: config.target.port,
+      listenHost: config.listen.host,
+      listenPort: config.listen.port,
+      ...handlers,
+    });
+  } catch (err) {
+    throw new CliError('io', `cannot listen on ${config.listen.host}:${config.listen.port}: ${(err as Error).message}`);
+  }
+}
+
+async function runOnce(d: ProxyDeps, config: ProxyConfig): Promise<CommandOutput> {
+  let captured: Capture | undefined;
+  const errors: Error[] = [];
+
+  const server = await startServer(d, config, {
+    once: true,
+    onCapture: (c) => {
+      captured = c;
+    },
+    onError: (e) => errors.push(e),
+  });
+
+  d.writeDiag(
+    `chfx proxy: listening on ${server.host}:${server.port} → ${config.target.host}:${config.target.port} ` +
+      `(single-shot). Point your ClickHouse client at it; Ctrl-C finalizes a still-open connection.`,
+  );
+  // Ctrl-C closes the proxy, which flushes a partial capture for a connection
+  // that is still open (e.g. a pooled driver that never closes its socket).
+  d.registerShutdown(() => server.close());
+  await server.done;
+
+  if (!captured) {
+    throw new CliError('io', `no connection captured${errors.length ? `: ${errors[0].message}` : ''}`);
+  }
+
+  const dump = encodeDump(captured);
+  if (config.out) {
+    try {
+      await d.writeFile(config.out, new Uint8Array(dump));
+    } catch (err) {
+      throw new CliError('io', `cannot write --out file: ${config.out}`, { cause: (err as Error).message });
+    }
+    d.writeDiag(`chfx proxy: captured connection → ${config.out} (${dump.length} bytes)`);
+  }
+
+  if (config.decode) {
+    return { stdout: 'json', data: captureToEnvelope(captured, config), compact: config.compact };
+  }
+  if (config.out) {
+    const data = {
+      chfx: { tool: 'chfx', version: CHFX_VERSION, schemaVersion: CLI_SCHEMA_VERSION, command: 'proxy' },
+      target: `${config.target.host}:${config.target.port}`,
+      saved: config.out,
+      bytes: dump.length,
+      c2sBytes: captured.c2s.length,
+      s2cBytes: captured.s2c.length,
+      segments: captured.segments.length,
+    };
+    return { stdout: 'json', data, compact: config.compact };
+  }
+  // No --out and no --decode: stream the raw dump so `chfx proxy … | chfx decode` works.
+  return { stdout: 'raw', bytes: new Uint8Array(dump) };
+}
+
+async function runPersistent(d: ProxyDeps, config: ProxyConfig): Promise<CommandOutput> {
+  if (config.saveDir) await d.ensureDir(config.saveDir);
+  let count = 0;
+  // Track only in-flight handlers: each removes itself on settlement (so the set stays bounded) and
+  // reports its own failure, so a rejection never becomes an unhandled rejection (fatal by default in Node).
+  const pending = new Set<Promise<void>>();
+  const onHandlerError = (e: unknown): void => d.writeDiag(`chfx proxy: connection handler failed: ${String(e)}`);
+
+  const handle = await startServer(d, config, {
+    once: false,
+    onCapture: (capture) => {
+      count += 1;
+      const task = handleConnection(d, capture, config).catch(onHandlerError);
+      pending.add(task);
+      void task.finally(() => pending.delete(task));
+    },
+    onError: (e) => d.writeDiag(`chfx proxy: connection error: ${e.message}`),
+  });
+
+  d.writeDiag(
+    `chfx proxy: listening on ${handle.host}:${handle.port} → ${config.target.host}:${config.target.port} ` +
+      `(persistent). Ctrl-C to stop.`,
+  );
+  d.registerShutdown(() => handle.close());
+
+  await handle.done;
+  await Promise.allSettled([...pending]);
+  d.writeDiag(`chfx proxy: stopped after ${count} connection(s).`);
+  return { stdout: 'none' };
+}
+
+/** Persist and/or decode one captured connection (used in persistent mode). */
+async function handleConnection(d: ProxyDeps, capture: Capture, config: ProxyConfig): Promise<void> {
+  const id = typeof capture.meta.connection === 'number' ? capture.meta.connection : 0;
+  const dump = encodeDump(capture);
+
+  if (config.saveDir) {
+    const file = path.join(config.saveDir, `conn-${String(id).padStart(4, '0')}.chproto`);
+    try {
+      await d.writeFile(file, new Uint8Array(dump));
+      d.writeDiag(`chfx proxy: connection ${id} → ${file} (${dump.length} bytes)`);
+    } catch (err) {
+      d.writeDiag(`chfx proxy: connection ${id}: cannot write ${file}: ${(err as Error).message}`);
+    }
+  } else {
+    d.writeDiag(`chfx proxy: connection ${id} captured (${dump.length} bytes)`);
+  }
+
+  if (config.decode) {
+    try {
+      d.writeText(stringify(captureToEnvelope(capture, config), config.compact));
+    } catch (err) {
+      d.writeDiag(`chfx proxy: connection ${id}: decode failed: ${(err as Error).message}`);
+    }
+  }
+}
diff --git a/src/cli/connection.ts b/src/cli/connection.ts
index 5955299..344d3ba 100644
--- a/src/cli/connection.ts
+++ b/src/cli/connection.ts
@@ -65,6 +65,57 @@ export function resolveQueryBase(args: ParsedArgs): QueryBase {
   };
 }
 
+/**
+ * Parse a `[host:]port` (or bare `port`) endpoint, used by `chfx proxy` for
+ * --listen / --target. A leading host is optional (defaults to `defaultHost`);
+ * the port may default via `defaultPort`, otherwise it is required. `flag` only
+ * shapes error messages.
+ */
+export function parseHostPort(
+  raw: string,
+  opts: { flag: string; defaultHost?: string; defaultPort?: number },
+): { host: string; port: number } {
+  const { flag, defaultHost = '127.0.0.1', defaultPort } = opts;
+  let host = defaultHost;
+  let portRaw: string | undefined;
+
+  if (raw.startsWith('[')) {
+    // Bracketed IPv6: [::1] or [::1]:9000 (an empty `[]` falls back to the default host, like `:9000`).
+    const end = raw.indexOf(']');
+    if (end === -1) throw new CliError('usage', `--${flag}: unterminated IPv6 address: ${raw}`);
+    host = raw.slice(1, end) || defaultHost;
+    const rest = raw.slice(end + 1);
+    if (rest.startsWith(':')) portRaw = rest.slice(1);
+    else if (rest !== '') throw new CliError('usage', `--${flag}: unexpected text after IPv6 address: ${raw}`);
+  } else {
+    const firstColon = raw.indexOf(':');
+    const lastColon = raw.lastIndexOf(':');
+    if (firstColon !== -1 && firstColon === lastColon) {
+      // Exactly one colon → host:port (host may be empty → default).
+      const left = raw.slice(0, lastColon);
+      if (left) host = left;
+      portRaw = raw.slice(lastColon + 1);
+    } else if (firstColon !== -1) {
+      // Multiple colons, unbracketed → a bare IPv6 literal with no port.
+      host = raw;
+    } else if (/^\d+$/.test(raw)) {
+      portRaw = raw;
+    } else {
+      host = raw;
+    }
+  }
+
+  if (portRaw === undefined || portRaw === '') {
+    if (defaultPort !== undefined) return { host, port: defaultPort };
+    throw new CliError('usage', `--${flag} needs a port, e.g. --${flag} 9000 or --${flag} 0.0.0.0:9000`);
+  }
+  const port = /^\d+$/.test(portRaw) ? Number(portRaw) : NaN; // digits only: Number() alone accepts '0x50', '1e3', ' 80 '
+  if (!Number.isInteger(port) || port < 1 || port > 65535) {
+    throw new CliError('usage', `--${flag} port must be an integer between 1 and 65535, got: ${portRaw}`);
+  }
+  return { host, port };
+}
+
 function parsePort(raw: string | undefined): number | undefined {
   if (raw === undefined) return undefined;
   const port = Number(raw);
diff --git a/src/cli/index.ts b/src/cli/index.ts
index 27381e6..d6ca6f0 100644
--- a/src/cli/index.ts
+++ b/src/cli/index.ts
@@ -6,6 +6,7 @@ import { COMMANDS, findCommand, type CommandDoc } from './registry';
 import { decodeCommand } from './commands/decode';
 import { queryCommand } from './commands/query';
 import { captureCommand } from './commands/capture';
+import { proxyCommand } from './commands/proxy';
 
 function generalHelp(): string {
   const lines = [
@@ -70,15 +71,19 @@ async function run(argv: string[]): Promise<number> {
     case 'capture':
       out = await captureCommand(rest);
       break;
+    case 'proxy':
+      out = await proxyCommand(rest);
+      break;
     default:
       throw new CliError('usage', `unknown command: ${command} (try: chfx --help)`);
   }
 
   if (out.stdout === 'json') {
     writeStdout(stringify(out.data, out.compact));
-  } else {
+  } else if (out.stdout === 'raw') {
     process.stdout.write(out.bytes);
   }
+  // 'none': the command already wrote its own output (e.g. the streaming proxy).
   return 0;
 }
 
diff --git a/src/cli/output.ts b/src/cli/output.ts
index e1b97ef..edf5318 100644
--- a/src/cli/output.ts
+++ b/src/cli/output.ts
@@ -35,7 +35,11 @@ export interface RawOutput {
   stdout: 'raw';
   bytes: Uint8Array;
 }
-export type CommandOutput = JsonOutput | RawOutput;
+/** The command has already written everything itself (e.g. a streaming server). */
+export interface NoneOutput {
+  stdout: 'none';
+}
+export type CommandOutput = JsonOutput | RawOutput | NoneOutput;
 
 /**
  * JSON.stringify replacer that makes decoded values safe to serialize:
diff --git a/src/cli/proxy.integration.test.ts b/src/cli/proxy.integration.test.ts
new file mode 100644
index 0000000..0d6b5c3
--- /dev/null
+++ b/src/cli/proxy.integration.test.ts
@@ -0,0 +1,265 @@
+import { afterAll, beforeAll, describe, expect, it } from 'vitest';
+import net from 'node:net';
+import { Buffer } from 'node:buffer';
+import { mkdtempSync, readdirSync, readFileSync, rmSync } from 'node:fs';
+import { tmpdir } from 'node:os';
+import { join } from 'node:path';
+import { ClickHouseContainer, StartedClickHouseContainer } from '@testcontainers/clickhouse';
+import { TestContainers } from 'testcontainers';
+
+import { proxyCommand, type ProxyDeps } from './commands/proxy';
+import { decodeBuffer } from './commands/decode';
+import { ClickHouseFormat } from '../core/types/formats';
+
+/**
+ * End-to-end integration tests for `chfx proxy`.
+ *
+ * Topology: a real ClickHouse container runs the server. The proxy runs on the
+ * host (the actual proxyCommand, with the real startCaptureProxy + filesystem),
+ * forwarding to the container's mapped native port. A real `clickhouse-client`
+ * inside the container connects *back* to the host proxy through
+ * `host.testcontainers.internal` (set up by TestContainers.exposeHostPorts).
+ *
+ * The in-container client is "remote", so it would compress by default — we pass
+ * `--compression 0` to keep the captured stream plaintext (the proxy only
+ * supports plaintext/uncompressed, the same constraint as the rest of chfx).
+ */
+
+const IMAGE = 'clickhouse/clickhouse-server:latest';
+
+/**
+ * Reserve `n` *distinct* free ports by binding them all simultaneously, then
+ * releasing. Binding at once guarantees uniqueness (sequential bind+close can
+ * hand back the same ephemeral port twice). Each test then claims its own port,
+ * so a lingering socket from one test can't EADDRINUSE the next.
+ */
+async function freePorts(n: number): Promise<number[]> {
+  const servers = await Promise.all(
+    Array.from(
+      { length: n },
+      () =>
+        new Promise<net.Server>((resolve, reject) => {
+          const s = net.createServer().once('error', reject);
+          s.listen(0, '127.0.0.1', () => resolve(s));
+        }),
+    ),
+  );
+  const ports = servers.map((s) => (s.address() as net.AddressInfo).port);
+  await Promise.all(servers.map((s) => new Promise<void>((r) => s.close(() => r()))));
+  return ports;
+}
+
+async function waitFor(pred: () => boolean, label: string, timeoutMs = 45000): Promise<void> {
+  const start = Date.now();
+  while (!pred()) {
+    if (Date.now() - start > timeoutMs) throw new Error(`timeout waiting for: ${label}`);
+    await new Promise((r) => setTimeout(r, 50));
+  }
+}
+
+const hexOf = (s: string) => Buffer.from(s, 'utf-8').toString('hex');
+
+describe('chfx proxy — integration (real ClickHouse + real native client)', () => {
+  let container: StartedClickHouseContainer;
+  let targetEndpoint: string;
+  let tmp: string;
+  let portPool: number[] = [];
+  /** Claim a fresh listen port for a test (distinct, pre-exposed to containers). */
+  const nextPort = (): number => {
+    const port = portPool.shift();
+    if (port === undefined) throw new Error('port pool exhausted — increase the count in beforeAll');
+    return port;
+  };
+
+  beforeAll(async () => {
+    // Reserve a distinct port per test and expose them ALL to containers BEFORE
+    // starting ClickHouse, so each can resolve host.testcontainers.internal:<port>.
+    portPool = await freePorts(12);
+    await TestContainers.exposeHostPorts(...portPool);
+    container = await new ClickHouseContainer(IMAGE).start();
+    targetEndpoint = `${container.getHost()}:${container.getPort()}`;
+    tmp = mkdtempSync(join(tmpdir(), 'chfx-proxy-'));
+  }, 120000);
+
+  afterAll(async () => {
+    if (tmp) rmSync(tmp, { recursive: true, force: true });
+    if (container) await container.stop();
+  });
+
+  /**
+   * Run `clickhouse-client` inside the container, connecting back through the
+   * host proxy. Returns when the client exits (so the proxied connection closes).
+   */
+  async function clientQuery(sql: string, port: number): Promise<void> {
+    const result = await container.exec([
+      'clickhouse-client',
+      '--host',
+      'host.testcontainers.internal',
+      '--port',
+      String(port),
+      '--user',
+      'test',
+      '--password',
+      'test',
+      '--database',
+      'test',
+      '--compression',
+      '0',
+      '--query',
+      sql,
+    ]);
+    if (result.exitCode !== 0) {
+      throw new Error(`clickhouse-client exited ${result.exitCode}: ${result.output}`);
+    }
+  }
+
+  /** Real proxy deps: real server + real fs, but capture stdout/stderr text. */
+  function makeDeps() {
+    const text: string[] = [];
+    const diag: string[] = [];
+    const shutdown: { fn?: () => void } = {};
+    const deps: Partial<ProxyDeps> = {
+      writeText: (t) => text.push(t),
+      writeDiag: (t) => diag.push(t),
+      registerShutdown: (fn) => {
+        shutdown.fn = fn;
+      },
+    };
+    return { deps, text, diag, shutdown };
+  }
+
+  /**
+   * Drive a single-shot proxy run with `args`, firing the client once the proxy
+   * is listening. Resolves with the command's output + captured diagnostics.
+   */
+  async function runOnce(args: string[], sql: string) {
+    const port = nextPort();
+    const { deps, text, diag } = makeDeps();
+    const pending = proxyCommand(['--listen', String(port), '--target', targetEndpoint, ...args], deps);
+    await waitFor(() => diag.some((d) => /listening on/.test(d)), 'proxy listening');
+    await clientQuery(sql, port);
+    const out = await pending;
+    return { out, text, diag };
+  }
+
+  it('single-shot: streams a raw .chproto dump (default) that decodes to NativeProtocol', async () => {
+    const marker = 'mark_raw_42';
+    const { out } = await runOnce([], `SELECT '${marker}' AS m`);
+    expect(out.stdout).toBe('raw');
+    const bytes = (out as { bytes: Uint8Array }).bytes;
+    expect(Buffer.from(bytes.subarray(0, 8)).toString()).toBe('CHPROTO1');
+    const decoded = decodeBuffer(bytes, { format: 'chproto' });
+    expect(decoded.format).toBe(ClickHouseFormat.NativeProtocol);
+    // The captured stream carries both the query (c2s) and the result (s2c).
+    expect(Buffer.from(decoded.outputBytes).toString('hex')).toContain(hexOf(marker));
+  }, 90000);
+
+  it('single-shot --decode: returns the proxy decode envelope', async () => {
+    const marker = 'mark_decode_7';
+    const { out } = await runOnce(['--decode'], `SELECT '${marker}' AS m, 1 AS n`);
+    expect(out.stdout).toBe('json');
+    const data = (out as { data: Record<string, unknown> }).data;
+    expect(data.format).toBe(ClickHouseFormat.NativeProtocol);
+    expect((data.chfx as Record<string, unknown>).command).toBe('proxy');
+    expect(data.source).toMatchObject({ kind: 'proxy', target: targetEndpoint });
+    expect(typeof data.protocolVersion).toBe('number');
+    expect(data.nodeBytes).toBe(true);
+    expect(data.bytesHex).toContain(hexOf(marker));
+  }, 90000);
+
+  it('single-shot --out: writes the dump file and returns a JSON summary', async () => {
+    const file = join(tmp, 'once.chproto');
+    const { out } = await runOnce(['--out', file], 'SELECT 1 AS one');
+    expect(out.stdout).toBe('json');
+    const data = (out as { data: Record<string, unknown> }).data;
+    expect(data.saved).toBe(file);
+    expect(data.bytes).toBeGreaterThan(0);
+    // The written file is a real, decodable dump.
+    const onDisk = new Uint8Array(readFileSync(file));
+    expect(decodeBuffer(onDisk, { format: 'chproto' }).format).toBe(ClickHouseFormat.NativeProtocol);
+  }, 90000);
+
+  it('single-shot --out + --decode: writes the file *and* returns the decode envelope', async () => {
+    const file = join(tmp, 'once-both.chproto');
+    const { out } = await runOnce(['--out', file, '--decode'], 'SELECT 123 AS v');
+    expect(out.stdout).toBe('json');
+    const data = (out as { data: Record<string, unknown> }).data;
+    expect(data.format).toBe(ClickHouseFormat.NativeProtocol);
+    const onDisk = new Uint8Array(readFileSync(file));
+    expect(decodeBuffer(onDisk, { format: 'chproto' }).format).toBe(ClickHouseFormat.NativeProtocol);
+  }, 90000);
+
+  it('single-shot --decode --no-node-bytes --compact: honours output controls', async () => {
+    const { out } = await runOnce(['--decode', '--no-node-bytes', '--compact'], 'SELECT 5 AS five');
+    expect(out.stdout).toBe('json');
+    const json = out as { data: Record<string, unknown>; compact: boolean };
+    expect(json.compact).toBe(true);
+    expect(json.data.nodeBytes).toBe(false);
+  }, 90000);
+
+  it('accepts a host:port form for --listen', async () => {
+    const port = nextPort();
+    const { deps, diag } = makeDeps();
+    const pending = proxyCommand(
+      ['--listen', `127.0.0.1:${port}`, '--target', targetEndpoint, '--decode'],
+      deps,
+    );
+    await waitFor(() => diag.some((d) => /listening on/.test(d)), 'proxy listening');
+    await clientQuery('SELECT 1', port);
+    const out = await pending;
+    expect(out.stdout).toBe('json');
+    expect(diag.some((d) => d.includes(`127.0.0.1:${port}`))).toBe(true);
+  }, 90000);
+
+  it('persistent --save-dir: writes one dump per connection until stopped', async () => {
+    const port = nextPort();
+    const dir = join(tmp, 'persist-dumps');
+    const { deps, diag, shutdown } = makeDeps();
+    const pending = proxyCommand(
+      ['--listen', String(port), '--target', targetEndpoint, '--persistent', '--save-dir', dir],
+      deps,
+    );
+    await waitFor(() => diag.some((d) => /listening on/.test(d)) && typeof shutdown.fn === 'function', 'listening');
+
+    await clientQuery('SELECT 1 AS a', port);
+    await clientQuery('SELECT 2 AS b', port);
+    await waitFor(() => readdirSync(dir).filter((f) => f.endsWith('.chproto')).length >= 2, 'two dumps written');
+
+    shutdown.fn!(); // simulate Ctrl-C
+    const out = await pending;
+
+    expect(out.stdout).toBe('none');
+    const files = readdirSync(dir).filter((f) => f.endsWith('.chproto')).sort();
+    expect(files).toEqual(['conn-0001.chproto', 'conn-0002.chproto']);
+    for (const f of files) {
+      const bytes = new Uint8Array(readFileSync(join(dir, f)));
+      expect(decodeBuffer(bytes, { format: 'chproto' }).format).toBe(ClickHouseFormat.NativeProtocol);
+    }
+    expect(diag.some((d) => /stopped after 2 connection/.test(d))).toBe(true);
+  }, 120000);
+
+  it('persistent --decode: streams one JSON envelope per connection', async () => {
+    const port = nextPort();
+    const { deps, text, diag, shutdown } = makeDeps();
+    const pending = proxyCommand(
+      ['--listen', String(port), '--target', targetEndpoint, '--persistent', '--decode', '--compact'],
+      deps,
+    );
+    await waitFor(() => diag.some((d) => /listening on/.test(d)) && typeof shutdown.fn === 'function', 'listening');
+
+    await clientQuery('SELECT 10 AS x', port);
+    await clientQuery('SELECT 20 AS y', port);
+    await waitFor(() => text.length >= 2, 'two decoded docs');
+
+    shutdown.fn!();
+    const out = await pending;
+
+    expect(out.stdout).toBe('none');
+    expect(text).toHaveLength(2);
+    for (const doc of text) {
+      const parsed = JSON.parse(doc) as Record<string, unknown>;
+      expect((parsed.chfx as Record<string, unknown>).command).toBe('proxy');
+      expect(parsed.format).toBe(ClickHouseFormat.NativeProtocol);
+    }
+  }, 120000);
+});
diff --git a/src/cli/registry.ts b/src/cli/registry.ts
index 3232301..13c3315 100644
--- a/src/cli/registry.ts
+++ b/src/cli/registry.ts
@@ -75,6 +75,24 @@ export const COMMANDS: CommandDoc[] = [
       { flag: '--help, -h', description: 'Show help for this command.' },
     ],
   },
+  {
+    name: 'proxy',
+    summary: 'Listen as a capturing proxy any native client connects through (no client spawned).',
+    usage: 'chfx proxy --listen [host:]port --target host:port [--out file | --save-dir dir] [--decode] [--persistent]',
+    details:
+      'Forwards every connection to --target and records the native stream. Single-shot by default (capture ' +
+      'the first connection, then exit); --persistent keeps serving until Ctrl-C. Plaintext/uncompressed only.',
+    options: [
+      { flag: '--listen', value: '[host:]port', description: 'Address to listen on (host defaults to 127.0.0.1).' },
+      { flag: '--target', value: 'host:port', description: 'Upstream ClickHouse native endpoint (default port 9000).' },
+      { flag: '--out, -o', value: 'file', description: 'Single-shot: write the .chproto dump here (else raw dump streams to stdout).' },
+      { flag: '--decode', description: 'Decode each capture to a JSON envelope on stdout (instead of the raw dump; combinable with --out, which still writes the dump file).' },
+      { flag: '--save-dir', value: 'dir', description: 'Persistent: write one conn-NNNN.chproto per connection into this dir.' },
+      { flag: '--persistent / --once', description: 'Serve many connections until Ctrl-C, or stop after the first (default --once).' },
+      { flag: '--no-node-bytes / --compact', description: 'Same output controls as decode (apply when --decode is set).' },
+      { flag: '--help, -h', description: 'Show help for this command.' },
+    ],
+  },
 ];
 
 export function findCommand(name: string): CommandDoc | undefined {
diff --git a/todo.md b/todo.md
index 9a37fd4..8eed48e 100644
--- a/todo.md
+++ b/todo.md
@@ -22,11 +22,13 @@ Spec for items 1, 2, 4, 5: [docs/cli-spec.md](docs/cli-spec.md)
    clickhouse-client; `--protocol http` POSTs and decodes the `--format` (native |
    RowBinaryWithNamesAndTypes) body. `--save` keeps the dump (tcp). `chfx capture`
    writes a dump (file or raw stdout); `npm run capture` aliases it.
-5. **Use with external clients, in the CLI** — `chfx proxy`: standalone capture
-   proxy any native client connects through. Single-shot by default; persistent
-   and live-decode via flags. Plaintext/uncompressed only. _(Not yet built — the
-   `query`/`capture` above are us being the client; the proxy captures other
-   clients.)_
+5. **Use with external clients, in the CLI** — _implemented (branch `chfx-proxy`)._
+   `chfx proxy --listen <[host:]port> --target <host:port>`: a capturing TCP proxy
+   any native client connects through (the proxy never spawns one). Single-shot by
+   default (`--out <file>`, `--decode`, or raw dump to stdout, then exit);
+   `--persistent` serves many connections (`--save-dir` one dump per connection,
+   `--decode` streams JSON per connection) until Ctrl-C. Plaintext/uncompressed
+   only. Backed by `startCaptureProxy` in `scripts/native-proxy.mjs`.
 
 Cross-cutting: thorough CLI tests/fixtures; README quick-start + full options
 reference; remote auth/TLS flags. _(Native own-TCP-client to drop the