6 changes: 4 additions & 2 deletions AGENTS.md
@@ -89,13 +89,15 @@ src/
│ ├── commands/decode.ts# `decode` — decodeBuffer/decodeCaptureStreams + shared buildDecodeEnvelope
│ ├── commands/query.ts # `query` — capture (proxy) + decode in one step; --save keeps the dump
│ ├── commands/capture.ts# `capture` — capture to .chproto (file or raw stdout); `npm run capture` alias
│ ├── connection.ts # Shared --host/port/user/... resolution + env fallbacks + experimental settings
│ ├── commands/proxy.ts # `proxy` — standalone capturing TCP proxy any native client connects through (single-shot / --persistent)
│ ├── connection.ts # Shared --host/port/user/... resolution + env fallbacks + experimental settings + parseHostPort
│ ├── args.ts # Dependency-free arg parser (value + repeatable flags)
│ ├── output.ts # CliError, JSON-safe serializer (bigint→string, bytes→hex), CommandOutput
│ ├── registry.ts # Command metadata for --help
│ ├── version.ts # Build-injected version (esbuild define)
│ └── cli.test.ts # Vitest unit + tsx e2e tests (uses fixtures/protocol/*.chproto)
│ # query/capture reuse scripts/native-proxy.mjs (+ native-proxy.d.mts for types)
│ # query/capture/proxy reuse scripts/native-proxy.mjs (+ native-proxy.d.mts for types).
│ # proxy uses startCaptureProxy (fixed listen port, many connections); query/capture use startProxy (one-shot, ephemeral)
└── styles/ # CSS files
electron/
├── main.ts # Electron main process (window, IPC handlers)
46 changes: 46 additions & 0 deletions README.md
@@ -76,6 +76,7 @@ decode error.
|---------|-------------|
| `chfx query --query "<sql>"` | Run a query **and decode it** in one step (no file). `--protocol tcp` (default) captures the native packet stream via `clickhouse-client`; `--protocol http` POSTs to ClickHouse HTTP and decodes the `--format` body. `--save <f>` keeps the `.chproto` dump (tcp). |
| `chfx capture --query "<sql>"` | Capture a query to a `.chproto` dump only (native protocol). Writes `--out <f>`, or streams raw bytes to stdout (so `chfx capture … \| chfx decode` works). `npm run capture` is an alias. |
| `chfx proxy --listen <port> --target <host:port>` | Listen as a capturing TCP proxy that **any** native client connects through (clickhouse-client, Go/JDBC/Python drivers, …). Single-shot by default; `--persistent` serves many connections. See below. |
| `chfx decode [file]` | Decode a `.chproto`, Native, or RowBinary dump to JSON. Reads stdin when no file (or `-`) is given. |
| `chfx --help` / `chfx <cmd> --help` | Human-readable help. |
| `chfx --version` | Print the version. |
@@ -102,6 +103,51 @@ decode error.
| `--client <path>` | Path to `clickhouse-client` (tcp only). Env: `CLICKHOUSE_CLIENT`. |
| `--out <file>` (`capture`) | Where to write the `.chproto` dump. |

### `proxy` — capture any native client

Unlike `query`/`capture` (which drive `clickhouse-client` for you), `proxy`
just **listens**: it forwards every connection to `--target` and tees the native
packet stream into a capture. Point any native client at the listen address —
the proxy never spawns one itself. Plaintext/uncompressed connections only (TLS
and compressed streams are unsupported, the same constraint as the other native
paths).

```bash
# Single-shot: capture the next connection, write a dump, exit.
chfx proxy --listen 9100 --target 127.0.0.1:9000 --out cap.chproto
clickhouse-client --port 9100 --query "SELECT 1" # in another shell

# Single-shot, decoded straight to JSON (no file):
chfx proxy --listen 9100 --target 127.0.0.1:9000 --decode

# Persistent: serve many connections, one dump per connection, until Ctrl-C.
chfx proxy --listen 9100 --target 127.0.0.1:9000 --persistent --save-dir ./caps
```
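
As a rough model of the tee described above, each forwarded chunk can be logged with its direction and the log later folded into one buffer per direction. This is a network-free sketch, not the project's code: `recordSegment`, `splitByDirection`, and the `'c2s'`/`'s2c'` labels are hypothetical names chosen for illustration.

```javascript
// Hypothetical direction labels; the real proxy may use different constants.
const C2S = 'c2s'; // client -> server
const S2C = 's2c'; // server -> client

// Append a copy of one forwarded chunk to the ordered segment log.
function recordSegment(segments, dir, chunk) {
  segments.push({ dir, data: Buffer.from(chunk) });
}

// Fold the ordered log into one concatenated buffer per direction.
function splitByDirection(segments) {
  const pick = (dir) =>
    Buffer.concat(segments.filter((s) => s.dir === dir).map((s) => s.data));
  return { c2s: pick(C2S), s2c: pick(S2C) };
}

// Interleaved traffic keeps its per-direction order after the split.
const log = [];
recordSegment(log, C2S, Buffer.from('hello '));
recordSegment(log, S2C, Buffer.from('ok'));
recordSegment(log, C2S, Buffer.from('world'));
const streams = splitByDirection(log);
console.log(streams.c2s.toString(), '|', streams.s2c.toString());
```

Keeping the ordered log alongside the two concatenated streams preserves the interleaving, which a per-direction buffer alone would lose.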

| Option | Description |
|--------|-------------|
| `--listen <[host:]port>` | Address to listen on (host defaults to `127.0.0.1`). Required. |
| `--target <host:port>` | Upstream ClickHouse native endpoint (port defaults to `9000` when omitted). Required. |
| `--out <file>` (`-o`) | **Single-shot**: write the `.chproto` dump here. With neither `--out` nor `--decode`, the raw dump streams to stdout. |
| `--decode` | Decode each capture to a JSON envelope on stdout (instead of writing/streaming the raw dump). |
| `--save-dir <dir>` | **Persistent** — write one `conn-NNNN.chproto` per connection into this directory. |
| `--persistent` / `--once` | `--persistent` serves until Ctrl-C; `--once` (the default) stops after the first connection. |
| `--no-node-bytes` / `--compact` | Same output controls as `decode` (apply when `--decode` is set). |
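
For illustration, the `[host:]port` form accepted by `--listen` could be parsed roughly as follows. This is a minimal sketch under stated assumptions: `parseListenAddress` is a hypothetical helper written here (it is not the project's `parseHostPort`), and bracketed IPv6 hosts are not given special handling.

```javascript
// Hypothetical helper: parse "[host:]port" into { host, port }.
function parseListenAddress(spec, defaultHost = '127.0.0.1') {
  const idx = spec.lastIndexOf(':');
  const host = idx === -1 ? defaultHost : spec.slice(0, idx);
  const portText = idx === -1 ? spec : spec.slice(idx + 1);
  // Accept only plain decimal digits, so "9100x" or an empty port is rejected.
  if (host === '' || !/^\d+$/.test(portText)) {
    throw new Error(`invalid listen address: ${spec}`);
  }
  const port = Number(portText);
  if (port < 1 || port > 65535) {
    throw new Error(`port out of range: ${spec}`);
  }
  return { host, port };
}

console.log(parseListenAddress('9100'));
console.log(parseListenAddress('0.0.0.0:9100'));
```

Defaulting the host to the loopback address keeps a bare port from exposing the proxy on every interface.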

Diagnostics (the listen address, per-connection notices) go to **stderr**, so
stdout stays a clean dump or JSON stream.

Notes:
- A capture completes when the **client closes its connection** (`clickhouse-client`
  does so after each `--query`). For long-lived or pooled driver connections that stay
  open, press **Ctrl-C**: single-shot finalizes the partial capture and exits, and
  persistent stops and flushes any still-open connection.
- In `--persistent --save-dir`, files are named `conn-0001.chproto`, `conn-0002…`
per run; the counter resets each run, so re-running against the same directory
**overwrites** earlier files. Use a fresh `--save-dir` per run to keep history.
- For `--persistent --decode`, add `--compact` to emit one JSON document per line
(newline-delimited JSON), which is what stream consumers expect.
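
To show why one compact document per line suits stream consumers, here is a minimal sketch of newline-delimited JSON output. It assumes a JSON-safe conversion like the one the CLI describes (bigint to string, bytes to hex); `jsonSafeReplacer` and `toNdjsonLine` are hypothetical names, not the project's serializer.

```javascript
// Hypothetical JSON-safe replacer: bigint -> decimal string, bytes -> hex string.
function jsonSafeReplacer(key, value) {
  // Buffers define toJSON(), so inspect the raw property rather than `value`.
  const raw = this[key];
  if (raw instanceof Uint8Array) return Buffer.from(raw).toString('hex');
  if (typeof value === 'bigint') return value.toString();
  return value;
}

// Serialize one document as a single line terminated by "\n".
function toNdjsonLine(doc) {
  return JSON.stringify(doc, jsonSafeReplacer) + '\n';
}

const line = toNdjsonLine({
  id: 18446744073709551615n,
  bytes: Buffer.from([0xde, 0xad]),
});
process.stdout.write(line);
```

Because compact JSON never contains a raw newline, a consumer can split the stream on `\n` and parse each line independently.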

### `decode` options

| Option | Description |
23 changes: 15 additions & 8 deletions docs/cli-spec.md
@@ -64,20 +64,27 @@ writes the dump; omitted, it streams the raw dump bytes to stdout so
**`npm run capture` is a thin alias** to `chfx capture` (the standalone
`scripts/capture-native.mjs` was folded in and removed).

#### `chfx proxy` (item 5 — standalone capture proxy)
#### `chfx proxy` (item 5 — standalone capture proxy, implemented)
A listener that forwards to a target server and captures the native TCP stream.
**Any** native client (clickhouse-client, Go/JDBC/Python drivers, …) connects
through it — the proxy does not spawn the client itself.
- Configurable lifecycle:
- **Default: single-shot** — accept one connection, capture, write a
`.chproto` (and decode if asked), then exit.
- Flags opt into **persistent** mode (long-running, one `.chproto` per
connection to an output dir) and **live decode** (`--decode` streams decoded
JSON per connection to stdout).
- Flags: `--listen`, `--target host:port`, `--out`/`--save-dir`, `--decode`,
`--persistent`/`--once`, plus remote `--user/--password`/TLS where applicable.
`.chproto` (`--out`), decode to JSON (`--decode`), or stream the raw dump to
stdout, then exit.
- `--persistent` opts into long-running mode: one `.chproto` per connection to
`--save-dir`, and/or **live decode** (`--decode` streams a decoded JSON doc
per connection to stdout). Stops on Ctrl-C.
- Flags: `--listen [host:]port`, `--target host:port`, `--out`/`--save-dir`,
`--decode`, `--persistent`/`--once`, `--no-node-bytes`/`--compact`.
- No `--user/--password`: the proxy is transparent, so the client authenticates
end-to-end against the target through the forwarded handshake.
- **Plaintext/uncompressed only** (same constraint as today). TLS/compressed
streams are unsupported — error clearly and document it.
streams are unsupported — out of scope (clickhouse-client disables compression
on localhost, which is the intended setup).
- Backed by `startCaptureProxy` in `scripts/native-proxy.mjs` (fixed listen port,
many connections), distinct from `startProxy` (one-shot, ephemeral) used by
`query`/`capture`.
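
The lifecycle above (one capture per connection, a partial flush on shutdown, and self-stop in single-shot mode) can be modelled without any sockets. The sketch below is an illustration under stated assumptions: `createCaptureRegistry` and its method names are hypothetical, and a real proxy would call `close` from the socket close events.

```javascript
// Hypothetical, network-free model of the per-connection bookkeeping.
function createCaptureRegistry({ once = false, onCapture }) {
  const reporters = new Set(); // connections that have not emitted a capture yet
  let nextId = 0;
  let stopped = false;

  const stop = () => {
    if (stopped) return;
    stopped = true;
    // Flush whatever each still-open connection has recorded so far.
    for (const report of [...reporters]) report();
  };

  const open = () => {
    const id = ++nextId;
    const segments = [];
    let reported = false;
    const report = () => {
      if (reported) return; // emit each connection's capture at most once
      reported = true;
      reporters.delete(report);
      onCapture({ connection: id, segments });
      if (once) stop(); // single-shot: the first completed capture ends the run
    };
    reporters.add(report);
    return {
      record: (dir, data) => { segments.push({ dir, data }); },
      close: report,
    };
  };

  return { open, stop };
}

// Persistent mode: stop() flushes the connection that is still open.
const seen = [];
const registry = createCaptureRegistry({ onCapture: (c) => seen.push(c.connection) });
const first = registry.open();
const second = registry.open();
first.record('c2s', 'SELECT 1');
first.close();
registry.stop();
second.close(); // already flushed by stop(), so this is a no-op
console.log(seen);
```

Making each `report` idempotent lets a normal close and a shutdown flush race without emitting the same capture twice.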

#### `--help`
Human-readable help (`chfx --help`, `chfx <command> --help`). A standalone
17 changes: 17 additions & 0 deletions scripts/native-proxy.d.mts
@@ -36,6 +36,23 @@ export function startProxy(opts: {
  listenHost?: string;
}): Promise<{ port: number; done: Promise<Segment[]>; close: () => void }>;

export interface StartCaptureProxyOptions {
  targetHost: string;
  targetPort: number;
  listenHost?: string;
  listenPort?: number;
  once?: boolean;
  onCapture?: (capture: Capture) => void;
  onError?: (err: Error) => void;
}

export function startCaptureProxy(opts: StartCaptureProxyOptions): Promise<{
  host: string;
  port: number;
  done: Promise<void>;
  close: () => void;
}>;

export function splitStreams(segments: Segment[]): { c2s: Buffer; s2c: Buffer };
export function captureQuery(opts: CaptureQueryOptions): Promise<Capture>;
export function encodeDump(capture: Capture): Buffer;
140 changes: 140 additions & 0 deletions scripts/native-proxy.mjs
@@ -132,6 +132,146 @@ export function startProxy({ targetHost, targetPort, listenHost = '127.0.0.1' })
  });
}

/**
 * Start a capturing proxy that forwards every accepted connection to
 * (targetHost, targetPort) and invokes onCapture(capture) once that connection
 * closes. Unlike startProxy (single-shot, ephemeral port, resolves with the raw
 * segment log), this binds a caller-chosen address and can serve many
 * connections. It backs `chfx proxy`, where an external native client connects
 * through it. In `once` mode it stops itself after the first completed
 * connection; otherwise it runs until close() is called. close() (and `once`'s
 * self-stop) flush a partial capture for any connection still open, so a client
 * that holds its connection alive (pooled drivers, idle sessions) is captured
 * up to the point of shutdown rather than dropped.
 *
 * @param {object} opts
 * @param {string} opts.targetHost
 * @param {number} opts.targetPort
 * @param {string} [opts.listenHost]
 * @param {number} [opts.listenPort] default 0 (ephemeral)
 * @param {boolean} [opts.once] stop after the first completed connection
 * @param {(capture: Capture) => void} [opts.onCapture]
 * @param {(err: Error) => void} [opts.onError]
 * @returns {Promise<{ host: string, port: number, done: Promise<void>, close: () => void }>}
 */
export function startCaptureProxy({
  targetHost,
  targetPort,
  listenHost = '127.0.0.1',
  listenPort = 0,
  once = false,
  onCapture,
  onError,
}) {
  return new Promise((resolve, reject) => {
    /** @type {() => void} */
    let resolveDone;
    const done = new Promise((res) => { resolveDone = res; });
    let settled = false;
    let connectionCount = 0;
    /** @type {Set<import('node:net').Socket>} */
    const sockets = new Set();
    // report() callbacks for connections that haven't emitted a capture yet, so
    // close() can flush a partial capture (e.g. Ctrl-C while a connection is
    // still open, or a long-lived pooled client) instead of dropping it.
    /** @type {Set<() => void>} */
    const reporters = new Set();

    const finish = () => {
      if (settled) return;
      settled = true;
      try { server.close(); } catch { /* ignore */ }
      // Flush whatever each still-open connection captured before tearing down.
      for (const report of [...reporters]) report();
      for (const s of sockets) s.destroy();
      resolveDone();
    };

    const server = net.createServer((client) => {
      const id = ++connectionCount;
      sockets.add(client);
      const upstream = net.connect(targetPort, targetHost);
      sockets.add(upstream);

      /** @type {Segment[]} */
      const segments = [];
      let openEnds = 2;
      let reported = false;

      const report = () => {
        if (reported) return;
        reported = true;
        reporters.delete(report);
        const { c2s, s2c } = splitStreams(segments);
        /** @type {Capture} */
        const capture = {
          c2s,
          s2c,
          segments,
          meta: { source: 'proxy', target: `${targetHost}:${targetPort}`, connection: id },
        };
        try {
          onCapture?.(capture);
        } catch (err) {
          onError?.(/** @type {Error} */ (err));
        }
        if (once) finish();
      };
      reporters.add(report);

      const closeOne = () => {
        openEnds -= 1;
        if (openEnds === 0) report();
      };

      // Forward with backpressure so a slow consumer can't make us buffer the
      // whole stream in the socket layer (we already retain it in `segments`).
      client.on('data', (chunk) => {
        segments.push({ dir: DIR_C2S, data: Buffer.from(chunk) });
        if (upstream.write(chunk) === false) client.pause();
      });
      upstream.on('drain', () => client.resume());
      upstream.on('data', (chunk) => {
        segments.push({ dir: DIR_S2C, data: Buffer.from(chunk) });
        if (client.write(chunk) === false) upstream.pause();
      });
      client.on('drain', () => upstream.resume());
      client.on('end', () => upstream.end());
      upstream.on('end', () => client.end());
      client.on('close', () => { sockets.delete(client); closeOne(); });
      upstream.on('close', () => { sockets.delete(upstream); closeOne(); });

      const fail = (/** @type {Error} */ err) => {
        onError?.(err);
        // Tear both ends down; their 'close' events drive report()/closeOne(),
        // so a failed upstream connect can't leave `once` mode hanging.
        client.destroy();
        upstream.destroy();
      };
      client.on('error', fail);
      upstream.on('error', fail);
    });

    // Until we're listening, an error is a startup/bind failure → reject the
    // returned promise. Once listening (promise already resolved), reject is a
    // no-op, so route later server errors to onError instead of dropping them.
    let listening = false;
    server.on('error', (err) => {
      if (!listening) reject(err);
      else onError?.(/** @type {Error} */ (err));
    });
    server.listen(listenPort, listenHost, () => {
      const addr = server.address();
      if (addr === null || typeof addr === 'string') {
        reject(new Error('proxy failed to bind a port'));
        return;
      }
      listening = true;
      resolve({ host: listenHost, port: addr.port, done, close: finish });
    });
  });
}

/**
 * Split an ordered segment log into the two concatenated per-direction streams.
 * @param {Segment[]} segments