diff --git a/Cargo.lock b/Cargo.lock index 3582156..240226b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "anstream" -version = "0.6.21" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" dependencies = [ "anstyle", "anstyle-parse", @@ -34,15 +34,15 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" [[package]] name = "anstyle-parse" -version = "0.2.7" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" dependencies = [ "utf8parse", ] @@ -212,9 +212,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.56" +version = "1.2.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aebf35691d1bfb0ac386a69bac2fde4dd276fb618cf8bf4f5318fe285e821bb2" +checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" dependencies = [ "find-msvc-tools", "shlex", @@ -243,9 +243,9 @@ checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" [[package]] name = "clap" -version = "4.5.60" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" +checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" dependencies = [ "clap_builder", "clap_derive", @@ -253,9 +253,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.60" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ "anstream", "anstyle", @@ -266,9 +266,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.55" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" +checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" dependencies = [ "heck", "proc-macro2", @@ -278,15 +278,15 @@ dependencies = [ [[package]] name = "clap_lex" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" [[package]] name = "colorchoice" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" [[package]] name = "colored" @@ -691,9 +691,9 @@ dependencies = [ [[package]] name = "instability" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "357b7205c6cd18dd2c86ed312d1e70add149aea98e7ef72b9fdf0270e555c11d" +checksum = "5eb2d60ef19920a3a9193c3e371f726ec1dafc045dac788d0fb3704272458971" dependencies = [ "darling", "indoc", @@ -744,9 +744,19 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.182" +version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" + +[[package]] +name = "liblogjet" +version = "0.1.0" +dependencies = [ + "opentelemetry-proto", + "prost", + "tokio", + "tonic", +] [[package]] name = "linux-raw-sys" @@ -828,9 +838,9 @@ dependencies = [ [[package]] name = "lz4_flex" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" +checksum = "373f5eceeeab7925e0c1098212f2fbc4d416adec9d35051a6ab251e824c1854a" [[package]] name = "matchit" @@ -873,9 +883,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.21.3" +version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "once_cell_polyfill" @@ -1185,7 +1195,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1818,6 +1828,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.60.2" @@ -1967,18 +1986,18 @@ checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" [[package]] name = "zerocopy" -version = "0.8.40" +version = "0.8.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a789c6e490b576db9f7e6b6d661bcc9799f7c0ac8352f56ea20193b2681532e5" +checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.40" +version = "0.8.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f65c489a7071a749c849713807783f70672b28094011623e200cb86dcb835953" +checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index c09338b..068cf69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,8 @@ edition = "2024" license = "Apache-2.0" [workspace] -members = [".", "logjetd", "demo", "ljx"] -default-members = [".", "logjetd", "demo", "ljx"] +members = [".", "logjetd", "demo", "ljx", "liblogjet"] +default-members = [".", "logjetd", "demo", "ljx", "liblogjet"] [profile.release] lto = true diff --git a/README.md b/README.md index 03400c4..e54a4c1 100644 --- a/README.md +++ b/README.md @@ -154,6 +154,7 @@ fn replay_batches() -> Result<(), Box> { ## Notes - Examples for standalone usage live in [examples](./examples). +- C and C++ shared-library usage lives in [doc/c-cpp-integration.md](./doc/c-cpp-integration.md). - The reader is sequential by design. - Compression is per block, not per file. - The payload bytes are opaque to `logjet`. diff --git a/demo/README.md b/demo/README.md index 69c7b14..0b8ecf4 100644 --- a/demo/README.md +++ b/demo/README.md @@ -35,6 +35,8 @@ It also contains scenario demos under subdirectories: - one replay client stalls while another keeps flowing - [`replay-handoff`](./replay-handoff) - a late replay client drains retained backlog and then continues live on the same connection +- [`cpp-shared-lib`](./cpp-shared-lib) + - a C++ process loads `liblogjet.so`, sends OTLP logs into `ljd`, and opens the result in `ljx view` - [`file-replay`](./file-replay) - replay stored `.logjet` files into a collector - [`file-tooling`](./file-tooling) diff --git a/demo/cpp-shared-lib/README.md b/demo/cpp-shared-lib/README.md new file mode 100644 index 0000000..e7e8bde --- /dev/null +++ b/demo/cpp-shared-lib/README.md @@ -0,0 +1,46 @@ +# C++ Shared Library Demo + +This demo shows one C++ process loading a Rust shared library and sending OTLP +logs into `ljd` over gRPC. + +The path is: + +`C++ appliance -> liblogjet.so -> OTLP/gRPC -> ljd -> .logjet file -> ljx view` + +## Build First + +From the project root: + +```bash +cargo build -p ljd -p ljx -p liblogjet +``` + +You also need `g++` available locally because the demo compiles the C++ +example on demand. + +## Run + +From this directory: + +```bash +./run-demo.sh +``` + +## What It Does + +The script: + +1. builds the example C++ logger +2. starts file-backed `ljd` on `127.0.0.1:4317` +3. loads `liblogjet.so` through `dlopen` +4. sends 25 OTLP log records from C++ by default +5. opens `ljx view` on the resulting `./logs/cpp-demo.logjet` + +## Notes + +- the library now supports both OTLP/HTTP and OTLP/gRPC constructors +- this demo specifically uses OTLP/gRPC +- the FFI API is intentionally small: endpoint, service name, timestamp, severity, message body, and string attributes +- those key/value pairs become OTLP `LogRecord.attributes`, which is the standard OTel metadata field for log records +- if the appliance already has JSON metadata, the better long-term shape is to flatten that JSON into separate attributes where possible; a raw JSON blob can still be sent as one string attribute when needed +- the demo uses a local symlink named `liblogjet.so` that points at Cargo's built shared object diff --git a/demo/cpp-shared-lib/cpp-logger b/demo/cpp-shared-lib/cpp-logger new file mode 100755 index 0000000..db2a8eb Binary files /dev/null and b/demo/cpp-shared-lib/cpp-logger differ diff --git a/demo/cpp-shared-lib/cpp-logger.cpp b/demo/cpp-shared-lib/cpp-logger.cpp new file mode 100644 index 0000000..11e31bb --- /dev/null +++ b/demo/cpp-shared-lib/cpp-logger.cpp @@ -0,0 +1,170 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +using version_fn = const char *(*)(); +using error_fn = const char *(*)(); +using new_http_fn = lj_logger *(*)(const char *, const char *, std::uint64_t); +using new_grpc_fn = lj_logger *(*)(const char *, const char *, std::uint64_t); +using free_fn = void (*)(lj_logger *); +using log_fn = bool (*)(lj_logger *, const lj_log_record *); + +struct api { + version_fn version; + error_fn error_message; + new_http_fn new_http; + new_grpc_fn new_grpc; + free_fn free_logger; + log_fn log_record; +}; + +std::int32_t info_severity() { + return LJ_SEVERITY_INFO; +} + +const char *pick(const std::vector &values, std::mt19937 &rng) { + std::uniform_int_distribution dist(0, values.size() - 1); + return values[dist(rng)]; +} + +std::uint64_t unix_time_nanos() { + auto now = std::chrono::system_clock::now().time_since_epoch(); + return static_cast(std::chrono::duration_cast(now).count()); +} + +void *must_symbol(void *handle, const char *name) { + dlerror(); + void *symbol = dlsym(handle, name); + const char *error = dlerror(); + if (error != nullptr) { + std::cerr << "dlsym failed for " << name << ": " << error << "\n"; + std::exit(1); + } + return symbol; +} + +api load_api(void *handle) { + return api{ + reinterpret_cast(must_symbol(handle, "lj_version")), + reinterpret_cast(must_symbol(handle, "lj_error_message")), + reinterpret_cast(must_symbol(handle, "lj_logger_new_http")), + reinterpret_cast(must_symbol(handle, "lj_logger_new_grpc")), + reinterpret_cast(must_symbol(handle, "lj_logger_free")), + reinterpret_cast(must_symbol(handle, "lj_logger_log")), + }; +} + +} // namespace + +int main(int argc, char **argv) { + const std::string so_path = argc > 1 ? argv[1] : "./liblogjet.so"; + const std::string endpoint = argc > 2 ? argv[2] : "127.0.0.1:4317"; + const int message_count = argc > 3 ? std::atoi(argv[3]) : 25; + + void *handle = dlopen(so_path.c_str(), RTLD_NOW | RTLD_LOCAL); + if (handle == nullptr) { + std::cerr << "dlopen failed: " << dlerror() << "\n"; + return 1; + } + + const api lib = load_api(handle); + std::cout << "loaded liblogjet version " << lib.version() << "\n"; + + std::mt19937 rng(std::random_device{}()); + const std::vector characters = { + "Bender", "Fry", "Leela", "Professor Farnsworth", "Zoidberg", + "Amy", "Hermes", "Nibbler", "Scruffy", "Calculon", + }; + const std::vector locations = { + "Planet Express", "New New York", "The Moon", "Mars University", + "Robot Hell", "Slurm factory", "Omicron Persei 8", "Bender's fun park", + }; + const std::vector attractions = { + "blackjack dome", "dark matter coaster", "hooker-bot lounge", + "slurm chute", "robot petting zoo", "delivery cannon", + }; + const std::vector moods = { + "greedy", "heroic", "dramatic", "sleepy", + "chaotic", "optimistic", "hungry", "unbothered", + }; + const std::vector schemes = { + "casino expansion", "fun park launch", "delivery detour", + "robot uprising rehearsal", "slurm promotion", "budget evaporation", + }; + const std::vector quotes = { + "Bender promised a classy fun park financed mostly by blackjack.", + "Fry pressed the glowing button because hesitation felt off-brand.", + "Leela requested a routine delivery and got stylish chaos instead.", + "The Professor called this outage a perfectly normal science moment.", + "Zoidberg celebrated because nobody had blamed him yet.", + "Hermes filed the disaster under efficient bureaucratic progress.", + "Amy said the ship felt stable, which worried everyone instantly.", + "Nibbler stared into the void like it owed him money.", + "Scruffy fixed the panel and resumed mopping without commentary.", + "Calculon demanded better lighting for the emergency landing.", + "Bender unveiled a premium attraction featuring hooker-bots and bad odds.", + "The crew found a shortcut through poor planning and dark matter.", + "Mission control agreed this was still cheaper than preparation.", + "Someone ordered suspicious robot bees and called it innovation.", + "The delivery manifest now includes one crate of dramatic overreaction.", + }; + + lj_logger *logger = lib.new_grpc(endpoint.c_str(), "cpp-appliance", 2000); + if (logger == nullptr) { + std::cerr << "lj_logger_new_grpc failed: " << lib.error_message() << "\n"; + dlclose(handle); + return 1; + } + + for (int index = 1; index <= message_count; ++index) { + const std::string sequence = std::to_string(index); + const std::string character = pick(characters, rng); + const std::string location = pick(locations, rng); + const std::string attraction = pick(attractions, rng); + const std::string mood = pick(moods, rng); + const std::string scheme = pick(schemes, rng); + const std::string message = + std::string(pick(quotes, rng)) + " character=" + character + " location=" + location; + const lj_attribute attributes[] = { + {"appliance.kind", "cpp-demo"}, + {"appliance.sequence", sequence.c_str()}, + {"character", character.c_str()}, + {"location", location.c_str()}, + {"attraction", attraction.c_str()}, + {"mood", mood.c_str()}, + {"scheme", scheme.c_str()}, + }; + const lj_log_record record{ + unix_time_nanos(), + info_severity(), + "INFO", + message.c_str(), + attributes, + sizeof(attributes) / sizeof(attributes[0]), + }; + + if (!lib.log_record(logger, &record)) { + std::cerr << "lj_logger_log failed: " << lib.error_message() << "\n"; + lib.free_logger(logger); + dlclose(handle); + return 1; + } + + std::cout << "sent: " << message << "\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + + lib.free_logger(logger); + dlclose(handle); + return 0; +} diff --git a/demo/cpp-shared-lib/liblogjet.so b/demo/cpp-shared-lib/liblogjet.so new file mode 120000 index 0000000..3a3e4e9 --- /dev/null +++ b/demo/cpp-shared-lib/liblogjet.so @@ -0,0 +1 @@ +/home/boma6672/work/logjet/demo/cpp-shared-lib/../../target/debug/libliblogjet.so \ No newline at end of file diff --git a/demo/cpp-shared-lib/ljd.conf b/demo/cpp-shared-lib/ljd.conf new file mode 100644 index 0000000..909904e --- /dev/null +++ b/demo/cpp-shared-lib/ljd.conf @@ -0,0 +1,6 @@ +output: file +file.path: ./logs +file.size: 1048576 +file.name: cpp-demo.logjet +ingest.protocol: otlp-grpc +ingest.listen: 127.0.0.1:4317 diff --git a/demo/cpp-shared-lib/run-demo.sh b/demo/cpp-shared-lib/run-demo.sh new file mode 100755 index 0000000..696c79c --- /dev/null +++ b/demo/cpp-shared-lib/run-demo.sh @@ -0,0 +1,52 @@ +#!/bin/sh +set -eu + +SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd) +TARGET_DIR="$SCRIPT_DIR/../../target/debug" +LJD="$TARGET_DIR/ljd" +LJX="$TARGET_DIR/ljx" +LIB_SRC="$TARGET_DIR/libliblogjet.so" +LIB_DST="$SCRIPT_DIR/liblogjet.so" +CPP_SRC="$SCRIPT_DIR/cpp-logger.cpp" +CPP_BIN="$SCRIPT_DIR/cpp-logger" +CONFIG="$SCRIPT_DIR/ljd.conf" + +for bin in "$LJD" "$LJX" "$LIB_SRC"; do + if [ ! -e "$bin" ]; then + echo "missing $bin" + echo "build everything first with: cargo build -p ljd -p ljx -p liblogjet" + exit 1 + fi +done + +if ! command -v g++ >/dev/null 2>&1; then + echo "g++ not found" + echo "install a C++ compiler to run this demo" + exit 1 +fi + +mkdir -p "$SCRIPT_DIR/logs" +ln -sf "$LIB_SRC" "$LIB_DST" + +echo "building C++ example" +g++ -std=c++17 -Wall -Wextra -pedantic -O2 -I"$SCRIPT_DIR/../../liblogjet/include" "$CPP_SRC" -ldl -o "$CPP_BIN" + +echo "starting ljd with file-backed OTLP ingest" +"$LJD" --config "$CONFIG" serve & +LJD_PID=$! + +cleanup() { + kill "${LJD_PID:-}" 2>/dev/null || true +} + +trap cleanup EXIT INT TERM + +sleep 1 + +echo "sending logs from C++ through liblogjet.so into ljd over OTLP/gRPC" +"$CPP_BIN" "$LIB_DST" "127.0.0.1:4317" 25 + +sleep 1 + +echo "opening ljx view on ./logs/cpp-demo.logjet" +"$LJX" view "$SCRIPT_DIR/logs/cpp-demo.logjet" diff --git a/doc/README.md b/doc/README.md index 7d496d3..300b275 100644 --- a/doc/README.md +++ b/doc/README.md @@ -1,6 +1,7 @@ # Documentation - [overview.md](./overview.md): project overview +- [c-cpp-integration.md](./c-cpp-integration.md): minimal `liblogjet` usage from C and C++ - [ljx.md](./ljx.md): `ljx` offline CLI scope and command plan - [daemon.md](./daemon.md): `ljd` behaviour and current limits - [configuration.md](./configuration.md): YAML keys and defaults diff --git a/doc/c-cpp-integration.md b/doc/c-cpp-integration.md new file mode 100644 index 0000000..a374420 --- /dev/null +++ b/doc/c-cpp-integration.md @@ -0,0 +1,97 @@ +# C/C++ Integration + +`liblogjet` is a shared library for C and C++ callers that want to emit OTLP +logs into `ljd` without embedding Rust directly in the appliance application. + +The ABI is intentionally small: + +- create a logger for OTLP/HTTP or OTLP/gRPC +- send one log record at a time +- provide: + - message body as a string + - severity + - timestamp in Unix nanoseconds + - zero or more string key/value attributes + +Those key/value pairs become OTLP `LogRecord.attributes`, which is the standard +OpenTelemetry metadata field for logs. + +## Build + +From the project root: + +```bash +cargo build -p liblogjet +``` + +Header: + +```text +liblogjet/include/liblogjet.h +``` + +Shared object: + +```text +target/debug/libliblogjet.so +``` + +## Minimal C++ Example + +This is the smallest useful flow: + +1. load the `.so` +2. resolve the needed symbols +3. create a logger +4. send one warning-level log + +```cpp +#include "liblogjet.h" +#include + +using new_grpc_fn = lj_logger *(*)(const char *, const char *, uint64_t); +using log_fn = bool (*)(lj_logger *, const lj_log_record *); +using free_fn = void (*)(lj_logger *); +using err_fn = const char *(*)(); + +int main() { + void *so = dlopen("./liblogjet.so", RTLD_NOW | RTLD_LOCAL); + auto lj_logger_new_grpc = reinterpret_cast(dlsym(so, "lj_logger_new_grpc")); + auto lj_logger_log = reinterpret_cast(dlsym(so, "lj_logger_log")); + auto lj_logger_free = reinterpret_cast(dlsym(so, "lj_logger_free")); + auto lj_error_message = reinterpret_cast(dlsym(so, "lj_error_message")); + + lj_logger *logger = lj_logger_new_grpc("127.0.0.1:4317", "hello-cpp", 2000); + if (logger == nullptr) return 1; + + const lj_attribute attrs[] = { + {"tag", "hello-world"}, + {"version", "2.04"}, + }; + const lj_log_record record{ + 1700000000000000000ULL, // timestamp in unix ns + LJ_SEVERITY_WARN, // severity number + "WARN", // severity text + "Biohazard: C++ is in use!", // :-) + attrs, // attributes + sizeof(attrs) / sizeof(attrs[0]), // attributes (len) + }; + + if (!lj_logger_log(logger, &record)) { + const char *err = lj_error_message(); + (void)err; + } + + lj_logger_free(logger); + return 0; +} +``` + +## Notes + +- use `lj_logger_new_http(...)` for OTLP/HTTP +- use `lj_logger_new_grpc(...)` for OTLP/gRPC +- strings must be valid UTF-8 +- attribute keys and values are currently string-only by design +- richer C++ usage lives in the demo: + - [`demo/cpp-shared-lib`](../demo/cpp-shared-lib) diff --git a/liblogjet/Cargo.toml b/liblogjet/Cargo.toml new file mode 100644 index 0000000..9f0109b --- /dev/null +++ b/liblogjet/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "liblogjet" +version = "0.1.0" +edition = "2024" +license = "Apache-2.0" + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +opentelemetry-proto = { version = "0.28", features = ["gen-tonic", "logs"] } +prost = "0.13" +tokio = { version = "1", features = ["rt-multi-thread", "time"] } +tonic = { version = "0.12", features = ["transport"] } diff --git a/liblogjet/include/liblogjet.h b/liblogjet/include/liblogjet.h new file mode 100644 index 0000000..4e84f85 --- /dev/null +++ b/liblogjet/include/liblogjet.h @@ -0,0 +1,80 @@ +#ifndef LIBLOGJET_H +#define LIBLOGJET_H + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* Public C ABI for sending OTLP log records through liblogjet. */ + +/* Opaque logger handle created by lj_logger_new_http/grpc and freed by lj_logger_free. */ +typedef struct lj_logger lj_logger; + +/* Selected OTLP severity_number values for common log levels. */ +enum { + LJ_SEVERITY_TRACE = 1, + LJ_SEVERITY_DEBUG = 5, + LJ_SEVERITY_INFO = 9, + LJ_SEVERITY_WARN = 13, + LJ_SEVERITY_ERROR = 17, + LJ_SEVERITY_FATAL = 21 +}; + +typedef struct lj_attribute { + /* OTLP LogRecord attribute key */ + const char *key; + /* OTLP LogRecord attribute value as UTF-8 string */ + const char *value; +} lj_attribute; + +typedef struct lj_log_record { + /* OTel time_unix_nano */ + uint64_t timestamp_unix_ns; + /* OTel severity_number, for example LJ_SEVERITY_INFO */ + int32_t severity_number; + /* OTel severity_text, for example "INFO"; UTF-8, NUL-terminated, may be NULL */ + const char *severity_text; + /* OTel body string; UTF-8, NUL-terminated, must not be NULL */ + const char *body; + /* Arbitrary OTLP LogRecord string attributes; may be NULL only when attributes_len == 0 */ + const struct lj_attribute *attributes; + /* Number of entries in attributes */ + size_t attributes_len; +} lj_log_record; + +/* Returns the liblogjet version string as a static NUL-terminated string. */ +const char *lj_version(void); + +/* Returns the calling thread's last liblogjet error message as a static string view. */ +const char *lj_error_message(void); + +/* Creates an OTLP/HTTP logger. + * endpoint must be UTF-8 and NUL-terminated, using http://host:port[/path] or bare host:port[/path]. + * https:// is rejected for this constructor. Returns NULL on failure. + */ +lj_logger *lj_logger_new_http(const char *endpoint, const char *service_name, uint64_t timeout_ms); + +/* Creates an OTLP/gRPC logger. + * endpoint must be UTF-8 and NUL-terminated, using host:port or an explicit http/https URL. + * Returns NULL on failure. + */ +lj_logger *lj_logger_new_grpc(const char *endpoint, const char *service_name, uint64_t timeout_ms); + +/* Frees a logger created by lj_logger_new_http or lj_logger_new_grpc. Accepts NULL. */ +void lj_logger_free(lj_logger *logger); + +/* Sends one OTLP log record through logger. + * logger and record must be valid for the duration of the call. + * Returns false on failure; inspect lj_error_message() for details. + */ +bool lj_logger_log(lj_logger *logger, const lj_log_record *record); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/liblogjet/src/lib.rs b/liblogjet/src/lib.rs new file mode 100644 index 0000000..0e0953c --- /dev/null +++ b/liblogjet/src/lib.rs @@ -0,0 +1,402 @@ +use std::cell::RefCell; +use std::ffi::{CStr, CString, c_char}; +use std::io::{self, Read, Write}; +use std::net::TcpStream; +use std::ptr; +use std::time::Duration; + +use opentelemetry_proto::tonic::collector::logs::v1::{ExportLogsServiceRequest, logs_service_client::LogsServiceClient}; +use opentelemetry_proto::tonic::common::v1::any_value::Value; +use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue}; +use opentelemetry_proto::tonic::logs::v1::LogRecord; +use opentelemetry_proto::tonic::logs::v1::SeverityNumber; +use opentelemetry_proto::tonic::logs::v1::{ResourceLogs, ScopeLogs}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use prost::Message; +use tokio::runtime::Runtime; +use tonic::Request; + +thread_local! { + static LAST_ERROR: RefCell = RefCell::new(cstring_lossy("ok")); +} + +#[repr(C)] +pub struct lj_attribute { + key: *const c_char, + value: *const c_char, +} + +#[repr(C)] +pub struct lj_log_record { + timestamp_unix_ns: u64, + severity_number: i32, + severity_text: *const c_char, + body: *const c_char, + attributes: *const lj_attribute, + attributes_len: usize, +} + +pub struct LjLogger { + transport: Transport, + service_name: String, + timeout: Duration, +} + +struct LogRecordInput { + timestamp_unix_ns: u64, + severity_number: i32, + severity_text: Option, + body: String, + attributes: Vec<(String, String)>, +} + +#[derive(Debug, Clone)] +struct HttpEndpoint { + authority: String, + path: String, +} + +#[derive(Debug, Clone)] +struct GrpcEndpoint { + url: String, +} + +enum Transport { + Http(HttpEndpoint), + Grpc { endpoint: GrpcEndpoint, runtime: Runtime }, +} + +impl HttpEndpoint { + fn parse(input: &str) -> io::Result { + if input.trim().starts_with("https://") { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "https endpoints are not supported by lj_logger_new_http; use http:// or lj_logger_new_grpc", + )); + } + let value = input.strip_prefix("http://").unwrap_or(input).trim(); + if value.is_empty() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "endpoint must not be empty")); + } + + let (authority, path) = match value.find('/') { + Some(index) => (&value[..index], &value[index..]), + None => (value, "/v1/logs"), + }; + if authority.is_empty() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "endpoint authority must not be empty")); + } + + Ok(Self { authority: authority.to_string(), path: normalize_path(path) }) + } +} + +impl LjLogger { + fn new_http(endpoint: &str, service_name: &str, timeout_ms: u64) -> io::Result { + Ok(Self { + transport: Transport::Http(HttpEndpoint::parse(endpoint)?), + service_name: service_name.to_string(), + timeout: Duration::from_millis(timeout_ms.max(1)), + }) + } + + fn new_grpc(endpoint: &str, service_name: &str, timeout_ms: u64) -> io::Result { + Ok(Self { + transport: Transport::Grpc { endpoint: GrpcEndpoint::parse(endpoint)?, runtime: Runtime::new().map_err(io::Error::other)? }, + service_name: service_name.to_string(), + timeout: Duration::from_millis(timeout_ms.max(1)), + }) + } + + fn log(&self, record: LogRecordInput) -> io::Result<()> { + match &self.transport { + Transport::Http(endpoint) => { + post_otlp_http(endpoint, self.timeout, &build_logs_request(&self.service_name, record, self.transport_name()).encode_to_vec()) + } + Transport::Grpc { endpoint, runtime } => { + runtime.block_on(post_otlp_grpc(endpoint, self.timeout, build_logs_request(&self.service_name, record, self.transport_name()))) + } + } + } + + fn transport_name(&self) -> &'static str { + match self.transport { + Transport::Http(_) => "otlp-http", + Transport::Grpc { .. } => "otlp-grpc", + } + } +} + +#[unsafe(no_mangle)] +pub extern "C" fn lj_version() -> *const c_char { + concat!(env!("CARGO_PKG_VERSION"), "\0").as_ptr().cast() +} + +#[unsafe(no_mangle)] +pub extern "C" fn lj_error_message() -> *const c_char { + LAST_ERROR.with(|cell| cell.borrow().as_ptr()) +} + +#[unsafe(no_mangle)] +pub extern "C" fn lj_logger_new_http(endpoint: *const c_char, service_name: *const c_char, timeout_ms: u64) -> *mut LjLogger { + ffi_new(|| { + Ok(Box::into_raw(Box::new(LjLogger::new_http( + &required_cstr(endpoint, "endpoint")?, + &required_cstr(service_name, "service_name")?, + timeout_ms, + )?))) + }) +} + +#[unsafe(no_mangle)] +pub extern "C" fn lj_logger_new_grpc(endpoint: *const c_char, service_name: *const c_char, timeout_ms: u64) -> *mut LjLogger { + ffi_new(|| { + Ok(Box::into_raw(Box::new(LjLogger::new_grpc( + &required_cstr(endpoint, "endpoint")?, + &required_cstr(service_name, "service_name")?, + timeout_ms, + )?))) + }) +} + +#[unsafe(no_mangle)] +/// # Safety +/// +/// `logger` must be either null or a pointer previously returned by +/// `lj_logger_new_http` or `lj_logger_new_grpc` that has not already been +/// freed. +pub unsafe extern "C" fn lj_logger_free(logger: *mut LjLogger) { + if logger.is_null() { + return; + } + let _ = std::panic::catch_unwind(|| { + // SAFETY: ownership comes from Box::into_raw in lj_logger_new_http. + unsafe { + drop(Box::from_raw(logger)); + } + }); +} + +#[unsafe(no_mangle)] +/// # Safety +/// +/// `logger` must be a valid pointer returned by `lj_logger_new_http` or +/// `lj_logger_new_grpc`. `record` must be a valid pointer for the duration of +/// this call, and all nested C strings and attribute pointers referenced by +/// `record` must also remain valid for the duration of the call. +pub unsafe extern "C" fn lj_logger_log(logger: *mut LjLogger, record: *const lj_log_record) -> bool { + ffi_bool(|| { + if logger.is_null() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "logger must not be null")); + } + if record.is_null() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "record must not be null")); + } + + // SAFETY: caller guarantees valid pointers for the duration of this call. + unsafe { (&*logger).log(parse_record(&*record)?)? }; + Ok(()) + }) +} + +fn build_logs_request(service_name: &str, record: LogRecordInput, transport_name: &str) -> ExportLogsServiceRequest { + ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: Some(Resource { attributes: vec![string_attr("service.name", service_name)], dropped_attributes_count: 0 }), + scope_logs: vec![ScopeLogs { + scope: Some(InstrumentationScope { + name: "liblogjet".to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + log_records: vec![LogRecord { + time_unix_nano: record.timestamp_unix_ns, + observed_time_unix_nano: record.timestamp_unix_ns, + severity_number: normalized_severity(record.severity_number), + severity_text: record.severity_text.unwrap_or_else(|| default_severity_text(record.severity_number).to_string()), + body: Some(AnyValue { value: Some(Value::StringValue(record.body)) }), + attributes: std::iter::once(string_attr("liblogjet.transport", transport_name)) + .chain(std::iter::once(string_attr("liblogjet.runtime", "cpp-ffi"))) + .chain(record.attributes.into_iter().map(|(key, value)| string_attr(&key, &value))) + .collect(), + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: "log".to_string(), + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + } +} + +fn post_otlp_http(endpoint: &HttpEndpoint, timeout: Duration, body: &[u8]) -> io::Result<()> { + let mut stream = TcpStream::connect(&endpoint.authority)?; + stream.set_write_timeout(Some(timeout))?; + stream.set_read_timeout(Some(timeout))?; + + write!( + stream, + "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/x-protobuf\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + endpoint.path, + endpoint.authority, + body.len() + )?; + stream.write_all(body)?; + stream.flush()?; + + let mut response = String::new(); + stream.read_to_string(&mut response)?; + if !response.starts_with("HTTP/1.1 200") && !response.starts_with("HTTP/1.0 200") { + return Err(io::Error::other(format!("collector returned non-200 response: {}", response.lines().next().unwrap_or("unknown response")))); + } + Ok(()) +} + +impl GrpcEndpoint { + fn parse(input: &str) -> io::Result { + let value = input.trim(); + if value.is_empty() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "endpoint must not be empty")); + } + let url = if value.starts_with("http://") || value.starts_with("https://") { value.to_string() } else { format!("http://{value}") }; + Ok(Self { url }) + } +} + +async fn post_otlp_grpc(endpoint: &GrpcEndpoint, timeout: Duration, batch: ExportLogsServiceRequest) -> io::Result<()> { + let channel = tonic::transport::Endpoint::from_shared(endpoint.url.clone()) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err.to_string()))? + .connect_timeout(timeout) + .timeout(timeout) + .connect() + .await + .map_err(io::Error::other)?; + let mut client = LogsServiceClient::new(channel); + client.export(Request::new(batch)).await.map_err(io::Error::other)?; + Ok(()) +} + +fn parse_record(record: &lj_log_record) -> io::Result { + Ok(LogRecordInput { + timestamp_unix_ns: record.timestamp_unix_ns, + severity_number: record.severity_number, + severity_text: optional_cstr(record.severity_text, "record.severity_text")?, + body: required_cstr(record.body, "record.body")?, + attributes: parse_attributes(record.attributes, record.attributes_len)?, + }) +} + +fn required_cstr(ptr: *const c_char, field: &str) -> io::Result { + if ptr.is_null() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, format!("{field} must not be null"))); + } + // SAFETY: pointer is checked above and treated as read-only. + let text = + unsafe { CStr::from_ptr(ptr) }.to_str().map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, format!("{field} must be valid UTF-8")))?; + Ok(text.to_string()) +} + +fn optional_cstr(ptr: *const c_char, field: &str) -> io::Result> { + if ptr.is_null() { + return Ok(None); + } + required_cstr(ptr, field).map(Some) +} + +fn parse_attributes(attributes: *const lj_attribute, attributes_len: usize) -> io::Result> { + if attributes_len == 0 { + return Ok(Vec::new()); + } + if attributes.is_null() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "record.attributes is null while attributes_len is non-zero")); + } + + // SAFETY: pointer validity is checked above and length is provided by the caller. + unsafe { std::slice::from_raw_parts(attributes, attributes_len) } + .iter() + .map(|attr| Ok((required_cstr(attr.key, "attribute.key")?, required_cstr(attr.value, "attribute.value")?))) + .collect() +} + +fn normalized_severity(value: i32) -> i32 { + if value == 0 { SeverityNumber::Info as i32 } else { value } +} + +fn default_severity_text(value: i32) -> &'static str { + match normalized_severity(value) { + x if x == SeverityNumber::Trace as i32 => "TRACE", + x if x == SeverityNumber::Debug as i32 => "DEBUG", + x if x == SeverityNumber::Info as i32 => "INFO", + x if x == SeverityNumber::Warn as i32 => "WARN", + x if x == SeverityNumber::Error as i32 => "ERROR", + x if x == SeverityNumber::Fatal as i32 => "FATAL", + _ => "INFO", + } +} + +fn string_attr(key: &str, value: &str) -> KeyValue { + KeyValue { key: key.to_string(), value: Some(AnyValue { value: Some(Value::StringValue(value.to_string())) }) } +} + +fn normalize_path(path: &str) -> String { + if path.is_empty() { + "/v1/logs".to_string() + } else if path.starts_with('/') { + path.to_string() + } else { + format!("/{path}") + } +} + +fn cstring_lossy(message: &str) -> CString { + let filtered = message.replace('\0', " "); + CString::new(filtered).unwrap_or_else(|_| CString::new("ffi error").expect("static string")) +} + +fn set_last_error(message: impl Into) { + let message = cstring_lossy(&message.into()); + LAST_ERROR.with(|cell| { + *cell.borrow_mut() = message; + }); +} + +fn ffi_new(func: impl FnOnce() -> io::Result<*mut T> + std::panic::UnwindSafe) -> *mut T { + match std::panic::catch_unwind(func) { + Ok(Ok(value)) => { + set_last_error("ok"); + value + } + Ok(Err(err)) => { + set_last_error(err.to_string()); + ptr::null_mut() + } + Err(_) => { + set_last_error("panic across FFI boundary"); + ptr::null_mut() + } + } +} + +fn ffi_bool(func: impl FnOnce() -> io::Result<()> + std::panic::UnwindSafe) -> bool { + match std::panic::catch_unwind(func) { + Ok(Ok(())) => { + set_last_error("ok"); + true + } + Ok(Err(err)) => { + set_last_error(err.to_string()); + false + } + Err(_) => { + set_last_error("panic across FFI boundary"); + false + } + } +} + +#[cfg(test)] +mod lib_ut; diff --git a/liblogjet/src/lib_ut.rs b/liblogjet/src/lib_ut.rs new file mode 100644 index 0000000..cda55d7 --- /dev/null +++ b/liblogjet/src/lib_ut.rs @@ -0,0 +1,175 @@ +use super::*; +use std::net::TcpListener; +use std::sync::{Arc, Mutex}; +use std::thread; +use tokio::sync::oneshot; +use tonic::transport::Server; +use tonic::{Response, Status}; + +use opentelemetry_proto::tonic::collector::logs::v1::{ + ExportLogsServiceResponse, + logs_service_server::{LogsService, LogsServiceServer}, +}; + +#[test] +fn endpoint_parse_defaults_path() { + let endpoint = HttpEndpoint::parse("127.0.0.1:4318").unwrap(); + assert_eq!(endpoint.authority, "127.0.0.1:4318"); + assert_eq!(endpoint.path, "/v1/logs"); +} + +#[test] +fn http_endpoint_rejects_https_scheme() { + let err = HttpEndpoint::parse("https://127.0.0.1:4318").unwrap_err(); + assert!(err.to_string().contains("https endpoints are not supported")); +} + +#[test] +fn grpc_endpoint_parse_defaults_scheme() { + let endpoint = GrpcEndpoint::parse("127.0.0.1:4317").unwrap(); + assert_eq!(endpoint.url, "http://127.0.0.1:4317"); +} + +#[test] +fn ffi_logger_posts_log_record() { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + + let server = thread::spawn(move || -> ExportLogsServiceRequest { + let (mut stream, _) = listener.accept().unwrap(); + let request = read_http_request(&mut stream).unwrap(); + stream.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n").unwrap(); + ExportLogsServiceRequest::decode(request.as_slice()).unwrap() + }); + + let endpoint = CString::new(format!("127.0.0.1:{}", addr.port())).unwrap(); + let service = CString::new("cpp-appliance").unwrap(); + let logger = lj_logger_new_http(endpoint.as_ptr(), service.as_ptr(), 1_000); + assert!(!logger.is_null(), "ffi init failed: {}", unsafe { CStr::from_ptr(lj_error_message()).to_string_lossy() }); + + let severity_text = CString::new("INFO").unwrap(); + let body = CString::new("ffi hello").unwrap(); + let attr_key = CString::new("appliance.id").unwrap(); + let attr_value = CString::new("node-7").unwrap(); + let attributes = [lj_attribute { key: attr_key.as_ptr(), value: attr_value.as_ptr() }]; + let record = lj_log_record { + timestamp_unix_ns: 123, + severity_number: SeverityNumber::Info as i32, + severity_text: severity_text.as_ptr(), + body: body.as_ptr(), + attributes: attributes.as_ptr(), + attributes_len: attributes.len(), + }; + + assert!(unsafe { lj_logger_log(logger, &record) }); + unsafe { lj_logger_free(logger) }; + + let batch = server.join().unwrap(); + let resource = &batch.resource_logs[0].resource.as_ref().unwrap().attributes; + assert!(resource.iter().any(|attr| attr.key == "service.name")); + let log_record = &batch.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!(log_record.severity_text, "INFO"); + let body = log_record.body.as_ref().and_then(|value| value.value.as_ref()); + assert!(matches!(body, Some(Value::StringValue(text)) if text == "ffi hello")); + assert!(log_record.attributes.iter().any(|attr| attr.key == "appliance.id")); +} + +#[test] +fn ffi_logger_posts_log_record_over_grpc() { + let received = Arc::new(Mutex::new(Vec::new())); + let runtime = Runtime::new().unwrap(); + let addr = std::net::TcpListener::bind("127.0.0.1:0").unwrap().local_addr().unwrap(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let service = TestLogsService { received: Arc::clone(&received) }; + runtime.spawn(async move { + Server::builder() + .add_service(LogsServiceServer::new(service)) + .serve_with_shutdown(addr, async { + let _ = shutdown_rx.await; + }) + .await + .unwrap(); + }); + + let endpoint = CString::new(format!("127.0.0.1:{}", addr.port())).unwrap(); + let service_name = CString::new("cpp-appliance").unwrap(); + let logger = lj_logger_new_grpc(endpoint.as_ptr(), service_name.as_ptr(), 1_000); + assert!(!logger.is_null(), "ffi init failed: {}", unsafe { CStr::from_ptr(lj_error_message()).to_string_lossy() }); + + let severity_text = CString::new("INFO").unwrap(); + let body = CString::new("ffi grpc hello").unwrap(); + let attr_key = CString::new("appliance.id").unwrap(); + let attr_value = CString::new("node-9").unwrap(); + let attributes = [lj_attribute { key: attr_key.as_ptr(), value: attr_value.as_ptr() }]; + let record = lj_log_record { + timestamp_unix_ns: 456, + severity_number: SeverityNumber::Info as i32, + severity_text: severity_text.as_ptr(), + body: body.as_ptr(), + attributes: attributes.as_ptr(), + attributes_len: attributes.len(), + }; + + assert!(unsafe { lj_logger_log(logger, &record) }); + unsafe { lj_logger_free(logger) }; + + runtime.block_on(async { + for _ in 0..50 { + if !received.lock().unwrap().is_empty() { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + }); + let _ = shutdown_tx.send(()); + + let batches = received.lock().unwrap(); + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + let log_record = &batch.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!(log_record.severity_text, "INFO"); + let body = log_record.body.as_ref().and_then(|value| value.value.as_ref()); + assert!(matches!(body, Some(Value::StringValue(text)) if text == "ffi grpc hello")); + assert!(log_record.attributes.iter().any(|attr| attr.key == "appliance.id")); + assert!(log_record.attributes.iter().any(|attr| attr.key == "liblogjet.transport")); +} + +fn read_http_request(stream: &mut TcpStream) -> io::Result> { + let mut header = Vec::new(); + let mut byte = [0u8; 1]; + loop { + stream.read_exact(&mut byte)?; + header.push(byte[0]); + if header.ends_with(b"\r\n\r\n") { + break; + } + } + + let header_text = std::str::from_utf8(&header[..header.len() - 4]).map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid header"))?; + let mut content_length = None; + for line in header_text.lines().skip(1) { + if let Some((name, value)) = line.split_once(':') + && name.eq_ignore_ascii_case("content-length") + { + content_length = Some(value.trim().parse::().map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid content-length"))?); + } + } + + let mut body = vec![0u8; content_length.unwrap_or(0)]; + stream.read_exact(&mut body)?; + Ok(body) +} + +#[derive(Clone)] +struct TestLogsService { + received: Arc>>, +} + +#[tonic::async_trait] +impl LogsService for TestLogsService { + async fn export(&self, request: Request) -> Result, Status> { + self.received.lock().unwrap().push(request.into_inner()); + Ok(Response::new(ExportLogsServiceResponse { partial_success: None })) + } +} diff --git a/ljx/src/commands/view.rs b/ljx/src/commands/view.rs index 5ae091b..c6d8ae2 100644 --- a/ljx/src/commands/view.rs +++ b/ljx/src/commands/view.rs @@ -14,6 +14,7 @@ use crossterm::execute; use crossterm::terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode}; use logjet::{LogjetReader, LogjetWriter, OwnedRecord, RecordType, WriterConfig}; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::common::v1::AnyValue; use opentelemetry_proto::tonic::common::v1::any_value::Value; use prost::Message; use ratatui::backend::CrosstermBackend; @@ -32,6 +33,7 @@ const SUMMARY_CACHE_LIMIT: usize = 256; const DETAIL_PREVIEW_BYTES: usize = 1024; const SCAN_BATCH_SIZE: usize = 128; const TICK_RATE: Duration = Duration::from_millis(100); +const MODAL_ATTR_ENTRY_LIMIT_PER_KIND: usize = 32; pub fn run(args: ViewArgs) -> Result<()> { if !io::stdin().is_terminal() || !io::stdout().is_terminal() { @@ -1025,6 +1027,10 @@ fn render_modal_info_entries(detail: &DetailRecord) -> Vec<(String, String)> { let mut record_attr_count = 0usize; let mut trace_ids = 0usize; let mut span_ids = 0usize; + let mut resource_attr_entries = Vec::new(); + let mut record_attr_entries = Vec::new(); + let mut resource_attr_omitted = 0usize; + let mut record_attr_omitted = 0usize; for resource_logs in &batch.resource_logs { if let Some(resource) = &resource_logs.resource { @@ -1037,6 +1043,13 @@ fn render_modal_info_entries(detail: &DetailRecord) -> Vec<(String, String)> { { service_names.push(service.clone()); } + push_modal_attribute_entry( + &mut resource_attr_entries, + &mut resource_attr_omitted, + "resource", + &attr.key, + format_any_value(attr.value.as_ref()), + ); } } @@ -1061,6 +1074,15 @@ fn render_modal_info_entries(detail: &DetailRecord) -> Vec<(String, String)> { if !record.span_id.is_empty() { span_ids += 1; } + for attr in &record.attributes { + push_modal_attribute_entry( + &mut record_attr_entries, + &mut record_attr_omitted, + "record", + &attr.key, + format_any_value(attr.value.as_ref()), + ); + } } } } @@ -1081,6 +1103,18 @@ fn render_modal_info_entries(detail: &DetailRecord) -> Vec<(String, String)> { } lines.push(("resource.attrs".to_string(), resource_attr_count.to_string())); lines.push(("record.attrs".to_string(), record_attr_count.to_string())); + for (kind, key, value) in resource_attr_entries { + lines.push((format!("{kind}.{key}"), value)); + } + if resource_attr_omitted > 0 { + lines.push(("resource.attrs.more".to_string(), format!("{resource_attr_omitted} not shown"))); + } + for (kind, key, value) in record_attr_entries { + lines.push((format!("{kind}.{key}"), value)); + } + if record_attr_omitted > 0 { + lines.push(("record.attrs.more".to_string(), format!("{record_attr_omitted} not shown"))); + } if trace_ids > 0 { lines.push(("trace_id".to_string(), format!("{trace_ids} present"))); } @@ -1091,12 +1125,88 @@ fn render_modal_info_entries(detail: &DetailRecord) -> Vec<(String, String)> { lines } +fn push_modal_attribute_entry(entries: &mut Vec<(String, String, String)>, omitted: &mut usize, kind: &str, key: &str, value: String) { + if entries.len() < MODAL_ATTR_ENTRY_LIMIT_PER_KIND { + entries.push((kind.to_string(), key.to_string(), value)); + } else { + *omitted += 1; + } +} + fn modal_info_line(key: &str, value: String, key_width: usize, value_width: usize) -> Line<'static> { let value = trim_single_line(&value, value_width); - Line::from(vec![ - Span::styled(format!("{key:) -> String { + let Some(value) = value else { + return "null".to_string(); + }; + match &value.value { + Some(Value::StringValue(text)) => text.clone(), + Some(Value::BoolValue(flag)) => flag.to_string(), + Some(Value::IntValue(number)) => number.to_string(), + Some(Value::DoubleValue(number)) => number.to_string(), + Some(Value::BytesValue(bytes)) => format!("<{} bytes>", bytes.len()), + Some(Value::ArrayValue(array)) => format!("", array.values.len()), + Some(Value::KvlistValue(map)) => format!("", map.values.len()), + None => "null".to_string(), + } +} + +fn is_otlp_attribute_entry(key: &str) -> bool { + (key.starts_with("resource.") && key != "resource.attrs") || (key.starts_with("record.") && key != "record.attrs") +} + +fn is_standard_otlp_attribute_entry(key: &str) -> bool { + let Some((_, attr_key)) = key.split_once('.') else { + return false; + }; + + const STANDARD_PREFIXES: &[&str] = &[ + "service.", + "telemetry.", + "host.", + "os.", + "process.", + "container.", + "k8s.", + "cloud.", + "deployment.", + "device.", + "faas.", + "enduser.", + "server.", + "client.", + "http.", + "url.", + "network.", + "net.", + "rpc.", + "db.", + "messaging.", + "exception.", + "code.", + "thread.", + "gen_ai.", + "browser.", + "user_agent.", + "aws.", + "gcp.", + "azure.", + "vcs.", + ]; + + STANDARD_PREFIXES.iter().any(|prefix| attr_key.starts_with(prefix)) } fn footer_sep() -> Span<'static> { @@ -1323,78 +1433,5 @@ fn create_temp_path() -> Result { } #[cfg(test)] -mod tests { - use super::{DetailRecord, EntryMeta, extract_otlp_log_message, format_summary, render_modal_message, text_preview}; - use logjet::RecordType; - use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; - use opentelemetry_proto::tonic::common::v1::any_value::Value; - use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope}; - use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; - use opentelemetry_proto::tonic::resource::v1::Resource; - use prost::Message; - - #[test] - fn text_preview_flattens_newlines() { - assert_eq!(text_preview(b"hello\nworld", 32), "hello world"); - } - - #[test] - fn summary_uses_trimmed_single_line_preview() { - let detail = DetailRecord { - meta: EntryMeta { offset: 0, record_type: RecordType::Logs, seq: 7, ts_unix_ns: 9, payload_len: 13 }, - payload: b"line one\nline two".to_vec(), - }; - let summary = format_summary(&detail, false); - assert_eq!(summary, "line one line two"); - } - - #[test] - fn summary_prefers_decoded_otlp_log_message() { - let batch = ExportLogsServiceRequest { - resource_logs: vec![ResourceLogs { - resource: Some(Resource { attributes: Vec::new(), dropped_attributes_count: 0 }), - scope_logs: vec![ScopeLogs { - scope: Some(InstrumentationScope { - name: "test".to_string(), - version: String::new(), - attributes: Vec::new(), - dropped_attributes_count: 0, - }), - log_records: vec![LogRecord { - time_unix_nano: 0, - observed_time_unix_nano: 0, - severity_number: 0, - severity_text: String::new(), - body: Some(AnyValue { value: Some(Value::StringValue("hello from body".to_string())) }), - attributes: Vec::new(), - dropped_attributes_count: 0, - flags: 0, - trace_id: Vec::new(), - span_id: Vec::new(), - event_name: String::new(), - }], - schema_url: String::new(), - }], - schema_url: String::new(), - }], - }; - let payload = batch.encode_to_vec(); - let detail = DetailRecord { - meta: EntryMeta { offset: 0, record_type: RecordType::Logs, seq: 1, ts_unix_ns: 2, payload_len: payload.len() as u64 }, - payload, - }; - - assert_eq!(extract_otlp_log_message(&detail.payload).as_deref(), Some("hello from body")); - assert_eq!(format_summary(&detail, false), "hello from body"); - } - - #[test] - fn modal_falls_back_to_raw_payload() { - let detail = DetailRecord { - meta: EntryMeta { offset: 0, record_type: RecordType::Metrics, seq: 1, ts_unix_ns: 2, payload_len: 5 }, - payload: b"hello".to_vec(), - }; - let body = render_modal_message(&detail, false); - assert_eq!(body, "hello"); - } -} +#[path = "view_ut.rs"] +mod view_ut; diff --git a/ljx/src/commands/view_ut.rs b/ljx/src/commands/view_ut.rs new file mode 100644 index 0000000..53c04b0 --- /dev/null +++ b/ljx/src/commands/view_ut.rs @@ -0,0 +1,171 @@ +use super::{ + DetailRecord, EntryMeta, MODAL_ATTR_ENTRY_LIMIT_PER_KIND, extract_otlp_log_message, format_summary, render_modal_info_entries, + render_modal_message, text_preview, +}; +use logjet::RecordType; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::common::v1::any_value::Value; +use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue}; +use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use prost::Message; + +#[test] +fn text_preview_flattens_newlines() { + assert_eq!(text_preview(b"hello\nworld", 32), "hello world"); +} + +#[test] +fn summary_uses_trimmed_single_line_preview() { + let detail = DetailRecord { + meta: EntryMeta { offset: 0, record_type: RecordType::Logs, seq: 7, ts_unix_ns: 9, payload_len: 13 }, + payload: b"line one\nline two".to_vec(), + }; + let summary = format_summary(&detail, false); + assert_eq!(summary, "line one line two"); +} + +#[test] +fn summary_prefers_decoded_otlp_log_message() { + let batch = ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: Some(Resource { attributes: Vec::new(), dropped_attributes_count: 0 }), + scope_logs: vec![ScopeLogs { + scope: Some(InstrumentationScope { + name: "test".to_string(), + version: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + log_records: vec![LogRecord { + time_unix_nano: 0, + observed_time_unix_nano: 0, + severity_number: 0, + severity_text: String::new(), + body: Some(AnyValue { value: Some(Value::StringValue("hello from body".to_string())) }), + attributes: Vec::new(), + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + let payload = batch.encode_to_vec(); + let detail = DetailRecord { + meta: EntryMeta { offset: 0, record_type: RecordType::Logs, seq: 1, ts_unix_ns: 2, payload_len: payload.len() as u64 }, + payload, + }; + + assert_eq!(extract_otlp_log_message(&detail.payload).as_deref(), Some("hello from body")); + assert_eq!(format_summary(&detail, false), "hello from body"); +} + +#[test] +fn modal_falls_back_to_raw_payload() { + let detail = DetailRecord { + meta: EntryMeta { offset: 0, record_type: RecordType::Metrics, seq: 1, ts_unix_ns: 2, payload_len: 5 }, + payload: b"hello".to_vec(), + }; + let body = render_modal_message(&detail, false); + assert_eq!(body, "hello"); +} + +#[test] +fn modal_info_lists_otlp_attributes() { + let batch = ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: Some(Resource { + attributes: vec![KeyValue { + key: "service.name".to_string(), + value: Some(AnyValue { value: Some(Value::StringValue("cpp-appliance".to_string())) }), + }], + dropped_attributes_count: 0, + }), + scope_logs: vec![ScopeLogs { + scope: Some(InstrumentationScope { + name: "liblogjet".to_string(), + version: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + log_records: vec![LogRecord { + time_unix_nano: 0, + observed_time_unix_nano: 0, + severity_number: 0, + severity_text: "INFO".to_string(), + body: Some(AnyValue { value: Some(Value::StringValue("hello from cpp".to_string())) }), + attributes: vec![KeyValue { + key: "character".to_string(), + value: Some(AnyValue { value: Some(Value::StringValue("Bender".to_string())) }), + }], + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + let payload = batch.encode_to_vec(); + let detail = DetailRecord { + meta: EntryMeta { offset: 0, record_type: RecordType::Logs, seq: 1, ts_unix_ns: 2, payload_len: payload.len() as u64 }, + payload, + }; + + let entries = render_modal_info_entries(&detail); + assert!(entries.iter().any(|(key, value)| key == "resource.service.name" && value == "cpp-appliance")); + assert!(entries.iter().any(|(key, value)| key == "record.character" && value == "Bender")); +} + +#[test] +fn modal_info_caps_attribute_entries_per_kind() { + let attributes = (0..40) + .map(|index| KeyValue { key: format!("custom.{index}"), value: Some(AnyValue { value: Some(Value::StringValue(format!("value-{index}"))) }) }) + .collect::>(); + let batch = ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: Some(Resource { attributes: Vec::new(), dropped_attributes_count: 0 }), + scope_logs: vec![ScopeLogs { + scope: Some(InstrumentationScope { + name: "liblogjet".to_string(), + version: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + log_records: vec![LogRecord { + time_unix_nano: 0, + observed_time_unix_nano: 0, + severity_number: 0, + severity_text: "INFO".to_string(), + body: Some(AnyValue { value: Some(Value::StringValue("hello from cpp".to_string())) }), + attributes, + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + let payload = batch.encode_to_vec(); + let detail = DetailRecord { + meta: EntryMeta { offset: 0, record_type: RecordType::Logs, seq: 1, ts_unix_ns: 2, payload_len: payload.len() as u64 }, + payload, + }; + + let entries = render_modal_info_entries(&detail); + let record_entries = entries.iter().filter(|(key, _)| key.starts_with("record.custom.")).count(); + assert_eq!(record_entries, MODAL_ATTR_ENTRY_LIMIT_PER_KIND); + assert!(entries.iter().any(|(key, value)| key == "record.attrs.more" && value == "8 not shown")); +} diff --git a/ljx/src/predicate.rs b/ljx/src/predicate.rs index 26284f3..124df8b 100644 --- a/ljx/src/predicate.rs +++ b/ljx/src/predicate.rs @@ -180,88 +180,5 @@ impl From for RecordType { } #[cfg(test)] -mod tests { - use super::{FilterMode, PredicateArgs, RecordKind, parse_filter_query}; - use logjet::{OwnedRecord, RecordType}; - - fn sample_record(payload: &[u8]) -> OwnedRecord { - OwnedRecord { record_type: RecordType::Logs, seq: 42, ts_unix_ns: 1_700_000_000, payload: payload.to_vec() } - } - - #[test] - fn fixed_string_match_is_literal() { - let predicate = PredicateArgs { fixed_string: Some("java.crap.failed".to_string()), ..PredicateArgs::default() }.build().unwrap(); - - assert!(predicate.matches(&sample_record(b"xxx java.crap.failed yyy"))); - assert!(!predicate.matches(&sample_record(b"javaXcrapXfailed"))); - } - - #[test] - fn regex_match_supports_wildcards() { - let predicate = PredicateArgs { grep: Some(r"java\..*\.bs".to_string()), ..PredicateArgs::default() }.build().unwrap(); - - assert!(predicate.matches(&sample_record(b"java.very.long.bs"))); - assert!(!predicate.matches(&sample_record(b"java.very.long.cs"))); - } - - #[test] - fn ignore_case_applies_to_fixed_string_and_regex() { - let fixed = PredicateArgs { fixed_string: Some("error".to_string()), ignore_case: true, ..PredicateArgs::default() }.build().unwrap(); - let regex = PredicateArgs { grep: Some("error".to_string()), ignore_case: true, ..PredicateArgs::default() }.build().unwrap(); - - let record = sample_record(b"prefix eRrOr suffix"); - assert!(fixed.matches(&record)); - assert!(regex.matches(&record)); - } - - #[test] - fn matcher_combines_with_record_fields() { - let predicate = PredicateArgs { - record_type: Some(RecordKind::Logs), - seq_min: Some(40), - seq_max: Some(45), - ts_min: Some(1_699_999_999), - ts_max: Some(1_700_000_001), - fixed_string: Some("hello".to_string()), - ..PredicateArgs::default() - } - .build() - .unwrap(); - - assert!(predicate.matches(&sample_record(b"hello world"))); - assert!(!predicate.matches(&sample_record(b"bye world"))); - } - - #[test] - fn invalid_regex_is_reported() { - let error = PredicateArgs { grep: Some("(".to_string()), ..PredicateArgs::default() }.build().unwrap_err(); - - assert!(error.to_string().contains("invalid payload matcher")); - } - - #[test] - fn parse_filter_query_treats_bare_text_as_fixed_string() { - let predicate = parse_filter_query("hello world", FilterMode::Strings).unwrap(); - assert!(predicate.matches(&sample_record(b"say hello world now"))); - assert!(!predicate.matches(&sample_record(b"say hello now"))); - } - - #[test] - fn parse_filter_query_supports_cli_style_flags() { - let predicate = parse_filter_query(r#"--type logs -e "error|panic" -i"#, FilterMode::Strings).unwrap(); - assert!(predicate.matches(&sample_record(b"PANIC happened"))); - assert!(!predicate.matches(&OwnedRecord { - record_type: RecordType::Metrics, - seq: 42, - ts_unix_ns: 1_700_000_000, - payload: b"panic".to_vec(), - })); - } - - #[test] - fn parse_filter_query_uses_regex_mode_for_bare_text() { - let predicate = parse_filter_query("reb.*", FilterMode::Regex).unwrap(); - assert!(predicate.matches(&sample_record(b"rebooted node"))); - assert!(!predicate.matches(&sample_record(b"stopped node"))); - } -} +#[path = "predicate_ut.rs"] +mod predicate_ut; diff --git a/ljx/src/predicate_ut.rs b/ljx/src/predicate_ut.rs new file mode 100644 index 0000000..1507438 --- /dev/null +++ b/ljx/src/predicate_ut.rs @@ -0,0 +1,78 @@ +use super::{FilterMode, PredicateArgs, RecordKind, parse_filter_query}; +use logjet::{OwnedRecord, RecordType}; + +fn sample_record(payload: &[u8]) -> OwnedRecord { + OwnedRecord { record_type: RecordType::Logs, seq: 42, ts_unix_ns: 1_700_000_000, payload: payload.to_vec() } +} + +#[test] +fn fixed_string_match_is_literal() { + let predicate = PredicateArgs { fixed_string: Some("java.crap.failed".to_string()), ..PredicateArgs::default() }.build().unwrap(); + + assert!(predicate.matches(&sample_record(b"xxx java.crap.failed yyy"))); + assert!(!predicate.matches(&sample_record(b"javaXcrapXfailed"))); +} + +#[test] +fn regex_match_supports_wildcards() { + let predicate = PredicateArgs { grep: Some(r"java\..*\.bs".to_string()), ..PredicateArgs::default() }.build().unwrap(); + + assert!(predicate.matches(&sample_record(b"java.very.long.bs"))); + assert!(!predicate.matches(&sample_record(b"java.very.long.cs"))); +} + +#[test] +fn ignore_case_applies_to_fixed_string_and_regex() { + let fixed = PredicateArgs { fixed_string: Some("error".to_string()), ignore_case: true, ..PredicateArgs::default() }.build().unwrap(); + let regex = PredicateArgs { grep: Some("error".to_string()), ignore_case: true, ..PredicateArgs::default() }.build().unwrap(); + + let record = sample_record(b"prefix eRrOr suffix"); + assert!(fixed.matches(&record)); + assert!(regex.matches(&record)); +} + +#[test] +fn matcher_combines_with_record_fields() { + let predicate = PredicateArgs { + record_type: Some(RecordKind::Logs), + seq_min: Some(40), + seq_max: Some(45), + ts_min: Some(1_699_999_999), + ts_max: Some(1_700_000_001), + fixed_string: Some("hello".to_string()), + ..PredicateArgs::default() + } + .build() + .unwrap(); + + assert!(predicate.matches(&sample_record(b"hello world"))); + assert!(!predicate.matches(&sample_record(b"bye world"))); +} + +#[test] +fn invalid_regex_is_reported() { + let error = PredicateArgs { grep: Some("(".to_string()), ..PredicateArgs::default() }.build().unwrap_err(); + + assert!(error.to_string().contains("invalid payload matcher")); +} + +#[test] +fn parse_filter_query_treats_bare_text_as_fixed_string() { + let predicate = parse_filter_query("hello world", FilterMode::Strings).unwrap(); + assert!(predicate.matches(&sample_record(b"say hello world now"))); + assert!(!predicate.matches(&sample_record(b"say hello now"))); +} + +#[test] +fn parse_filter_query_supports_cli_style_flags() { + let predicate = parse_filter_query(r#"--type logs -e "error|panic" -i"#, FilterMode::Strings).unwrap(); + assert!(predicate.matches(&sample_record(b"PANIC happened"))); + assert!(!predicate.matches(&OwnedRecord { record_type: RecordType::Metrics, seq: 42, ts_unix_ns: 1_700_000_000, payload: b"panic".to_vec() })); +} + +#[test] +fn parse_filter_query_uses_regex_mode_for_bare_text() { + let predicate = parse_filter_query("reb.*", FilterMode::Regex).unwrap(); + assert!(predicate.matches(&sample_record(b"rebooted node"))); + assert!(!predicate.matches(&sample_record(b"stopped node"))); +}