Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
391 changes: 388 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ fix: setup
cargo clippy $(CORE_WORKSPACE) --fix --all-targets --all-features --allow-dirty --allow-staged -- -D warnings

test: setup
cargo build -p logjetd -p ljx
cargo build -p otlp-demo --bin otlp-bofh-emitter
@if command -v cargo-nextest >/dev/null 2>&1; then \
cargo nextest run $(CORE_WORKSPACE); \
else \
Expand Down
4 changes: 4 additions & 0 deletions demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ It also contains scenario demos under subdirectories:
- five emitters running continuously into one `logjetd` and one live collector
- [`multi-client-behaviour`](./multi-client-behaviour)
- one replay client stalls while another keeps flowing
- [`replay-handoff`](./replay-handoff)
- a late replay client drains retained backlog and then continues live on the same connection
- [`file-replay`](./file-replay)
- replay stored `.logjet` files into a collector
- [`file-tooling`](./file-tooling)
- inspect rotated file segments and prune archived files deliberately
- [`tui-view`](./tui-view)
- generate 1000 randomized log entries and open `ljx view` on the result
- [`bridge-resume`](./bridge-resume)
- consumer restart resumes from persisted sequence state without replaying from zero
- [`upstream-reset-resume`](./upstream-reset-resume)
Expand Down
50 changes: 50 additions & 0 deletions demo/replay-handoff/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Replay Handoff Demo

This demo isolates the backlog-to-live handoff inside `logjetd serve`.

It proves one replay client can:

- connect after backlog already exists
- receive the retained backlog first
- stay on the same replay connection
- continue receiving new records through direct ingest wakeups

## Build First

From the project root:

```bash
make demo
```

## Run In Two Terminals

Terminal 1, from this directory:

```bash
./run-appliance.sh
```

This starts `logjetd`, writes three retained messages before any replay client
connects, and then keeps sending one live message per second.

Terminal 2, from this directory:

```bash
./run-consumer.sh
```

This starts the demo collector and then connects a replay client late through
the internal wire forwarder.

## What You Should See

On the collector side:

- `HANDOFF backlog 001`
- `HANDOFF backlog 002`
- `HANDOFF backlog 003`
- then ongoing `HANDOFF live` records every second

That transition should happen on one replay connection, without reconnecting
between backlog replay and live delivery.
7 changes: 7 additions & 0 deletions demo/replay-handoff/logjetd.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
output: file
file.path: ./spool
file.size: 256
file.name: handoff.logjet
ingest.protocol: otlp-http
ingest.listen: 127.0.0.1:4318
replay.listen: 127.0.0.1:7002
44 changes: 44 additions & 0 deletions demo/replay-handoff/run-appliance.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/bin/sh
set -eu

SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
TARGET_DIR="$SCRIPT_DIR/../../target/debug"
LOGJETD="$TARGET_DIR/logjetd"
EMITTER="$TARGET_DIR/otlp-bofh-emitter"
CONFIG="$SCRIPT_DIR/logjetd.conf"

for bin in "$LOGJETD" "$EMITTER"; do
if [ ! -x "$bin" ]; then
echo "missing $bin"
echo "build everything first with: make demo"
exit 1
fi
done

mkdir -p "$SCRIPT_DIR/spool"
cd "$SCRIPT_DIR"

echo "starting logjetd with config $CONFIG"
"$LOGJETD" --config "$CONFIG" serve &
LOGJETD_PID=$!

cleanup() {
kill "${LIVE_PID:-}" 2>/dev/null || true
kill "${LOGJETD_PID:-}" 2>/dev/null || true
}

trap cleanup EXIT INT TERM

sleep 1

echo "writing retained backlog before any replay client connects"
"$EMITTER" 127.0.0.1:4318 --once --service-name handoff-demo --message "HANDOFF backlog 001"
"$EMITTER" 127.0.0.1:4318 --once --service-name handoff-demo --message "HANDOFF backlog 002"
"$EMITTER" 127.0.0.1:4318 --once --service-name handoff-demo --message "HANDOFF backlog 003"

echo "starting a live emitter after the backlog is already retained"
"$EMITTER" 127.0.0.1:4318 --service-name handoff-demo --interval-ms 1000 --message "HANDOFF live" &
LIVE_PID=$!

echo "appliance side is running; start ./run-consumer.sh in another terminal"
wait
37 changes: 37 additions & 0 deletions demo/replay-handoff/run-consumer.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/bin/sh
set -eu

SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
TARGET_DIR="$SCRIPT_DIR/../../target/debug"
COLLECTOR="$TARGET_DIR/otlp-demo-collector"
FORWARDER="$TARGET_DIR/otlp-wire-forwarder"

for bin in "$COLLECTOR" "$FORWARDER"; do
if [ ! -x "$bin" ]; then
echo "missing $bin"
echo "build everything first with: make demo"
exit 1
fi
done

cd "$SCRIPT_DIR"

echo "starting collector on 127.0.0.1:4320"
"$COLLECTOR" 127.0.0.1:4320 &
COLLECTOR_PID=$!

cleanup() {
kill "${FORWARDER_PID:-}" 2>/dev/null || true
kill "${COLLECTOR_PID:-}" 2>/dev/null || true
}

trap cleanup EXIT INT TERM

sleep 1

echo "connecting a late replay client to 127.0.0.1:7002"
"$FORWARDER" 127.0.0.1:7002 127.0.0.1:4320 &
FORWARDER_PID=$!

echo "you should first see HANDOFF backlog messages, then HANDOFF live records continue"
wait
27 changes: 7 additions & 20 deletions demo/src/bin/otlp-bofh-emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use std::time::Duration;

use prost::Message;

use otlp_demo::{
build_excuse_request, build_excuse_request_for_service_with_severity,
build_message_request_for_service, format_batch_plain,
};
use otlp_demo::{build_excuse_request, build_excuse_request_for_service_with_severity, build_message_request_for_service, format_batch_plain};

fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut args = env::args().skip(1);
Expand All @@ -24,17 +21,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
while let Some(arg) = args.next() {
match arg.as_str() {
"--count" => {
count = Some(
args.next()
.ok_or("missing value for --count")?
.parse::<u64>()?,
);
count = Some(args.next().ok_or("missing value for --count")?.parse::<u64>()?);
}
"--interval-ms" => {
interval_ms = args
.next()
.ok_or("missing value for --interval-ms")?
.parse::<u64>()?;
interval_ms = args.next().ok_or("missing value for --interval-ms")?.parse::<u64>()?;
}
"--once" => {
count = Some(1);
Expand All @@ -54,7 +44,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"--server-name" => {
server_name = Some(args.next().ok_or("missing value for --server-name")?);
}
value if value.starts_with("--") => return Err(format!("unknown argument: {value}").into()),
value if value.starts_with("--") => {
return Err(format!("unknown argument: {value}").into());
}
value => addr = value.to_string(),
}
}
Expand All @@ -74,12 +66,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
None => build_excuse_request_for_service_with_severity(sequence, &service_name, &severity),
};
print!("{}", format_batch_plain(&request));
match otlp_demo::post_raw_otlp_http(
&addr,
&request.encode_to_vec(),
ca_file.as_deref(),
server_name.as_deref(),
) {
match otlp_demo::post_raw_otlp_http(&addr, &request.encode_to_vec(), ca_file.as_deref(), server_name.as_deref()) {
Ok(()) => eprintln!("sent OTLP log batch #{sequence} to {display_target}"),
Err(err) => eprintln!("send failed for batch #{sequence}: {err}"),
}
Expand Down
19 changes: 4 additions & 15 deletions demo/src/bin/otlp-bofh-grpc-emitter.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
use std::env;
use std::time::Duration;

use opentelemetry_proto::tonic::collector::logs::v1::{
logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
};
use opentelemetry_proto::tonic::collector::logs::v1::{ExportLogsServiceRequest, logs_service_client::LogsServiceClient};
use tonic::Request;

use otlp_demo::{build_excuse_request, format_batch_plain};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:4317".to_string());
let endpoint = if addr.starts_with("http://") || addr.starts_with("https://") {
addr
} else {
format!("http://{addr}")
};
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:4317".to_string());
let endpoint = if addr.starts_with("http://") || addr.starts_with("https://") { addr } else { format!("http://{addr}") };

eprintln!("otlp-bofh-grpc-emitter sending OTLP logs to {endpoint}");

Expand All @@ -35,10 +27,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

async fn send_batch(
endpoint: &str,
request: ExportLogsServiceRequest,
) -> Result<(), Box<dyn std::error::Error>> {
async fn send_batch(endpoint: &str, request: ExportLogsServiceRequest) -> Result<(), Box<dyn std::error::Error>> {
let mut client = LogsServiceClient::connect(endpoint.to_string()).await?;
client.export(Request::new(request)).await?;
Ok(())
Expand Down
56 changes: 13 additions & 43 deletions demo/src/bin/otlp-demo-collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match arg.as_str() {
"--tls" => tls = true,
"--delay-ms" => {
delay_ms = args
.next()
.ok_or("missing value for --delay-ms")?
.parse::<u64>()?;
delay_ms = args.next().ok_or("missing value for --delay-ms")?.parse::<u64>()?;
}
"--cert-file" => {
cert_file = Some(PathBuf::from(args.next().ok_or("missing value for --cert-file")?));
Expand Down Expand Up @@ -66,8 +63,7 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
request.respond(response)?;
}
Err(err) => {
let response = Response::from_string(format!("decode error: {err}"))
.with_status_code(StatusCode(400));
let response = Response::from_string(format!("decode error: {err}")).with_status_code(StatusCode(400));
request.respond(response)?;
}
}
Expand All @@ -77,10 +73,7 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

fn run_tls(
bind_addr: &str,
cert_file: Option<PathBuf>,
key_file: Option<PathBuf>,
delay_ms: u64,
bind_addr: &str, cert_file: Option<PathBuf>, key_file: Option<PathBuf>, delay_ms: u64,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cert_file = cert_file.ok_or("missing --cert-file for --tls")?;
let key_file = key_file.ok_or("missing --key-file for --tls")?;
Expand All @@ -102,9 +95,7 @@ fn run_tls(
}

fn handle_tls_client(
stream: std::net::TcpStream,
config: Arc<rustls::ServerConfig>,
delay_ms: u64,
stream: std::net::TcpStream, config: Arc<rustls::ServerConfig>, delay_ms: u64,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let conn = ServerConnection::new(config)?;
let mut transport = StreamOwned::new(conn, stream);
Expand All @@ -115,8 +106,7 @@ fn handle_tls_client(
}

fn handle_tls_http_request(
transport: &mut StreamOwned<ServerConnection, std::net::TcpStream>,
delay_ms: u64,
transport: &mut StreamOwned<ServerConnection, std::net::TcpStream>, delay_ms: u64,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let request = read_http_request(transport)?;
if request.method != "POST" || request.path != "/v1/logs" {
Expand Down Expand Up @@ -161,35 +151,23 @@ fn read_http_request<T: Read>(transport: &mut T) -> io::Result<ParsedHttpRequest
}
}

let header = std::str::from_utf8(&buffer[..buffer.len() - 4])
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid http header"))?;
let header = std::str::from_utf8(&buffer[..buffer.len() - 4]).map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid http header"))?;
let mut lines = header.lines();
let request_line = lines
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing request line"))?;
let request_line = lines.next().ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing request line"))?;
let mut parts = request_line.split_whitespace();
let method = parts
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing method"))?
.to_string();
let path = parts
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing path"))?
.to_string();
let method = parts.next().ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing method"))?.to_string();
let path = parts.next().ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing path"))?.to_string();

let mut content_length = None;
for line in lines {
if let Some((name, value)) = line.split_once(':')
&& name.eq_ignore_ascii_case("content-length")
{
content_length = Some(value.trim().parse::<usize>().map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "invalid content-length")
})?);
content_length = Some(value.trim().parse::<usize>().map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid content-length"))?);
}
}

let content_length = content_length
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing content-length"))?;
let content_length = content_length.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing content-length"))?;
let mut body = Vec::new();
transport.take(content_length as u64).read_to_end(&mut body)?;
if body.len() != content_length {
Expand All @@ -206,17 +184,9 @@ fn write_http_response<T: Write>(transport: &mut T, status: u16, body: &str) ->
404 => "Not Found",
_ => "Error",
};
write!(
transport,
"HTTP/1.1 {} {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
status,
status_text,
body.len(),
body
)?;
write!(transport, "HTTP/1.1 {} {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", status, status_text, body.len(), body)?;
transport.flush()
}
fn content_type_header() -> Header {
Header::from_bytes(&b"Content-Type"[..], &b"application/x-protobuf"[..])
.expect("static content-type header is valid")
Header::from_bytes(&b"Content-Type"[..], &b"application/x-protobuf"[..]).expect("static content-type header is valid")
}
Loading