diff --git a/Cargo.lock b/Cargo.lock index fbb74a9..211fdda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,65 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -112,6 +171,12 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bitflags" +version 
= "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" + [[package]] name = "bumpalo" version = "3.20.2" @@ -146,6 +211,53 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" +[[package]] +name = "clap" +version = "4.5.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", + "terminal_size", +] + +[[package]] +name = "clap_derive" +version = "4.5.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" + +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + [[package]] name = "colored" version = "3.1.1" @@ -176,6 +288,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -299,6 +421,12 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hex" version = "0.4.3" @@ -426,6 +554,12 @@ dependencies = [ "hashbrown 0.16.1", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.14.0" @@ -457,6 +591,22 @@ version = "0.2.182" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + +[[package]] +name = "ljx" +version = "0.1.0" +dependencies = [ + "clap", + "colored", + "logjet", + "regex", +] + [[package]] name = "log" version = "0.4.29" @@ -469,6 +619,8 @@ version = "0.1.0" dependencies = [ "crc32c", "lz4_flex", + "opentelemetry-proto", + "prost", ] [[package]] @@ -528,6 +680,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] 
name = "opentelemetry" version = "0.28.0" @@ -709,6 +867,35 @@ dependencies = [ "getrandom", ] +[[package]] +name = "regex" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + [[package]] name = "ring" version = "0.17.14" @@ -732,6 +919,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.37" @@ -888,6 +1088,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" @@ -911,6 +1117,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +[[package]] +name = "terminal_size" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60b8cb979cb11c32ce1603f8137b22262a9d131aaa5c37b5678025f22b8becd0" +dependencies 
= [ + "rustix", + "windows-sys 0.60.2", +] + [[package]] name = "thiserror" version = "2.0.18" @@ -1136,6 +1352,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "want" version = "0.3.1" @@ -1208,7 +1430,16 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", ] [[package]] @@ -1226,14 +1457,31 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm 0.52.6", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + 
"windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", ] [[package]] @@ -1242,48 +1490,96 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "zerocopy" version = "0.8.40" diff --git a/Cargo.toml b/Cargo.toml index 527a982..c09338b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,8 @@ edition = "2024" license = "Apache-2.0" [workspace] -members = ["logjetd", "demo"] -default-members = [".", "logjetd", "demo"] +members = [".", "logjetd", "demo", "ljx"] +default-members = [".", "logjetd", "demo", "ljx"] [profile.release] lto = true @@ -19,3 +19,7 @@ incremental = false [dependencies] 
crc32c = "0.6" lz4_flex = { version = "0.11", default-features = false, features = ["std"] } + +[dev-dependencies] +opentelemetry-proto = { version = "0.28", features = ["gen-tonic", "logs"] } +prost = "0.13" diff --git a/Makefile b/Makefile index 6b746e8..8805bdb 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,11 @@ -.PHONY: build dev devel check fix test test-unit test-all test-integration setup clean stats arm-devel arm x86-devel x86 setup-arm setup-x86 demo man +.PHONY: build dev devel check fix test test-unit test-integration setup clean stats arm-devel arm x86-devel x86 setup-arm setup-x86 demo man DEFAULT_TARGET := build ARM_TARGET ?= aarch64-unknown-linux-musl X86_TARGET ?= x86_64-unknown-linux-musl CORE_WORKSPACE := --workspace --exclude otlp-demo +MANPAGE_MD := $(wildcard doc/manpage/*.1.md) +MANPAGE_OUT := $(MANPAGE_MD:.md=) build: setup cargo build $(CORE_WORKSPACE) --release @@ -29,23 +31,26 @@ test: setup test-unit: setup @if command -v cargo-nextest >/dev/null 2>&1; then \ - cargo nextest run -p logjet --lib -p logjetd --bins; \ + cargo nextest run -p logjet --lib -p logjetd --bins -p ljx --bin ljx; \ else \ echo "cargo-nextest not available, falling back to cargo test unit-only targets"; \ cargo test -p logjet --lib; \ cargo test -p logjetd --bin logjetd; \ + cargo test -p ljx --bin ljx; \ fi test-integration: setup + cargo build -p logjetd -p ljx + cargo build -p otlp-demo --bin otlp-bofh-emitter @if command -v cargo-nextest >/dev/null 2>&1; then \ cargo nextest run -p logjetd --test bridge_flows; \ + cargo nextest run -p logjet --test ljx_cli; \ else \ - echo "cargo-nextest not available, falling back to cargo test -p logjetd --test bridge_flows"; \ + echo "cargo-nextest not available, falling back to cargo test integration targets"; \ cargo test -p logjetd --test bridge_flows; \ + cargo test -p logjet --test ljx_cli; \ fi -test-all: test - arm-devel: setup setup-arm cargo build $(CORE_WORKSPACE) --target $(ARM_TARGET) @@ -61,10 +66,12 @@ x86: 
setup setup-x86 demo: devel cargo build -p otlp-demo -man: +man: $(MANPAGE_OUT) + +$(MANPAGE_OUT): doc/manpage/%.1: doc/manpage/%.1.md @command -v pandoc >/dev/null 2>&1 || { echo "pandoc not found. Install pandoc to build manpages."; exit 1; } @mkdir -p doc/manpage - pandoc --standalone --to man doc/manpage/logjetd.1.md -o doc/manpage/logjetd.1 + pandoc --standalone --to man $< -o $@ clean: cargo clean diff --git a/doc/README.md b/doc/README.md index 6699895..4627ad5 100644 --- a/doc/README.md +++ b/doc/README.md @@ -1,9 +1,11 @@ # Documentation - [overview.md](./overview.md): project overview +- [ljx.md](./ljx.md): `ljx` offline CLI scope and command plan - [daemon.md](./daemon.md): `logjetd` behaviour and current limits - [configuration.md](./configuration.md): YAML keys and defaults - [features.md](./features.md): current implemented daemon features and use cases - [config-samples.md](./config-samples.md): ready-to-use config examples - [protocol.md](./protocol.md): current TCP wire framing used by `logjetd` +- [manpage/ljx.1.md](./manpage/ljx.1.md): Markdown manpage source - [manpage/logjetd.1.md](./manpage/logjetd.1.md): Markdown manpage source diff --git a/doc/ljx.md b/doc/ljx.md new file mode 100644 index 0000000..815eecf --- /dev/null +++ b/doc/ljx.md @@ -0,0 +1,165 @@ +# `ljx` + +`ljx` is the offline command-line toolbox for `.logjet` files. + +It is separate from `logjetd` and must stay separate in purpose: + +- `logjet` is the Rust library and file format +- `logjetd` is the daemon for ingest, transport, replay, and spool management +- `ljx` is the standalone file tool for inspection and transformation + +`ljx` does not control the daemon and does not depend on daemon runtime state. +It operates on `.logjet` files directly. + +## Design Goals + +The tool is intended to feel closer to `jq`, `parquet-tools`, or plumbing-style +UNIX commands than to a daemon control plane. 
+ +Core goals: + +- stream records instead of loading entire files into memory +- preserve record ordering +- operate on structured records, not raw bytes +- support stdout where it makes sense +- work cleanly in pipelines +- keep errors direct and actionable + +## Current Status + +`ljx` is being introduced incrementally. + +Documented command set: + +- `count` +- `filter` +- `stats` +- `cat` +- `split` +- `join` + +Current implementation status for release `0.1`: + +- implemented first: `count` +- implemented first: `filter` +- planned after that: `stats`, `cat`, `split`, `join` + +The CLI may already expose planned command names, but release `0.1` should only +promise the commands that are actually complete and tested. + +## Input and Output Model + +`.logjet` files are read using the streaming reader from the `logjet` crate. +That reader is sequential and corruption-tolerant, but the current API expects a +`Read + Seek` source. + +That means: + +- normal file paths are the primary input mode +- stdout output is straightforward for stream-producing commands +- stdin support needs explicit policy because generic pipes are not seekable + +If stdin is unsupported for a given release, `ljx` should fail loudly and say +why. If stdin is later supported by spooling to a temporary file, that behaviour +should be documented as an explicit implementation choice. + +## Command Intent + +## `ljx count` + +Count records in one `.logjet` file, optionally subject to a record-aware +predicate. + +Intended examples: + +```text +ljx count telemetry.logjet +ljx count telemetry.logjet --type logs +ljx count telemetry.logjet --seq-min 1000 --seq-max 2000 +ljx count telemetry.logjet -F error -i +ljx count telemetry.logjet -e 'java\..*\.bs' +``` + +Expected properties: + +- reads sequentially +- preserves file order even though output is only a number +- does not decode or inspect payload schema + +## `ljx filter` + +Write only matching records to another `.logjet` stream. 
+ +Intended examples: + +```text +ljx filter telemetry.logjet -o errors.logjet --type logs +ljx filter telemetry.logjet -o - --ts-min 1700000000000000000 > tail.logjet +ljx filter telemetry.logjet -o only-errors.logjet -F error -i +ljx filter telemetry.logjet -o suspect.logjet -e 'java\..*\.bs' +``` + +Expected properties: + +- input order is preserved +- output stays valid `.logjet` +- matching is done per record, not by byte scanning the file + +Supported payload matching modes: + +- `-F`, `--fixed-string` for literal payload substring matching +- `-e`, `--grep` for grep-style regex matching +- `-i`, `--ignore-case` to make either payload matcher case-insensitive + +`ljx` uses one payload matcher at a time. `-F` and `-e` are mutually exclusive. + +## `ljx stats` + +Compute summary information for one file. + +Intended summary fields: + +- record count +- byte size +- timestamp range +- optional per-type or per-field summaries + +## `ljx cat` + +Render records in a human-readable form suitable for terminal inspection. + +Open questions: + +- whether payload bytes should default to hex, escaped text, or a compact mixed format +- how much payload to print before truncating + +## `ljx split` + +Split one `.logjet` input into multiple `.logjet` outputs. + +Target split modes: + +- by record count +- by byte budget +- by timestamp window, when the semantics are nailed down + +## `ljx join` + +Join multiple `.logjet` segments into one ordered output stream. + +Potential validation: + +- sequence continuity checks +- timestamp monotonicity checks + +## Implementation Notes + +The simplest useful internal shape for `ljx` is: + +- a thin `clap` CLI layer +- one module per subcommand +- small shared helpers for input/output handling +- small shared predicate parsing for record-aware matching + +This keeps the code close to the `logjet` reader and writer APIs and avoids +inventing a second abstraction stack before the command surface is proven. 
diff --git a/doc/manpage/ljx.1 b/doc/manpage/ljx.1 new file mode 100644 index 0000000..da138ea --- /dev/null +++ b/doc/manpage/ljx.1 @@ -0,0 +1,466 @@ +.\" Manpage source mirrored from doc/manpage/ljx.1.md +.TH "LJX" "1" "March 2026" "" "" +.hy +.SH NAME +.PP +ljx - offline toolbox for inspecting and transforming \f[C].logjet\f[R] +files +.SH SYNOPSIS +.PP +\f[C]ljx\f[R] \f[C]count\f[R] \f[I]input\f[R] +[\f[I]predicate-options\f[R]] +.PP +\f[C]ljx\f[R] \f[C]filter\f[R] \f[I]input\f[R] \f[C]-o\f[R] +\f[I]output\f[R] [\f[I]predicate-options\f[R]] [\f[C]--codec\f[R] +\f[I]codec\f[R]] [\f[C]--block-target-size\f[R] \f[I]bytes\f[R]] +.PP +\f[C]ljx\f[R] \f[C]stats\f[R] \f[I]input\f[R] +.PP +\f[C]ljx\f[R] \f[C]cat\f[R] \f[I]input\f[R] +.PP +\f[C]ljx\f[R] \f[C]split\f[R] \f[I]input\f[R] \f[I]output-prefix\f[R] +.PP +\f[C]ljx\f[R] \f[C]join\f[R] \f[I]input\f[R]... +.SH DESCRIPTION +.PP +\f[C]ljx\f[R] is the standalone file tool in the \f[C]logjet\f[R] +ecosystem. +.PP +It works directly on \f[C].logjet\f[R] files and is intentionally +separate from \f[C]logjetd\f[R]. +.PP +\f[C]ljx\f[R] does: +.IP \[bu] 2 +inspect \f[C].logjet\f[R] data offline +.IP \[bu] 2 +stream records sequentially +.IP \[bu] 2 +preserve record ordering +.IP \[bu] 2 +transform one \f[C].logjet\f[R] stream into another +.IP \[bu] 2 +fit into ordinary UNIX pipelines +.PP +\f[C]ljx\f[R] does not: +.IP \[bu] 2 +start or control \f[C]logjetd\f[R] +.IP \[bu] 2 +depend on daemon runtime state +.IP \[bu] 2 +grep raw bytes blindly +.PP +Operations are record-aware. Matching and transformation are applied to +decoded records with sequence number, timestamp, record type, and +payload bytes. +.SH COMMANDS +.SS count +.PP +Count records in one \f[C].logjet\f[R] file. +.PP +Examples: +.IP +.nf +\f[C] +ljx count telemetry.logjet +ljx count telemetry.logjet --type logs +ljx count telemetry.logjet --seq-min 100 --seq-max 1000 +\f[R] +.fi +.PP +\f[C]count\f[R] is part of the initial \f[C]0.1\f[R] release scope. 
+.SS filter +.PP +Write matching records to another \f[C].logjet\f[R] stream. +.PP +Examples: +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o only-logs.logjet --type logs +ljx filter telemetry.logjet -o - --ts-min 1700000000000000000 > recent.logjet +\f[R] +.fi +.PP +\f[C]filter\f[R] is part of the initial \f[C]0.1\f[R] release scope. +.SS stats +.PP +Compute summary information for one file. +.PP +Planned summaries include: +.IP \[bu] 2 +record count +.IP \[bu] 2 +byte size +.IP \[bu] 2 +timestamp range +.IP \[bu] 2 +optional per-type or field statistics +.PP +\f[C]stats\f[R] is planned but may not be complete in release +\f[C]0.1\f[R]. +.SS cat +.PP +Print records in a human-readable form for inspection. +.PP +\f[C]cat\f[R] is planned but may not be complete in release +\f[C]0.1\f[R]. +.SS split +.PP +Split one \f[C].logjet\f[R] file into multiple outputs. +.PP +Target split modes include record count, byte budget, and timestamp +range. +.PP +\f[C]split\f[R] is planned but may not be complete in release +\f[C]0.1\f[R]. +.SS join +.PP +Join multiple \f[C].logjet\f[R] segments into one ordered output stream. +.PP +Optional validation may include sequence continuity checks. +.PP +\f[C]join\f[R] is planned but may not be complete in release +\f[C]0.1\f[R]. +.SH OPTIONS +.SS \f[C]-h\f[R], \f[C]--help\f[R] +.PP +Print usage information. +.SS \f[C]-V\f[R], \f[C]--version\f[R] +.PP +Print version information. +.SS Predicate options +.PP +The initial predicate model is intentionally small. 
+.PP +Expected options: +.IP \[bu] 2 +\f[C]--type\f[R] \f[I]logs|metrics|traces\f[R] +.IP \[bu] 2 +\f[C]--seq-min\f[R] \f[I]n\f[R] +.IP \[bu] 2 +\f[C]--seq-max\f[R] \f[I]n\f[R] +.IP \[bu] 2 +\f[C]--ts-min\f[R] \f[I]unix-ns\f[R] +.IP \[bu] 2 +\f[C]--ts-max\f[R] \f[I]unix-ns\f[R] +.IP \[bu] 2 +\f[C]-e\f[R], \f[C]--grep\f[R] \f[I]pattern\f[R] +.IP \[bu] 2 +\f[C]-F\f[R], \f[C]--fixed-string\f[R] \f[I]text\f[R] +.IP \[bu] 2 +\f[C]-i\f[R], \f[C]--ignore-case\f[R] +.PP +\f[C]-e\f[R] and \f[C]-F\f[R] are mutually exclusive. +.SS Filter output options +.SS \f[C]-o\f[R], \f[C]--output\f[R] \f[I]path\f[R] +.PP +Write filtered output to \f[I]path\f[R]. +.PP +Use \f[C]-\f[R] to write the resulting \f[C].logjet\f[R] stream to +stdout. +.SS \f[C]--codec\f[R] \f[I]none|lz4\f[R] +.PP +Select the output block compression codec. +.SS \f[C]--block-target-size\f[R] \f[I]bytes\f[R] +.PP +Target uncompressed payload size per output block. +.SH USAGE EXAMPLES +.SS 1. Count all records +.IP +.nf +\f[C] +ljx count telemetry.logjet +\f[R] +.fi +.PP +Use this for a quick cardinality check without printing records. +.SS 2. Count only logs +.IP +.nf +\f[C] +ljx count telemetry.logjet --type logs +\f[R] +.fi +.PP +Use this when one file contains mixed logs, metrics, and traces. +.SS 2a. Count a case-insensitive fixed string +.IP +.nf +\f[C] +ljx count telemetry.logjet -F error -i +\f[R] +.fi +.PP +Use this when you just want a plain "contains text" match. +.SS 3. Count a sequence window +.IP +.nf +\f[C] +ljx count telemetry.logjet --seq-min 100000 --seq-max 200000 +\f[R] +.fi +.PP +Use this to check whether a specific sequence span exists in one file. +.SS 4. Count a timestamp window +.IP +.nf +\f[C] +ljx count telemetry.logjet --ts-min 1700000000000000000 --ts-max 1700003600000000000 +\f[R] +.fi +.PP +Use this to quantify one incident or replay interval. +.SS 5. 
Copy all records to a new file +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o copy.logjet +\f[R] +.fi +.PP +Use this for a straight record-aware rewrite. +.SS 5a. Keep records containing a literal string +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o errors.logjet -F 'java.crap.failed' +\f[R] +.fi +.PP +Use this when literal dots should stay literal dots. +.SS 6. Keep only one record type +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o only-traces.logjet --type traces +\f[R] +.fi +.PP +Use this to split one mixed file into per-type files. +.SS 7. Keep only a sequence range +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o seq-slice.logjet --seq-min 5000 --seq-max 8000 +\f[R] +.fi +.PP +Use this to produce a narrow debugging or replay slice. +.SS 8. Keep only a timestamp range +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o one-hour.logjet --ts-min 1700000000000000000 --ts-max 1700003600000000000 +\f[R] +.fi +.PP +Use this to extract a specific time window into a new +\f[C].logjet\f[R] file. +.SS 9. Stream filtered output to stdout +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o - --type logs > only-logs.logjet +\f[R] +.fi +.PP +Use this when \f[C]ljx\f[R] is one stage in a shell pipeline. +.SS 10. Rewrite with explicit output block settings +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o compact.logjet --codec lz4 --block-target-size 262144 +\f[R] +.fi +.PP +Use this when you want explicit output compression and block sizing. +.SS 11. Filtering design +.PP +Filtering is the most important part of \f[C]ljx\f[R]. +.PP +.PP +Payload matching is applied to each record payload, not to raw +\f[C].logjet\f[R] container bytes and not to rendered \f[C]cat\f[R] +output. +.PP +There are two user-facing modes: +.IP \[bu] 2 +\f[C]-F\f[R], \f[C]--fixed-string\f[R] for literal substring search +.IP \[bu] 2 +\f[C]-e\f[R], \f[C]--grep\f[R] for grep-style regex search +.PP +Case-insensitive matching is enabled with +\f[C]-i\f[R], \f[C]--ignore-case\f[R]. +.SS 12. 
Regex payload match: wildcard in the middle +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o suspect.logjet -e 'java\[rs]..*\[rs].bs' +\f[R] +.fi +.PP +Use this when the middle of the payload text varies. +.SS 13. Regex payload match: case-insensitive +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o errors.logjet -e 'error|fatal|panic' -i +\f[R] +.fi +.PP +Use this when payload text may contain \f[C]ERROR\f[R], +\f[C]error\f[R], or mixed case variants. +.SS 14. Count regex matches +.IP +.nf +\f[C] +ljx count telemetry.logjet -e 'timeout|deadline exceeded' -i +\f[R] +.fi +.PP +Use this when you need cardinality instead of an output file. +.SS 15. Print records for terminal inspection +.IP +.nf +\f[C] +ljx cat telemetry.logjet +\f[R] +.fi +.PP +Use this when you want a human-readable record listing. +.SS 16. Print records with hex payload rendering +.IP +.nf +\f[C] +ljx cat telemetry.logjet --hex-payload +\f[R] +.fi +.PP +Use this when payload bytes are binary and text rendering is +misleading. +.SS 17. Compute file summary statistics +.IP +.nf +\f[C] +ljx stats telemetry.logjet +\f[R] +.fi +.PP +Intended output includes record count, byte size, and timestamp range. +.SS 18. Compute per-type or field statistics +.IP +.nf +\f[C] +ljx stats telemetry.logjet --field-stats +\f[R] +.fi +.PP +Use this for a quick operational summary before deeper analysis. +.SS 19. Split by record count +.IP +.nf +\f[C] +ljx split telemetry.logjet chunk --max-records 100000 +\f[R] +.fi +.PP +Use this when large files need to be broken into smaller ordered +chunks. +.SS 20. Split by byte budget +.IP +.nf +\f[C] +ljx split telemetry.logjet shard --max-bytes 268435456 +\f[R] +.fi +.PP +Use this when files must fit under a transfer or storage ceiling. +.SS 21. Split by timestamp range +.IP +.nf +\f[C] +ljx split telemetry.logjet hour --timestamp-range 1h +\f[R] +.fi +.PP +This use case needs exact window semantics before it should be treated +as stable. +.SS 22. 
Join ordered segments +.IP +.nf +\f[C] +ljx join a.logjet b.logjet c.logjet -o merged.logjet +\f[R] +.fi +.PP +Use this when reconstructing one logical stream from pre-split +segments. +.SS 23. Join and validate continuity +.IP +.nf +\f[C] +ljx join part-1.logjet part-2.logjet -o merged.logjet --validate-sequence-continuity +\f[R] +.fi +.PP +Use this when gaps or overlaps should be treated as an operator-visible +problem. +.SS 24. Count a window, then extract it +.IP +.nf +\f[C] +ljx count telemetry.logjet --ts-min 1700000000000000000 --ts-max 1700003600000000000 +ljx filter telemetry.logjet -o incident.logjet --ts-min 1700000000000000000 --ts-max 1700003600000000000 +\f[R] +.fi +.PP +Use this to measure an interval first and extract it second. +.SS 25. Recompress through stdout +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o - --codec lz4 > rewritten.logjet +\f[R] +.fi +.PP +Use this when the shell should control the final destination. +.SS 26. Produce a small reproduction file +.IP +.nf +\f[C] +ljx filter telemetry.logjet -o repro.logjet --type logs --seq-min 120000 --seq-max 121000 +\f[R] +.fi +.PP +Use this to create a minimal bug-report or regression-test input. +.SH FILES +.PP +\f[C]ljx\f[R] reads and writes \f[C].logjet\f[R] files. +.PP +Example names: +.IP \[bu] 2 +\f[C]telemetry.logjet\f[R] +.IP \[bu] 2 +\f[C]telemetry-1.logjet\f[R] +.IP \[bu] 2 +\f[C]replay.logjet\f[R] +.SH EXIT STATUS +.TP +\f[C]0\f[R] +success +.TP +non-zero +invalid arguments, read failure, corrupt input handling failure, or +write failure +.SH NOTES +.PP +The current \f[C]logjet\f[R] reader API expects a seekable input source. +.PP +That means stdin support must either be: +.IP \[bu] 2 +unsupported with a clear error, or +.IP \[bu] 2 +implemented by spooling stdin to a temporary file first +.PP +The exact \f[C]0.1\f[R] behaviour should be documented in the CLI help +and release notes. 
diff --git a/doc/manpage/ljx.1.md b/doc/manpage/ljx.1.md new file mode 100644 index 0000000..56dbd24 --- /dev/null +++ b/doc/manpage/ljx.1.md @@ -0,0 +1,414 @@ +% LJX(1) +% Bo Maryniuk +% March 2026 + +# NAME + +ljx - offline toolbox for inspecting and transforming `.logjet` files + +# SYNOPSIS + +`ljx` `count` *input* [*predicate-options*] + +`ljx` `filter` *input* `-o` *output* [*predicate-options*] [`--codec` *codec*] [`--block-target-size` *bytes*] + +`ljx` `stats` *input* + +`ljx` `cat` *input* + +`ljx` `split` *input* *output-prefix* + +`ljx` `join` *input*... `-o` *output* + +# DESCRIPTION + +`ljx` is the standalone file tool in the `logjet` ecosystem. + +It works directly on `.logjet` files and is intentionally separate from +`logjetd`. + +`ljx` does: + +- inspect `.logjet` data offline +- stream records sequentially +- preserve record ordering +- transform one `.logjet` stream into another +- fit into ordinary UNIX pipelines + +`ljx` does not: + +- start or control `logjetd` +- depend on daemon runtime state +- grep raw bytes blindly + +Operations are record-aware. Matching and transformation are applied to decoded +records with sequence number, timestamp, record type, and payload bytes. + +# COMMANDS + +## count + +Count records in one `.logjet` file. + +Examples: + +```text +ljx count telemetry.logjet +ljx count telemetry.logjet --type logs +ljx count telemetry.logjet --seq-min 100 --seq-max 1000 +``` + +`count` is part of the initial `0.1` release scope. + +## filter + +Write matching records to another `.logjet` stream. + +Examples: + +```text +ljx filter telemetry.logjet -o only-logs.logjet --type logs +ljx filter telemetry.logjet -o - --ts-min 1700000000000000000 > recent.logjet +``` + +`filter` is part of the initial `0.1` release scope. + +## stats + +Compute summary information for one file. 
+ +Planned summaries include: + +- record count +- byte size +- timestamp range +- optional per-type or field statistics + +`stats` is planned but may not be complete in release `0.1`. + +## cat + +Print records in a human-readable form for inspection. + +`cat` is planned but may not be complete in release `0.1`. + +## split + +Split one `.logjet` file into multiple outputs. + +Target split modes include record count, byte budget, and timestamp range. + +`split` is planned but may not be complete in release `0.1`. + +## join + +Join multiple `.logjet` segments into one ordered output stream. + +Optional validation may include sequence continuity checks. + +`join` is planned but may not be complete in release `0.1`. + +# OPTIONS + +## `-h`, `--help` + +Print usage information. + +## `-V`, `--version` + +Print version information. + +## Predicate options + +The initial predicate model is intentionally small. + +Expected options: + +- `--type` *logs|metrics|traces* +- `--seq-min` *n* +- `--seq-max` *n* +- `--ts-min` *unix-ns* +- `--ts-max` *unix-ns* +- `-e`, `--grep` *pattern* +- `-F`, `--fixed-string` *text* +- `-i`, `--ignore-case` + +`-e` and `-F` are mutually exclusive. + +## Filter output options + +## `-o`, `--output` *path* + +Write filtered output to *path*. + +Use `-` to write the resulting `.logjet` stream to stdout. + +## `--codec` *none|lz4* + +Select the output block compression codec. + +## `--block-target-size` *bytes* + +Target uncompressed payload size per output block. + +# USAGE EXAMPLES + +## 1. Count all records + +```text +ljx count telemetry.logjet +``` + +Use this for a quick cardinality check without printing records. + +## 2. Count only logs + +```text +ljx count telemetry.logjet --type logs +``` + +Use this when one file contains mixed logs, metrics, and traces. + +## 2a. Count a case-insensitive fixed string + +```text +ljx count telemetry.logjet -F error -i +``` + +Use this when you just want a plain “contains text” match. + +## 3. 
Count a sequence window + +```text +ljx count telemetry.logjet --seq-min 100000 --seq-max 200000 +``` + +Use this to check whether a specific sequence span exists in one file. + +## 4. Count a timestamp window + +```text +ljx count telemetry.logjet --ts-min 1700000000000000000 --ts-max 1700003600000000000 +``` + +Use this to quantify one incident or replay interval. + +## 5. Copy all records to a new file + +```text +ljx filter telemetry.logjet -o copy.logjet +``` + +Use this for a straight record-aware rewrite. + +## 5a. Keep records containing a literal string + +```text +ljx filter telemetry.logjet -o errors.logjet -F 'java.crap.failed' +``` + +Use this when literal dots should stay literal dots. + +## 6. Keep only one record type + +```text +ljx filter telemetry.logjet -o only-traces.logjet --type traces +``` + +Use this to split one mixed file into per-type files. + +## 7. Keep only a sequence range + +```text +ljx filter telemetry.logjet -o seq-slice.logjet --seq-min 5000 --seq-max 8000 +``` + +Use this to produce a narrow debugging or replay slice. + +## 8. Keep only a timestamp range + +```text +ljx filter telemetry.logjet -o one-hour.logjet --ts-min 1700000000000000000 --ts-max 1700003600000000000 +``` + +Use this to extract a specific time window into a new `.logjet` file. + +## 9. Stream filtered output to stdout + +```text +ljx filter telemetry.logjet -o - --type logs > only-logs.logjet +``` + +Use this when `ljx` is one stage in a shell pipeline. + +## 10. Rewrite with explicit output block settings + +```text +ljx filter telemetry.logjet -o compact.logjet --codec lz4 --block-target-size 262144 +``` + +Use this when you want explicit output compression and block sizing. + +## 11. Filtering design + +Filtering is the most important part of `ljx`. + +Payload matching is applied to each record payload, not to raw `.logjet` +container bytes and not to rendered `cat` output. 
+ +There are two user-facing modes: + +- `-F`, `--fixed-string` for literal substring search +- `-e`, `--grep` for grep-style regex search + +Case-insensitive matching is enabled with `-i`, `--ignore-case`. + +## 12. Regex payload match: wildcard in the middle + +```text +ljx filter telemetry.logjet -o suspect.logjet -e 'java\..*\.bs' +``` + +Use this when the middle of the payload text varies. + +## 13. Regex payload match: case-insensitive + +```text +ljx filter telemetry.logjet -o errors.logjet -e 'error|fatal|panic' -i +``` + +Use this when payload text may contain `ERROR`, `error`, or mixed case variants. + +## 14. Count regex matches + +```text +ljx count telemetry.logjet -e 'timeout|deadline exceeded' -i +``` + +Use this when you need cardinality instead of an output file. + +## 15. Print records for terminal inspection + +```text +ljx cat telemetry.logjet +``` + +Use this when you want a human-readable record listing. + +## 16. Print records with hex payload rendering + +```text +ljx cat telemetry.logjet --hex-payload +``` + +Use this when payload bytes are binary and text rendering is misleading. + +## 17. Compute file summary statistics + +```text +ljx stats telemetry.logjet +``` + +Intended output includes record count, byte size, and timestamp range. + +## 18. Compute per-type or field statistics + +```text +ljx stats telemetry.logjet --field-stats +``` + +Use this for a quick operational summary before deeper analysis. + +## 19. Split by record count + +```text +ljx split telemetry.logjet chunk --max-records 100000 +``` + +Use this when large files need to be broken into smaller ordered chunks. + +## 20. Split by byte budget + +```text +ljx split telemetry.logjet shard --max-bytes 268435456 +``` + +Use this when files must fit under a transfer or storage ceiling. + +## 21. Split by timestamp range + +```text +ljx split telemetry.logjet hour --timestamp-range 1h +``` + +This use case needs exact window semantics before it should be treated as stable. 
+ +## 22. Join ordered segments + +```text +ljx join a.logjet b.logjet c.logjet -o merged.logjet +``` + +Use this when reconstructing one logical stream from pre-split segments. + +## 23. Join and validate continuity + +```text +ljx join part-1.logjet part-2.logjet -o merged.logjet --validate-sequence-continuity +``` + +Use this when gaps or overlaps should be treated as an operator-visible problem. + +## 24. Count a window, then extract it + +```text +ljx count telemetry.logjet --ts-min 1700000000000000000 --ts-max 1700003600000000000 +ljx filter telemetry.logjet -o incident.logjet --ts-min 1700000000000000000 --ts-max 1700003600000000000 +``` + +Use this to measure an interval first and extract it second. + +## 25. Recompress through stdout + +```text +ljx filter telemetry.logjet -o - --codec lz4 > rewritten.logjet +``` + +Use this when the shell should control the final destination. + +## 26. Produce a small reproduction file + +```text +ljx filter telemetry.logjet -o repro.logjet --type logs --seq-min 120000 --seq-max 121000 +``` + +Use this to create a minimal bug-report or regression-test input. + +# FILES + +`ljx` reads and writes `.logjet` files. + +Example names: + +- `telemetry.logjet` +- `telemetry-1.logjet` +- `replay.logjet` + +# EXIT STATUS + +`0` +: success + +non-zero +: invalid arguments, read failure, corrupt input handling failure, or write failure + +# NOTES + +The current `logjet` reader API expects a seekable input source. + +That means stdin support must either be: + +- unsupported with a clear error, or +- implemented by spooling stdin to a temporary file first + +The exact `0.1` behaviour should be documented in the CLI help and release notes. 
diff --git a/ljx/Cargo.toml b/ljx/Cargo.toml new file mode 100644 index 0000000..3fb51fc --- /dev/null +++ b/ljx/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "ljx" +version = "0.1.0" +edition = "2024" +license = "Apache-2.0" + +[dependencies] +clap = { version = "4.5", features = ["derive", "wrap_help"] } +colored = "3" +logjet = { path = ".." } +regex = "1.12" diff --git a/ljx/src/cli.rs b/ljx/src/cli.rs new file mode 100644 index 0000000..c8e7418 --- /dev/null +++ b/ljx/src/cli.rs @@ -0,0 +1,151 @@ +use std::path::PathBuf; + +use clap::builder::styling; +use clap::{Args, CommandFactory, Parser, Subcommand, ValueEnum}; +use colored::Colorize; + +use crate::predicate::PredicateArgs; + +#[derive(Debug, Parser)] +#[command( + name = "ljx", + version, + about = "Offline toolbox for .logjet files", + long_about = None +)] +pub struct Cli { + #[command(subcommand)] + pub command: Command, +} + +pub fn build_cli() -> clap::Command { + let appname = "ljx"; + let styles = styling::Styles::styled() + .header(styling::AnsiColor::Yellow.on_default()) + .usage(styling::AnsiColor::Yellow.on_default()) + .literal(styling::AnsiColor::BrightGreen.on_default()) + .placeholder(styling::AnsiColor::BrightMagenta.on_default()); + + Cli::command() + .styles(styles) + .about(format!( + "{} - {}", + appname.bright_magenta().bold(), + "offline toolbox for .logjet streams" + )) + .override_usage(format!( + "{appname} [OPTIONS] [ARGS]" + )) + .after_help( + "Examples:\n ljx count telemetry.logjet -F error -i\n ljx filter telemetry.logjet -o only-logs.logjet -e 'java\\..*\\.bs'", + ) +} + +#[derive(Debug, Subcommand)] +pub enum Command { + Split(SplitArgs), + Join(JoinArgs), + Filter(FilterArgs), + Count(CountArgs), + Stats(StatsArgs), + Cat(CatArgs), +} + +#[derive(Debug, Clone, Args)] +pub struct CountArgs { + #[arg(value_name = "INPUT", help = "Input .logjet file or - for stdin")] + pub input: PathBuf, + + #[command(flatten)] + pub predicate: PredicateArgs, +} + +#[derive(Debug, Clone, 
Args)] +pub struct FilterArgs { + #[arg(value_name = "INPUT", help = "Input .logjet file or - for stdin")] + pub input: PathBuf, + + #[arg( + short, + long, + value_name = "OUTPUT", + help = "Output .logjet file or - for stdout" + )] + pub output: PathBuf, + + #[arg(long, value_enum, default_value_t = OutputCodec::Lz4)] + pub codec: OutputCodec, + + #[arg( + long, + default_value_t = logjet::DEFAULT_BLOCK_TARGET_SIZE, + help = "Target uncompressed bytes per output block" + )] + pub block_target_size: usize, + + #[command(flatten)] + pub predicate: PredicateArgs, +} + +#[derive(Debug, Clone, Args)] +pub struct SplitArgs { + #[arg(value_name = "INPUT")] + pub input: PathBuf, + + #[arg(value_name = "OUTPUT_PREFIX")] + pub output_prefix: PathBuf, + + #[arg(long)] + pub max_bytes: Option, + + #[arg(long)] + pub max_records: Option, + + #[arg(long, help = "RFC 3339 or unix-ns range support to be added")] + pub timestamp_range: Option, +} + +#[derive(Debug, Clone, Args)] +pub struct JoinArgs { + #[arg(value_name = "INPUT", required = true)] + pub inputs: Vec, + + #[arg(short, long, value_name = "OUTPUT")] + pub output: PathBuf, + + #[arg(long)] + pub validate_sequence_continuity: bool, +} + +#[derive(Debug, Clone, Args)] +pub struct StatsArgs { + #[arg(value_name = "INPUT")] + pub input: PathBuf, + + #[arg(long, help = "Compute payload size summaries by record type")] + pub field_stats: bool, +} + +#[derive(Debug, Clone, Args)] +pub struct CatArgs { + #[arg(value_name = "INPUT")] + pub input: PathBuf, + + #[arg(long, default_value_t = false)] + pub hex_payload: bool, +} + +#[derive(Debug, Clone, Copy, ValueEnum)] +pub enum OutputCodec { + None, + Lz4, +} + +impl From for logjet::Codec { + fn from(value: OutputCodec) -> Self { + match value { + OutputCodec::None => Self::None, + OutputCodec::Lz4 => Self::Lz4, + } + } +} diff --git a/ljx/src/commands/cat.rs b/ljx/src/commands/cat.rs new file mode 100644 index 0000000..48e0450 --- /dev/null +++ b/ljx/src/commands/cat.rs @@ 
-0,0 +1,8 @@ +use crate::cli::CatArgs; +use crate::error::{Error, Result}; + +pub fn run(_args: CatArgs) -> Result<()> { + Err(Error::Unimplemented( + "cat is not implemented yet; the CLI shape is defined but record rendering still needs to be added", + )) +} diff --git a/ljx/src/commands/count.rs b/ljx/src/commands/count.rs new file mode 100644 index 0000000..a683a41 --- /dev/null +++ b/ljx/src/commands/count.rs @@ -0,0 +1,23 @@ +use logjet::LogjetReader; + +use crate::cli::CountArgs; +use crate::error::Result; +use crate::input::InputHandle; + +pub fn run(args: CountArgs) -> Result<()> { + let predicate = args.predicate.build()?; + let input = InputHandle::open(&args.input)?; + let mut reader = LogjetReader::new(input.into_buf_reader()); + + let mut count = 0u64; + while let Some(record) = reader.next_record()? { + if predicate.matches(&record) { + count = count + .checked_add(1) + .ok_or(logjet::Error::NumericOverflow("count"))?; + } + } + + println!("{count}"); + Ok(()) +} diff --git a/ljx/src/commands/filter.rs b/ljx/src/commands/filter.rs new file mode 100644 index 0000000..fa8bc20 --- /dev/null +++ b/ljx/src/commands/filter.rs @@ -0,0 +1,36 @@ +use std::io::Write; + +use logjet::{LogjetReader, LogjetWriter, WriterConfig}; + +use crate::cli::FilterArgs; +use crate::error::Result; +use crate::input::{InputHandle, open_output}; + +pub fn run(args: FilterArgs) -> Result<()> { + let predicate = args.predicate.build()?; + let input = InputHandle::open(&args.input)?; + let output = open_output(&args.output)?; + let config = WriterConfig { + block_target_size: args.block_target_size, + codec: args.codec.into(), + sync_marker: logjet::DEFAULT_SYNC_MARKER, + }; + + let mut reader = LogjetReader::new(input.into_buf_reader()); + let mut writer = LogjetWriter::with_config(output, config); + + while let Some(record) = reader.next_record()? 
{ + if predicate.matches(&record) { + writer.push( + record.record_type, + record.seq, + record.ts_unix_ns, + &record.payload, + )?; + } + } + + let mut output = writer.into_inner()?; + output.flush()?; + Ok(()) +} diff --git a/ljx/src/commands/join.rs b/ljx/src/commands/join.rs new file mode 100644 index 0000000..b29a88c --- /dev/null +++ b/ljx/src/commands/join.rs @@ -0,0 +1,8 @@ +use crate::cli::JoinArgs; +use crate::error::{Error, Result}; + +pub fn run(_args: JoinArgs) -> Result<()> { + Err(Error::Unimplemented( + "join is not implemented yet; the CLI shape is defined but ordered multi-segment merging still needs to be added", + )) +} diff --git a/ljx/src/commands/mod.rs b/ljx/src/commands/mod.rs new file mode 100644 index 0000000..c4b9f75 --- /dev/null +++ b/ljx/src/commands/mod.rs @@ -0,0 +1,6 @@ +pub mod cat; +pub mod count; +pub mod filter; +pub mod join; +pub mod split; +pub mod stats; diff --git a/ljx/src/commands/split.rs b/ljx/src/commands/split.rs new file mode 100644 index 0000000..829e05d --- /dev/null +++ b/ljx/src/commands/split.rs @@ -0,0 +1,8 @@ +use crate::cli::SplitArgs; +use crate::error::{Error, Result}; + +pub fn run(_args: SplitArgs) -> Result<()> { + Err(Error::Unimplemented( + "split is not implemented yet; the CLI shape is defined but block-preserving file partitioning still needs to be added", + )) +} diff --git a/ljx/src/commands/stats.rs b/ljx/src/commands/stats.rs new file mode 100644 index 0000000..ada43dc --- /dev/null +++ b/ljx/src/commands/stats.rs @@ -0,0 +1,8 @@ +use crate::cli::StatsArgs; +use crate::error::{Error, Result}; + +pub fn run(_args: StatsArgs) -> Result<()> { + Err(Error::Unimplemented( + "stats is not implemented yet; the CLI shape is defined but streaming summaries still need to be added", + )) +} diff --git a/ljx/src/error.rs b/ljx/src/error.rs new file mode 100644 index 0000000..1bfae90 --- /dev/null +++ b/ljx/src/error.rs @@ -0,0 +1,37 @@ +use std::fmt::{Display, Formatter}; +use std::io; + +pub type Result = 
std::result::Result;
+
+#[derive(Debug)]
+pub enum Error {
+    Io(io::Error),
+    Logjet(logjet::Error),
+    Usage(String),
+    Unimplemented(&'static str),
+}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Io(err) => write!(f, "{err}"),
+            Self::Logjet(err) => write!(f, "{err}"),
+            Self::Usage(msg) => write!(f, "{msg}"),
+            Self::Unimplemented(msg) => write!(f, "{msg}"),
+        }
+    }
+}
+
+impl std::error::Error for Error {}
+
+impl From<io::Error> for Error {
+    fn from(value: io::Error) -> Self {
+        Self::Io(value)
+    }
+}
+
+impl From<logjet::Error> for Error {
+    fn from(value: logjet::Error) -> Self {
+        Self::Logjet(value)
+    }
+}
diff --git a/ljx/src/input.rs b/ljx/src/input.rs
new file mode 100644
index 0000000..caaaeb0
--- /dev/null
+++ b/ljx/src/input.rs
@@ -0,0 +1,121 @@
+use std::fs::{File, OpenOptions};
+use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
+use std::path::{Path, PathBuf};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use crate::error::{Error, Result};
+
+/// Seekable input for the `logjet` reader.
+///
+/// Backed either by a regular file or, when stdin was requested, by a
+/// temporary spool file that is removed again when the handle is dropped.
+pub struct InputHandle {
+    file: File,
+    /// Path of the stdin spool file; `None` when reading a user-supplied file.
+    temp_path: Option<PathBuf>,
+}
+
+impl InputHandle {
+    /// Opens `path` for reading; the special path `-` spools stdin to a
+    /// temporary file first so the result is seekable.
+    pub fn open(path: &Path) -> Result<Self> {
+        if path == Path::new("-") {
+            Self::from_stdin()
+        } else {
+            Ok(Self {
+                file: File::open(path)?,
+                temp_path: None,
+            })
+        }
+    }
+
+    /// Wraps the handle in a `BufReader`, keeping temp-file cleanup attached.
+    pub fn into_buf_reader(self) -> BufReader<Self> {
+        BufReader::new(self)
+    }
+
+    /// Copies all of stdin into a fresh temporary file and rewinds it.
+    fn from_stdin() -> Result<Self> {
+        let (file, path) = create_temp_file()?;
+        // Build the handle before any further fallible I/O so that `Drop`
+        // removes the spool file on every early-return path.
+        let mut handle = Self {
+            file,
+            temp_path: Some(path),
+        };
+
+        {
+            let mut writer = BufWriter::new(&mut handle.file);
+            io::copy(&mut io::stdin().lock(), &mut writer)?;
+            // Flush explicitly: `BufWriter`'s `Drop` would swallow a write error.
+            writer.flush()?;
+        }
+        handle.file.seek(SeekFrom::Start(0))?;
+
+        Ok(handle)
+    }
+}
+
+impl Read for InputHandle {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.file.read(buf)
+    }
+}
+
+impl Seek for InputHandle {
+    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+        self.file.seek(pos)
+    }
+}
+
+impl Drop for InputHandle {
+    fn drop(&mut self) {
+        // Best-effort cleanup of the stdin spool file; errors are ignored on purpose.
+        if let Some(path) = &self.temp_path {
+            let _ = std::fs::remove_file(path);
+        }
+    }
+}
+
+/// Opens the output sink: `-` selects buffered stdout, anything else creates
+/// (or truncates) the file at `path`.
+pub fn open_output(path: &Path) -> Result<Box<dyn Write>> {
+    if path == Path::new("-") {
+        Ok(Box::new(BufWriter::new(io::stdout().lock())))
+    } else {
+        Ok(Box::new(BufWriter::new(File::create(path)?)))
+    }
+}
+
+/// Creates a uniquely named spool file in the system temp directory and
+/// returns it opened for reading and writing, together with its path.
+///
+/// `create_new` makes creation atomic; on a name collision the attempt
+/// counter in the file name is bumped and creation is retried.
+fn create_temp_file() -> Result<(File, PathBuf)> {
+    let base = std::env::temp_dir();
+    let pid = std::process::id();
+    let nanos = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .map_err(|err| Error::Usage(format!("system clock error: {err}")))?
+        .as_nanos();
+
+    let mut attempt = 0u32;
+    loop {
+        let candidate = base.join(format!("ljx-stdin-{pid}-{nanos}-{attempt}.logjet"));
+        match OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create_new(true)
+            .open(&candidate)
+        {
+            Ok(file) => return Ok((file, candidate)),
+            Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {
+                attempt = attempt
+                    .checked_add(1)
+                    .ok_or_else(|| Error::Usage("temporary file naming overflow".to_string()))?;
+            }
+            Err(err) => return Err(err.into()),
+        }
+    }
+}
diff --git a/ljx/src/main.rs b/ljx/src/main.rs
new file mode 100644
index 0000000..a72a2cd
--- /dev/null
+++ b/ljx/src/main.rs
@@ -0,0 +1,32 @@
+mod cli;
+mod commands;
+mod error;
+mod input;
+mod predicate;
+
+use clap::FromArgMatches;
+
+use crate::cli::{Cli, Command};
+use crate::error::Result;
+
+fn main() {
+    if let Err(err) = run() {
+        eprintln!("ljx: {err}");
+        std::process::exit(1);
+    }
+}
+
+fn run() -> Result<()> {
+    let mut command = cli::build_cli();
+    let mut matches = command.get_matches_mut();
+    let cli = Cli::from_arg_matches_mut(&mut matches)
+        .map_err(|err| crate::error::Error::Usage(err.to_string()))?;
+    match cli.command {
+        Command::Split(args) => commands::split::run(args),
+        Command::Join(args) => commands::join::run(args),
+        Command::Filter(args) => commands::filter::run(args),
+        Command::Count(args) => commands::count::run(args), +
Command::Stats(args) => commands::stats::run(args), + Command::Cat(args) => commands::cat::run(args), + } +} diff --git a/ljx/src/predicate.rs b/ljx/src/predicate.rs new file mode 100644 index 0000000..5869849 --- /dev/null +++ b/ljx/src/predicate.rs @@ -0,0 +1,230 @@ +use clap::{ArgGroup, Args, ValueEnum}; +use logjet::{OwnedRecord, RecordType}; +use regex::bytes::{Regex, RegexBuilder}; + +use crate::error::{Error, Result}; + +#[derive(Debug, Clone, Args, Default)] +#[command(group( + ArgGroup::new("payload_match") + .args(["grep", "fixed_string"]) + .multiple(false) +))] +pub struct PredicateArgs { + #[arg(long = "type", value_enum)] + pub record_type: Option, + + #[arg(long)] + pub seq_min: Option, + + #[arg(long)] + pub seq_max: Option, + + #[arg(long)] + pub ts_min: Option, + + #[arg(long)] + pub ts_max: Option, + + #[arg(short = 'e', long = "grep", value_name = "PATTERN")] + pub grep: Option, + + #[arg(short = 'F', long = "fixed-string", value_name = "TEXT")] + pub fixed_string: Option, + + #[arg(short = 'i', long = "ignore-case")] + pub ignore_case: bool, +} + +#[derive(Debug, Clone)] +pub struct RecordPredicate { + record_type: Option, + seq_min: Option, + seq_max: Option, + ts_min: Option, + ts_max: Option, + payload_matcher: Option, +} + +#[derive(Debug, Clone)] +struct PayloadMatcher { + regex: Regex, +} + +impl PredicateArgs { + pub fn build(self) -> Result { + let payload_matcher = match (self.grep, self.fixed_string) { + (Some(pattern), None) => Some(PayloadMatcher::new(&pattern, false, self.ignore_case)?), + (None, Some(text)) => Some(PayloadMatcher::new(&text, true, self.ignore_case)?), + (None, None) => None, + (Some(_), Some(_)) => { + return Err(Error::Usage( + "choose either -e/--grep or -F/--fixed-string, not both".to_string(), + )); + } + }; + + Ok(RecordPredicate { + record_type: self.record_type.map(Into::into), + seq_min: self.seq_min, + seq_max: self.seq_max, + ts_min: self.ts_min, + ts_max: self.ts_max, + payload_matcher, + }) + } +} + 
+impl RecordPredicate { + pub fn matches(&self, record: &OwnedRecord) -> bool { + if let Some(expected) = self.record_type && record.record_type != expected { + return false; + } + if let Some(min) = self.seq_min && record.seq < min { + return false; + } + if let Some(max) = self.seq_max && record.seq > max { + return false; + } + if let Some(min) = self.ts_min && record.ts_unix_ns < min { + return false; + } + if let Some(max) = self.ts_max && record.ts_unix_ns > max { + return false; + } + if let Some(matcher) = &self.payload_matcher && !matcher.is_match(&record.payload) { + return false; + } + + true + } +} + +impl PayloadMatcher { + fn new(pattern: &str, fixed_string: bool, ignore_case: bool) -> Result { + let source = if fixed_string { + regex::escape(pattern) + } else { + pattern.to_string() + }; + let regex = RegexBuilder::new(&source) + .case_insensitive(ignore_case) + .build() + .map_err(|err| Error::Usage(format!("invalid payload matcher: {err}")))?; + Ok(Self { regex }) + } + + fn is_match(&self, payload: &[u8]) -> bool { + self.regex.is_match(payload) + } +} + +#[derive(Debug, Clone, Copy, ValueEnum)] +pub enum RecordKind { + Logs, + Metrics, + Traces, +} + +impl From for RecordType { + fn from(value: RecordKind) -> Self { + match value { + RecordKind::Logs => Self::Logs, + RecordKind::Metrics => Self::Metrics, + RecordKind::Traces => Self::Traces, + } + } +} + +#[cfg(test)] +mod tests { + use super::{PredicateArgs, RecordKind}; + use logjet::{OwnedRecord, RecordType}; + + fn sample_record(payload: &[u8]) -> OwnedRecord { + OwnedRecord { + record_type: RecordType::Logs, + seq: 42, + ts_unix_ns: 1_700_000_000, + payload: payload.to_vec(), + } + } + + #[test] + fn fixed_string_match_is_literal() { + let predicate = PredicateArgs { + fixed_string: Some("java.crap.failed".to_string()), + ..PredicateArgs::default() + } + .build() + .unwrap(); + + assert!(predicate.matches(&sample_record(b"xxx java.crap.failed yyy"))); + 
assert!(!predicate.matches(&sample_record(b"javaXcrapXfailed"))); + } + + #[test] + fn regex_match_supports_wildcards() { + let predicate = PredicateArgs { + grep: Some(r"java\..*\.bs".to_string()), + ..PredicateArgs::default() + } + .build() + .unwrap(); + + assert!(predicate.matches(&sample_record(b"java.very.long.bs"))); + assert!(!predicate.matches(&sample_record(b"java.very.long.cs"))); + } + + #[test] + fn ignore_case_applies_to_fixed_string_and_regex() { + let fixed = PredicateArgs { + fixed_string: Some("error".to_string()), + ignore_case: true, + ..PredicateArgs::default() + } + .build() + .unwrap(); + let regex = PredicateArgs { + grep: Some("error".to_string()), + ignore_case: true, + ..PredicateArgs::default() + } + .build() + .unwrap(); + + let record = sample_record(b"prefix eRrOr suffix"); + assert!(fixed.matches(&record)); + assert!(regex.matches(&record)); + } + + #[test] + fn matcher_combines_with_record_fields() { + let predicate = PredicateArgs { + record_type: Some(RecordKind::Logs), + seq_min: Some(40), + seq_max: Some(45), + ts_min: Some(1_699_999_999), + ts_max: Some(1_700_000_001), + fixed_string: Some("hello".to_string()), + ..PredicateArgs::default() + } + .build() + .unwrap(); + + assert!(predicate.matches(&sample_record(b"hello world"))); + assert!(!predicate.matches(&sample_record(b"bye world"))); + } + + #[test] + fn invalid_regex_is_reported() { + let error = PredicateArgs { + grep: Some("(".to_string()), + ..PredicateArgs::default() + } + .build() + .unwrap_err(); + + assert!(error.to_string().contains("invalid payload matcher")); + } +} diff --git a/tests/ljx_cli.rs b/tests/ljx_cli.rs new file mode 100644 index 0000000..331aeb7 --- /dev/null +++ b/tests/ljx_cli.rs @@ -0,0 +1,427 @@ +use std::ffi::OsStr; +use std::fs::{self, File}; +use std::io::{self, BufReader}; +use std::net::{Shutdown, TcpListener, TcpStream}; +use std::path::{Path, PathBuf}; +use std::process::{Child, Command, Output, Stdio}; +use std::thread; +use 
std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use logjet::LogjetReader; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::common::v1::any_value::Value; +use prost::Message; + +#[test] +fn ljx_filters_real_logjetd_output_from_mock_emitter() -> io::Result<()> { + ensure_test_binaries_exist()?; + + let dir = TestDir::new("ljx-cli")?; + let spool_dir = dir.path().join("spool"); + fs::create_dir_all(&spool_dir)?; + let ingest_port = free_port()?; + let spool_path = spool_dir.join("integration.logjet"); + let filtered_literal = dir.path().join("literal.logjet"); + let filtered_regex = dir.path().join("regex.logjet"); + let filtered_seq = dir.path().join("seq.logjet"); + let filtered_stdout = dir.path().join("stdout.logjet"); + + let config = dir.write( + "logjetd.conf", + &format!( + "output: file\nfile.path: {}\nfile.size: 1024\nfile.name: integration.logjet\ningest.protocol: otlp-http\ningest.listen: 127.0.0.1:{ingest_port}\n", + spool_dir.display() + ), + )?; + + eprintln!("starting logjetd"); + let _daemon = ChildGuard::spawn({ + let mut cmd = Command::new(logjetd_bin()); + cmd.arg("--config").arg(&config).arg("serve"); + cmd + }) + .map_err(|err| io::Error::other(format!("failed to start logjetd: {err}")))?; + eprintln!("waiting for ingest tcp"); + wait_for_tcp(&format!("127.0.0.1:{ingest_port}"), Duration::from_secs(5)) + .map_err(|err| io::Error::other(format!("failed waiting for ingest tcp: {err}")))?; + + for message in [ + "java.crap.failed", + "java.alpha.bs", + "ERROR boom", + "eRrOr splash", + "banana", + ] { + eprintln!("emitting {message}"); + run_emitter(ingest_port, message)?; + } + + eprintln!("waiting for spool file"); + wait_until(Duration::from_secs(5), || { + Ok(spool_path.exists() && fs::metadata(&spool_path)?.len() > 0) + }) + .map_err(|err| io::Error::other(format!("failed waiting for spool file: {err}")))?; + + let records = read_logjet_records(&spool_path)?; + 
assert_eq!(records.len(), 5); + let seq_min = records[1].seq; + let seq_max = records[3].seq; + let ts_min = records[1].ts_unix_ns; + let ts_max = records[3].ts_unix_ns; + + eprintln!("running ljx assertions"); + assert_eq!(run_ljx_count(&spool_path, &[])?, "5"); + assert_eq!(run_ljx_count(&spool_path, &["--type", "logs"])?, "5"); + assert_eq!( + run_ljx_count(&spool_path, &["-F", "java.crap.failed"])?, + "1" + ); + assert_eq!( + run_ljx_count(&spool_path, &["-e", r"java\..*\.bs"])?, + "1" + ); + assert_eq!(run_ljx_count(&spool_path, &["-F", "error", "-i"])?, "2"); + assert_eq!( + run_ljx_count( + &spool_path, + &[ + "--seq-min", + &seq_min.to_string(), + "--seq-max", + &seq_max.to_string(), + ], + )?, + "3" + ); + assert_eq!( + run_ljx_count( + &spool_path, + &[ + "--ts-min", + &ts_min.to_string(), + "--ts-max", + &ts_max.to_string(), + ], + )?, + "3" + ); + + run_ljx_filter( + &spool_path, + &filtered_literal, + &["-F", "java.crap.failed"], + )?; + assert_eq!( + read_logjet_messages(&filtered_literal)?, + vec!["java.crap.failed".to_string()] + ); + + run_ljx_filter(&spool_path, &filtered_regex, &["-e", "error|panic", "-i"])?; + assert_eq!( + read_logjet_messages(&filtered_regex)?, + vec!["ERROR boom".to_string(), "eRrOr splash".to_string()] + ); + + run_ljx_filter( + &spool_path, + &filtered_seq, + &[ + "--seq-min", + &seq_min.to_string(), + "--seq-max", + &seq_max.to_string(), + ], + )?; + assert_eq!( + read_logjet_messages(&filtered_seq)?, + vec![ + "java.alpha.bs".to_string(), + "ERROR boom".to_string(), + "eRrOr splash".to_string() + ] + ); + + let stdout_output = run_ljx( + [ + "filter".as_ref(), + spool_path.as_os_str(), + "-o".as_ref(), + "-".as_ref(), + ], + &["-F", "error", "-i"], + )?; + if !stdout_output.status.success() { + return Err(io::Error::other(format!( + "ljx stdout filter failed: {}", + String::from_utf8_lossy(&stdout_output.stderr) + ))); + } + fs::write(&filtered_stdout, &stdout_output.stdout)?; + assert_eq!( + 
read_logjet_messages(&filtered_stdout)?, + vec!["ERROR boom".to_string(), "eRrOr splash".to_string()] + ); + + let invalid = run_ljx( + ["count".as_ref(), spool_path.as_os_str()], + &["-F", "error", "-e", "panic"], + )?; + assert!(!invalid.status.success()); + assert!( + String::from_utf8_lossy(&invalid.stderr).contains("cannot be used with") + || String::from_utf8_lossy(&invalid.stderr).contains("choose either") + ); + + Ok(()) +} + +fn ensure_test_binaries_exist() -> io::Result<()> { + for path in [logjetd_bin(), ljx_bin(), emitter_bin()] { + if !path.is_file() { + return Err(io::Error::other(format!( + "missing test binary {}. build it first with: cargo build -p logjetd -p ljx -p otlp-demo --bin otlp-bofh-emitter", + path.display() + ))); + } + } + Ok(()) +} + +fn run_emitter(ingest_port: u16, message: &str) -> io::Result<()> { + let status = Command::new(emitter_bin()) + .arg(format!("127.0.0.1:{ingest_port}")) + .arg("--once") + .arg("--service-name") + .arg("ljx-it") + .arg("--message") + .arg(message) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .map_err(|err| io::Error::other(format!("failed to start emitter: {err}")))?; + if status.success() { + Ok(()) + } else { + Err(io::Error::other(format!("emitter failed for message: {message}"))) + } +} + +fn run_ljx_count(input: &Path, extra_args: &[&str]) -> io::Result { + let output = run_ljx(["count".as_ref(), input.as_os_str()], extra_args)?; + if !output.status.success() { + return Err(io::Error::other(format!( + "ljx count failed: {}", + String::from_utf8_lossy(&output.stderr) + ))); + } + + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) +} + +fn run_ljx_filter(input: &Path, output: &Path, extra_args: &[&str]) -> io::Result<()> { + let output = run_ljx( + [ + "filter".as_ref(), + input.as_os_str(), + "-o".as_ref(), + output.as_os_str(), + ], + extra_args, + )?; + if output.status.success() { + Ok(()) + } else { + Err(io::Error::other(format!( + "ljx filter failed: {}", + 
String::from_utf8_lossy(&output.stderr) + ))) + } +} + +fn run_ljx(prefix_args: [&OsStr; N], extra_args: &[&str]) -> io::Result { + let mut command = Command::new(ljx_bin()); + command.args(prefix_args); + command.args(extra_args); + command + .output() + .map_err(|err| io::Error::other(format!("failed to start ljx: {err}"))) +} + +fn read_logjet_messages(path: &Path) -> io::Result> { + Ok(read_logjet_records(path)? + .into_iter() + .map(|record| record.message) + .collect()) +} + +fn read_logjet_records(path: &Path) -> io::Result> { + let file = File::open(path)?; + let mut reader = LogjetReader::new(BufReader::new(file)); + let mut records = Vec::new(); + while let Some(record) = reader.next_record().map_err(io::Error::other)? { + for message in decode_payload_messages(&record.payload)? { + records.push(DecodedRecord { + seq: record.seq, + ts_unix_ns: record.ts_unix_ns, + message, + }); + } + } + Ok(records) +} + +fn decode_payload_messages(payload: &[u8]) -> io::Result> { + let batch = ExportLogsServiceRequest::decode(payload) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?; + let mut messages = Vec::new(); + for resource_logs in batch.resource_logs { + for scope_logs in resource_logs.scope_logs { + for record in scope_logs.log_records { + if let Some(body) = record.body + && let Some(Value::StringValue(message)) = body.value + { + messages.push(message); + } + } + } + } + Ok(messages) +} + +fn target_dir() -> PathBuf { + std::env::var_os("CARGO_TARGET_DIR") + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("target")) +} + +struct DecodedRecord { + seq: u64, + ts_unix_ns: u64, + message: String, +} + +fn logjetd_bin() -> PathBuf { + target_dir().join("debug").join(binary_name("logjetd")) +} + +fn ljx_bin() -> PathBuf { + target_dir().join("debug").join(binary_name("ljx")) +} + +fn emitter_bin() -> PathBuf { + target_dir() + .join("debug") + .join(binary_name("otlp-bofh-emitter")) +} + +fn 
binary_name(name: &str) -> String { + if cfg!(windows) { + format!("{name}.exe") + } else { + name.to_string() + } +} + +struct TestDir { + path: PathBuf, +} + +impl TestDir { + fn new(label: &str) -> io::Result { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(); + let path = std::env::temp_dir().join(format!( + "logjet-ljx-it-{label}-{nanos}-{}", + std::process::id() + )); + fs::create_dir_all(&path)?; + Ok(Self { path }) + } + + fn path(&self) -> &Path { + &self.path + } + + fn write(&self, name: &str, body: &str) -> io::Result { + let path = self.path.join(name); + fs::write(&path, body)?; + Ok(path) + } +} + +impl Drop for TestDir { + fn drop(&mut self) { + let _ = fs::remove_dir_all(&self.path); + } +} + +struct ChildGuard { + child: Child, +} + +impl ChildGuard { + fn spawn(mut command: Command) -> io::Result { + let child = command + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn()?; + Ok(Self { child }) + } +} + +impl Drop for ChildGuard { + fn drop(&mut self) { + let _ = self.child.kill(); + let _ = self.child.wait(); + } +} + +fn free_port() -> io::Result { + let listener = TcpListener::bind("127.0.0.1:0")?; + let port = listener.local_addr()?.port(); + drop(listener); + Ok(port) +} + +fn wait_for_tcp(addr: &str, timeout: Duration) -> io::Result<()> { + let deadline = Instant::now() + timeout; + loop { + match TcpStream::connect(addr) { + Ok(stream) => { + let _ = stream.shutdown(Shutdown::Both); + return Ok(()); + } + Err(err) if Instant::now() < deadline => { + if err.kind() != io::ErrorKind::ConnectionRefused + && err.kind() != io::ErrorKind::TimedOut + && err.kind() != io::ErrorKind::AddrNotAvailable + { + return Err(err); + } + thread::sleep(Duration::from_millis(25)); + } + Err(err) => return Err(err), + } + } +} + +fn wait_until(timeout: Duration, mut predicate: F) -> io::Result<()> +where + F: FnMut() -> io::Result, +{ + let deadline = Instant::now() + timeout; + loop { + if predicate()? 
{ + return Ok(()); + } + if Instant::now() >= deadline { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + "timed out waiting for condition", + )); + } + thread::sleep(Duration::from_millis(25)); + } +}