Flow tracing: close the gap (in-process + distributed + SDKs + storage + OTLP)#1
Merged
Conversation
Make in-process flow tracing real end-to-end:
- protocol: client now owns trace_id (StartTrace carries it); add EndTrace
and request/response correlation ids (RequestId) on Query/GetTrace pairs.
- client: stop stamping a fresh random trace_id per event — buffer
(TraceId, TraceEvent) pairs so the id survives the batch path; parse the
server's TracingResponse (was the wrong enum) and route replies to waiters
via pending_requests, so query_traces/get_trace return real data instead
of empty stubs. Add end_trace + TracingIntegration::{end_flow_trace,
current_trace_id}.
- server: aggregate RecordEvents into a FlowTrace (active_traces map),
create on StartTrace, append per event, finalize to storage on EndTrace
and on client disconnect; GetTrace/QueryTraces merge in-flight + stored.
- network: send EndTrace on shutdown so the session trace is persisted.
- reflow_tracing: expose a lib surface so the server can be embedded (tests,
SDK collectors).
The server compresses responses >1KB into zstd Binary frames, but the client receiver only handled Text — so query_traces/get_trace replies were silently dropped and the caller waited out the full 10s timeout. Decode Binary frames (zstd, falling back to raw UTF-8) before routing to the waiter. Adds an end-to-end test that starts an embedded server, records three events under one trace id, finalizes, and asserts get_trace/query_traces return the correlated trace (plus an in-flight Running trace is visible before EndTrace).
Fill MessageSnapshot and PerformanceMetrics honestly, per locked decisions. - checksum module: content-only SHA-256 over RFC 8785-style canonical JSON, encoded "sha256:<hex>". Canonicalization rules documented as prose so any SDK/verifier can re-derive the bytes; routes through serde_json::Value for sorted keys + compact form. Reused once-computed canonical for size+content. - MessageSnapshot: self-describing — content_codec, content_format_version, stored_bytes; size_bytes = pre-compression content size. capture() honors the two orthogonal toggles; invariant checksum==sha256(canonical(content)). - PerformanceMetrics: Option<T> per field (None=unmeasured vs Some(0)); units pinned in docs; cheap-path constructor; heavy fields gated. - TracingConfig: split into capture_checksum (on), capture_content (off), enable_perf_sampling (off). - emit points (connector x3, network): pass the real Message as content so the checksum/size are computed for every traced message; type_name() for the message type; clone content only when tracing is active. - Message::type_name() made pub. - e2e test asserts the data-flow event carries a populated sha256 checksum and retains no content by default.
Propagate trace context across the bridge so a flow spanning processes is one end-to-end trace on a shared collector. - RemoteMessage gains an optional, serde(default) trace_context (TraceContext: trace_id + flow_id + parent span/event). Optional+defaulted keeps the peer wire backward-compatible — older peers omit it and deserialize to None. - route_message auto-attaches the local network's session trace context (read from its TracingIntegration) — no proxy/bridge signature changes needed. - handle_incoming_message records the cross-process hop (a DataFlow into the target actor) under the *propagated* trace id via the receiving network's client, fire-and-forget so tracing never blocks the data plane. With every network pointed at one collector, both processes' events land in one trace. - Network guard is scoped so it's never held across the await. Tests: wire back-compat unit test (legacy JSON -> None; round-trip preserves trace id) and a cross_process_flow_is_one_unified_trace e2e that boots two DistributedNetworks + a shared embedded collector and asserts the hop appears in a single trace. reflow_tracing added as a dev-dependency for the collector.
Add a collector-free way to consume traces, the most ergonomic first-class surface for SDKs: TracingIntegration gains an optional local tap (flume sender); every recorded TraceEvent is fanned to it (best-effort, never blocks) in addition to being shipped to the server. Network installs the tap when tracing is enabled and exposes get_trace_receiver(), mirroring the existing get_event_receiver() pattern. Routed all actor_* trace methods through the shared record() path so they're tapped too. Unit test asserts the tap observes ActorCreated/MessageSent with a populated content checksum.
Expose trace consumption through the C ABI (consumed by Go/JVM/C++), mirroring the rfl_events poll pattern: - Local tap: rfl_network_traces / rfl_traces_recv / rfl_traces_free poll a network's own trace events as JSON, no collector required. - Collector client: rfl_trace_client_connect / _query (historical) / _subscribe + _recv (live) / _free, reusing TracingClient. Protocol: TracingClient gains a notification tap so the receiver surfaces live EventNotification pushes (from subscribe) to a pollable channel; the response router now forwards notifications there. Enabling tracing stays via NetworkConfig.tracing in the config JSON. reflow_tracing_protocol added as a capi dependency.
- Network.traces() returns a TraceStream that yields live trace events as
dicts via the local tap — no collector required. Mirrors the existing
events()/EventStream pattern. Enable tracing via the config:
Network({"tracing": {"server_url": "ws://...", "enabled": True}}).
- Ergonomics fix used by every SDK: TracingConfig and NetworkConfig are now
#[serde(default)], so a partial config object (just the fields you care
about) deserializes — previously a partial {server_url, enabled} failed with
'missing field batch_size'/'compression'.
- Smoke test builds the SDK and asserts the trace stream yields ActorCreated
plus a data-flow snapshot carrying a sha256 content checksum.
Network.traces() returns a TraceStream whose async recv() yields live trace
events as plain objects via the local tap — no collector required. Mirrors the
existing events()/EventStream pattern; the generated index.d.ts now declares
traces()/TraceStream so it's first-class in TypeScript. Enable via
new Network({ tracing: { server_url: "ws://...", enabled: true } }).
Smoke test asserts ActorCreated + a data-flow snapshot with a sha256 checksum.
- Go: Network.Traces() returns a TraceStream over the rfl_network_traces / rfl_traces_recv C ABI, mirroring Events()/EventStream. Tracing test passes against the dev-linked dylib. - Regenerated the cbindgen C header with the new tracing symbols (rfl_network_traces, rfl_traces_recv/free, rfl_trace_client_connect/query/ subscribe/recv/free) and synced it to sdk/go/include and sdk/cpp/include — so the Go and C++ SDKs can bind the trace surface.
Add an RAII TraceStream class + Network::traces() to reflow.hpp, mirroring EventStream/Network::events() over the rfl_network_traces / rfl_traces_recv C ABI. Verified: a smoke TU including the header compiles, links against the capi dylib, and the trace API is callable.
Mirror EventStream for traces across the JNI boundary:
- Rust: Java_..._Network_nativeTraces returns a TraceStreamHandle over
get_trace_receiver(); TraceStream_nativeRecv serializes each TraceEvent to
JSON; TraceStream_nativeFree releases it. (native crate compiles.)
- Java: Network.traces() -> TraceStream; new TraceStream.java mirrors
EventStream.java.
Enable via new Network("{\"tracing\":{\"server_url\":\"ws://...\",\"enabled\":true}}").
(Java not gradle-compiled here — no JDK 17 toolchain in this environment.)
Self-review finding: the local trace tap was an unbounded flume channel fed whenever tracing was enabled (even when tracing is enabled only for a remote collector and nothing consumes locally). record()'s try_send never drops on an unbounded channel, so events accumulated forever — a memory leak. Fix: make the tap bounded (TRACE_TAP_CAPACITY=4096) and install it lazily on the first get_trace_receiver() call, so enabling tracing purely for a collector buffers nothing locally, and a lagging consumer drops events rather than retaining them. Bounded the C ABI subscribe notification channel likewise. Go/python/node SDKs subscribe before start(), so local consumption is unaffected (tests still green).
Bring the reflow_tracing collector docs in line with the implemented API: - Remove non-existent APIs: query_data_flows, get_performance_metrics, TraceQuery.custom_filter/performance_filter, TraceEvent::data_flow_with_content, trace_enhanced/causally_linked_data_flow, DataFlowGraph, TracingConfig:: from_env/from_file. Replace with the real TraceQuery fields + client-side filtering and the real signatures. - Correct trace_data_flow/trace_message_sent to take (msg_type, &content, metrics); TracingConfig literals use ..default() (now #[serde(default)]). - Document the real fidelity model: capture_checksum/capture_content knobs, sha256 content checksums, MessageSnapshot codec/version fields, PerformanceMetrics Option<T> semantics. - Add the new capabilities: local trace tap + first-class SDK consumption (net.traces() across Python/Node/Go/C++/JVM + C ABI), and the distributed unified-trace-via-shared-collector model. - Correct over-claims: bespoke stack (not OpenTelemetry/OTLP); only memory + sqlite storage exist (PostgreSQL/ClickHouse marked planned); state-diff/ time-travel marked planned. Left the separate engine→Zeal pipeline pages (event-types.md, architecture.md) untouched.
…s, mongodb) Only the in-memory backend actually worked: the 'storage' feature that gates SqliteStorage was never declared, so SQLite silently fell back to an error stub. Make durable storage real and add two more integrations. - features: declare 'storage' (in default, enables SQLite); add 'postgres' (sqlx/postgres), 'mongodb' (mongodb+bson), and 'all-backends'. Backend chosen by storage.backend; an uncompiled backend returns a clear error. - sqlite: fix three real bugs — (1) open with mode=rwc so a fresh DB file is created (plain sqlite:<path> errored on a missing file); (2) write-through in store_trace instead of a non-atomic write buffer that raced reads and deletes (a just-stored trace was invisible for ~1s; a delete could be undone by a later flush); (3) delete_trace now also evicts the read cache. Reads are buffer-aware as a defensive measure. - postgres: new PostgresStorage (sqlx PgPool) mirroring the blob+denormalized model; ON CONFLICT upsert, parameterized queries, auto-created schema. - mongodb: new MongoStorage (document per trace, _id=trace_id, indexed query fields). - tests: storage_backends.rs — sqlite roundtrip runs in CI; postgres/mongodb gated on REFLOW_TEST_POSTGRES_URL / REFLOW_TEST_MONGODB_URL. - docs: storage-backends + deployment updated — memory/sqlite default, postgres/mongodb behind features, clickhouse still planned.
TimescaleDB speaks the Postgres protocol, so the 'postgres' backend already works against it. Add a dedicated 'timescale'/'timescaledb' backend that reuses the [storage.postgres] config and converts the traces table into a hypertable partitioned on start_time — the natural fit for time-series traces. - PostgresStorage gains a timescale mode (new_timescale): composite key (trace_id, start_time) — a hypertable's unique key must include the partition column — best-effort CREATE EXTENSION + create_hypertable (7-day chunks, if_not_exists/migrate_data), and a trace_id index for point lookups. Degrades gracefully to a plain table (with a warning) if the extension is absent. - upsert keys on (trace_id, start_time) in timescale mode and doesn't update the partition column; start_time is set once per trace so the key is stable. - backend selection: 'timescale'/'timescaledb' arm; gated test on REFLOW_TEST_TIMESCALE_URL; docs section on hypertable + compression/retention.
Drop the ClickHouse backend from the docs entirely — it was only ever marked planned. Remove the status-table row, the bullet, the full design-only section, and the comparison-table column / migration example, replacing them with the backends that actually exist (TimescaleDB, MongoDB).
Add an OTLP/HTTP export bridge in the collector so finalized traces ship to any OpenTelemetry backend — Monoscope, Jaeger, Tempo, Honeycomb, an OTel Collector. - otlp module: hand-rolled OTLP/JSON (ExportTraceServiceRequest) rather than the churn-prone OTel SDK — the wire format is stable and reflow's model is already span-shaped. FlowTrace maps to one trace (the 128-bit UUID is used directly as the OTel trace id) + a synthetic root span + one child span per TraceEvent, parented by causality.parent_event_id; DataFlow/checksum/ports become attributes, execution_time_ns the duration, ActorFailed an ERROR status. Ids are hex-encoded and timestamps decimal unix-nanos per the OTLP/JSON encoding. - exporter (feature 'otlp', reqwest): POSTs to the endpoint's /v1/traces; a stub when the feature is off so the server holds Option<OtlpExporter> always. - config: an [otlp] block (enabled, endpoint, service_name). The server builds the exporter when enabled; finalize_trace exports best-effort, fire-and-forget, never blocking, cloning only when an exporter is configured. - tests: mapping unit test (trace/span id hex, parenting, checksum attr, duration); live POST gated on REFLOW_TEST_OTLP_ENDPOINT + --features otlp. - docs: observability/otlp-export.md (Monoscope-focused) + TOC + overview link.
Split the database backends into their own practical integration-guide chapters, grounded in the implemented config/schema/features: - observability/storage/sqlite.md - observability/storage/postgres.md - observability/storage/timescaledb.md - observability/storage/mongodb.md Each covers: build feature, running the DB (docker), the exact [storage.*] config block and all fields, the auto-created schema/document model, verification (incl. the gated REFLOW_TEST_* integration tests), operations (retention/compression/sharding/backups), and troubleshooting with the real error messages. Nest them under Storage Backends in SUMMARY.md and add an "Integration guides" index to storage-backends.md (now the conceptual overview; the chapters are the walkthroughs).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Close the flow-tracing gap — in-process, distributed, SDKs, storage, OTLP
Flow tracing was aspirational scaffolding: events weren't correlated, the server never persisted, queries were stubs, and distributed flows produced disconnected traces. This branch makes it real end-to-end and extends it into the SDKs, durable storage backends, and an OpenTelemetry/Monoscope export path.
What's included
Phase 1 — in-process persistence & query. Client owns the
trace_id; the server aggregates events into aFlowTrace(create onStartTrace, append per event, finalize to storage onEndTrace/disconnect);query/getcorrelate replies byRequestId. Fixed a latent bug where the server zstd-compressed replies into Binary frames the client never decoded (every SDK query would have hung 10s).Phase 2 — unified distributed traces.
RemoteMessagecarries an optional,serde(default)TraceContext(wire-backward-compatible);route_messageauto-attaches the local session trace; the receiving network records the cross-process hop under the propagatedtrace_id. With networks pointed at one shared collector, a flow spanning processes is oneFlowTrace.Phase 4 — honest data fidelity. SHA-256
"sha256:<hex>"content checksums over RFC-8785-style canonical JSON (cross-SDK deterministic); self-describingMessageSnapshot(codec/version/stored_bytes);PerformanceMetricsasOption<T>(unmeasured ≠ measured-zero) with pinned units; splitcapture_checksum/capture_content/enable_perf_samplingknobs.Phase 3 — first-class tracing in every SDK. A local trace tap (no collector needed) + a C ABI surface, then idiomatic consumers:
net.traces()in Python, Node, Go, C++, and the JVM (network.traces()), plus a collector-client over the C ABI. Bounded the local tap (lazy-installed) so it can't grow unbounded.Storage backends (only the in-memory one worked before — the
storagefeature gating SQLite was never even declared):--features postgres), TimescaleDB (hypertable onstart_time), MongoDB (--features mongodb)OTLP / Monoscope export (
--features otlp). Finalized traces export to any OpenTelemetry backend (Monoscope, Jaeger, Tempo, …) over OTLP/HTTP; hand-rolled OTLP/JSON since reflow's model is already span-shaped.Testing
REFLOW_TEST_*_URL.REFLOW_TEST_OTLP_ENDPOINT.go test) all pass; C++ compiles+links+runs; JVM JNI compiles.Docs
docs/observability/*reconciled with the now-real API (removed non-existentquery_data_flows/custom_filter/etc., corrected signatures, documented the real fidelity/distributed model), plus dedicated per-database integration-guide chapters and a Monoscope/OTLP page.Notes