Skip to content

Flow tracing: close the gap (in-process + distributed + SDKs + storage + OTLP)#1

Merged
darmie merged 18 commits into
mainfrom
flow-tracing
Jun 18, 2026
Merged

Flow tracing: close the gap (in-process + distributed + SDKs + storage + OTLP)#1
darmie merged 18 commits into
mainfrom
flow-tracing

Conversation

@darmie

@darmie darmie commented Jun 18, 2026

Copy link
Copy Markdown
Member

Close the flow-tracing gap — in-process, distributed, SDKs, storage, OTLP

Flow tracing was aspirational scaffolding: events weren't correlated, the server never persisted, queries were stubs, and distributed flows produced disconnected traces. This branch makes it real end-to-end and extends it into the SDKs, durable storage backends, and an OpenTelemetry/Monoscope export path.

What's included

Phase 1 — in-process persistence & query. Client owns the trace_id; the server aggregates events into a FlowTrace (create on StartTrace, append per event, finalize to storage on EndTrace/disconnect); query/get correlate replies by RequestId. Fixed a latent bug where the server zstd-compressed replies into Binary frames the client never decoded (every SDK query would have hung 10s).

Phase 2 — unified distributed traces. RemoteMessage carries an optional, serde(default) TraceContext (wire-backward-compatible); route_message auto-attaches the local session trace; the receiving network records the cross-process hop under the propagated trace_id. With networks pointed at one shared collector, a flow spanning processes is one FlowTrace.

Phase 4 — honest data fidelity. SHA-256 "sha256:<hex>" content checksums over RFC-8785-style canonical JSON (cross-SDK deterministic); self-describing MessageSnapshot (codec/version/stored_bytes); PerformanceMetrics as Option<T> (unmeasured ≠ measured-zero) with pinned units; split capture_checksum/capture_content/enable_perf_sampling knobs.

Phase 3 — first-class tracing in every SDK. A local trace tap (no collector needed) + a C ABI surface, then idiomatic consumers: net.traces() in Python, Node, Go, C++, and the JVM (network.traces()), plus a collector-client over the C ABI. Bounded the local tap (lazy-installed) so it can't grow unbounded.

Storage backends (only the in-memory one worked before — the storage feature gating SQLite was never even declared):

  • SQLite turned on for real + three bug fixes (create-if-missing, write-through instead of a racy write buffer, cache eviction on delete)
  • PostgreSQL (--features postgres), TimescaleDB (hypertable on start_time), MongoDB (--features mongodb)
  • ClickHouse retired (was only ever planned)

OTLP / Monoscope export (--features otlp). Finalized traces export to any OpenTelemetry backend (Monoscope, Jaeger, Tempo, …) over OTLP/HTTP; hand-rolled OTLP/JSON since reflow's model is already span-shaped.

Testing

  • In-process e2e (record + query a correlated trace) and a cross-process unified-trace e2e (two networks + shared collector).
  • Storage roundtrip tests: SQLite runs in CI; Postgres/Timescale/Mongo gated on REFLOW_TEST_*_URL.
  • OTLP mapping unit test; live export gated on REFLOW_TEST_OTLP_ENDPOINT.
  • SDK smoke tests: Python (pytest), Node (node:test), Go (go test) all pass; C++ compiles+links+runs; JVM JNI compiles.

Docs

docs/observability/* reconciled with the now-real API (removed non-existent query_data_flows/custom_filter/etc., corrected signatures, documented the real fidelity/distributed model), plus dedicated per-database integration-guide chapters and a Monoscope/OTLP page.

Notes

  • Deep per-flow causal attribution of downstream work on receiving hosts is documented as a follow-up (Phase 2 stitches the boundary).
  • Postgres/Timescale/Mongo/OTLP runtime tests need a live server/endpoint (gated by env vars); everything compiles in all feature combinations.

darmie added 18 commits June 17, 2026 16:28
Make in-process flow tracing real end-to-end:

- protocol: client now owns trace_id (StartTrace carries it); add EndTrace
  and request/response correlation ids (RequestId) on Query/GetTrace pairs.
- client: stop stamping a fresh random trace_id per event — buffer
  (TraceId, TraceEvent) pairs so the id survives the batch path; parse the
  server's TracingResponse (was the wrong enum) and route replies to waiters
  via pending_requests, so query_traces/get_trace return real data instead
  of empty stubs. Add end_trace + TracingIntegration::{end_flow_trace,
  current_trace_id}.
- server: aggregate RecordEvents into a FlowTrace (active_traces map),
  create on StartTrace, append per event, finalize to storage on EndTrace
  and on client disconnect; GetTrace/QueryTraces merge in-flight + stored.
- network: send EndTrace on shutdown so the session trace is persisted.
- reflow_tracing: expose a lib surface so the server can be embedded (tests,
  SDK collectors).
The server compresses responses >1KB into zstd Binary frames, but the
client receiver only handled Text — so query_traces/get_trace replies were
silently dropped and the caller waited out the full 10s timeout. Decode
Binary frames (zstd, falling back to raw UTF-8) before routing to the
waiter.

Adds an end-to-end test that starts an embedded server, records three
events under one trace id, finalizes, and asserts get_trace/query_traces
return the correlated trace (plus an in-flight Running trace is visible
before EndTrace).
Fill MessageSnapshot and PerformanceMetrics honestly, per locked decisions.

- checksum module: content-only SHA-256 over RFC 8785-style canonical JSON,
  encoded "sha256:<hex>". Canonicalization rules documented as prose so any
  SDK/verifier can re-derive the bytes; routes through serde_json::Value for
  sorted keys + compact form. Reused once-computed canonical for size+content.
- MessageSnapshot: self-describing — content_codec, content_format_version,
  stored_bytes; size_bytes = pre-compression content size. capture() honors
  the two orthogonal toggles; invariant checksum==sha256(canonical(content)).
- PerformanceMetrics: Option<T> per field (None=unmeasured vs Some(0)); units
  pinned in docs; cheap-path constructor; heavy fields gated.
- TracingConfig: split into capture_checksum (on), capture_content (off),
  enable_perf_sampling (off).
- emit points (connector x3, network): pass the real Message as content so the
  checksum/size are computed for every traced message; type_name() for the
  message type; clone content only when tracing is active.
- Message::type_name() made pub.
- e2e test asserts the data-flow event carries a populated sha256 checksum and
  retains no content by default.
Propagate trace context across the bridge so a flow spanning processes is one
end-to-end trace on a shared collector.

- RemoteMessage gains an optional, serde(default) trace_context (TraceContext:
  trace_id + flow_id + parent span/event). Optional+defaulted keeps the peer
  wire backward-compatible — older peers omit it and deserialize to None.
- route_message auto-attaches the local network's session trace context (read
  from its TracingIntegration) — no proxy/bridge signature changes needed.
- handle_incoming_message records the cross-process hop (a DataFlow into the
  target actor) under the *propagated* trace id via the receiving network's
  client, fire-and-forget so tracing never blocks the data plane. With every
  network pointed at one collector, both processes' events land in one trace.
- Network guard is scoped so it's never held across the await.

Tests: wire back-compat unit test (legacy JSON -> None; round-trip preserves
trace id) and a cross_process_flow_is_one_unified_trace e2e that boots two
DistributedNetworks + a shared embedded collector and asserts the hop appears
in a single trace. reflow_tracing added as a dev-dependency for the collector.
Add a collector-free way to consume traces, the most ergonomic first-class
surface for SDKs: TracingIntegration gains an optional local tap (flume
sender); every recorded TraceEvent is fanned to it (best-effort, never blocks)
in addition to being shipped to the server. Network installs the tap when
tracing is enabled and exposes get_trace_receiver(), mirroring the existing
get_event_receiver() pattern. Routed all actor_* trace methods through the
shared record() path so they're tapped too. Unit test asserts the tap observes
ActorCreated/MessageSent with a populated content checksum.
Expose trace consumption through the C ABI (consumed by Go/JVM/C++), mirroring
the rfl_events poll pattern:

- Local tap: rfl_network_traces / rfl_traces_recv / rfl_traces_free poll a
  network's own trace events as JSON, no collector required.
- Collector client: rfl_trace_client_connect / _query (historical) /
  _subscribe + _recv (live) / _free, reusing TracingClient.

Protocol: TracingClient gains a notification tap so the receiver surfaces live
EventNotification pushes (from subscribe) to a pollable channel; the response
router now forwards notifications there. Enabling tracing stays via
NetworkConfig.tracing in the config JSON. reflow_tracing_protocol added as a
capi dependency.
- Network.traces() returns a TraceStream that yields live trace events as
  dicts via the local tap — no collector required. Mirrors the existing
  events()/EventStream pattern. Enable tracing via the config:
  Network({"tracing": {"server_url": "ws://...", "enabled": True}}).
- Ergonomics fix used by every SDK: TracingConfig and NetworkConfig are now
  #[serde(default)], so a partial config object (just the fields you care
  about) deserializes — previously a partial {server_url, enabled} failed with
  'missing field batch_size'/'compression'.
- Smoke test builds the SDK and asserts the trace stream yields ActorCreated
  plus a data-flow snapshot carrying a sha256 content checksum.
Network.traces() returns a TraceStream whose async recv() yields live trace
events as plain objects via the local tap — no collector required. Mirrors the
existing events()/EventStream pattern; the generated index.d.ts now declares
traces()/TraceStream so it's first-class in TypeScript. Enable via
new Network({ tracing: { server_url: "ws://...", enabled: true } }).
Smoke test asserts ActorCreated + a data-flow snapshot with a sha256 checksum.
- Go: Network.Traces() returns a TraceStream over the rfl_network_traces /
  rfl_traces_recv C ABI, mirroring Events()/EventStream. Tracing test passes
  against the dev-linked dylib.
- Regenerated the cbindgen C header with the new tracing symbols
  (rfl_network_traces, rfl_traces_recv/free, rfl_trace_client_connect/query/
  subscribe/recv/free) and synced it to sdk/go/include and sdk/cpp/include —
  so the Go and C++ SDKs can bind the trace surface.
Add an RAII TraceStream class + Network::traces() to reflow.hpp, mirroring
EventStream/Network::events() over the rfl_network_traces / rfl_traces_recv C
ABI. Verified: a smoke TU including the header compiles, links against the
capi dylib, and the trace API is callable.
Mirror EventStream for traces across the JNI boundary:
- Rust: Java_..._Network_nativeTraces returns a TraceStreamHandle over
  get_trace_receiver(); TraceStream_nativeRecv serializes each TraceEvent to
  JSON; TraceStream_nativeFree releases it. (native crate compiles.)
- Java: Network.traces() -> TraceStream; new TraceStream.java mirrors
  EventStream.java.
Enable via new Network("{\"tracing\":{\"server_url\":\"ws://...\",\"enabled\":true}}").
(Java not gradle-compiled here — no JDK 17 toolchain in this environment.)
Self-review finding: the local trace tap was an unbounded flume channel fed
whenever tracing was enabled (even when tracing is enabled only for a remote
collector and nothing consumes locally). record()'s try_send never drops on an
unbounded channel, so events accumulated forever — a memory leak.

Fix: make the tap bounded (TRACE_TAP_CAPACITY=4096) and install it lazily on
the first get_trace_receiver() call, so enabling tracing purely for a collector
buffers nothing locally, and a lagging consumer drops events rather than
retaining them. Bounded the C ABI subscribe notification channel likewise.
Go/python/node SDKs subscribe before start(), so local consumption is
unaffected (tests still green).
Bring the reflow_tracing collector docs in line with the implemented API:
- Remove non-existent APIs: query_data_flows, get_performance_metrics,
  TraceQuery.custom_filter/performance_filter, TraceEvent::data_flow_with_content,
  trace_enhanced/causally_linked_data_flow, DataFlowGraph, TracingConfig::
  from_env/from_file. Replace with the real TraceQuery fields + client-side
  filtering and the real signatures.
- Correct trace_data_flow/trace_message_sent to take (msg_type, &content,
  metrics); TracingConfig literals use ..default() (now #[serde(default)]).
- Document the real fidelity model: capture_checksum/capture_content knobs,
  sha256 content checksums, MessageSnapshot codec/version fields,
  PerformanceMetrics Option<T> semantics.
- Add the new capabilities: local trace tap + first-class SDK consumption
  (net.traces() across Python/Node/Go/C++/JVM + C ABI), and the distributed
  unified-trace-via-shared-collector model.
- Correct over-claims: bespoke stack (not OpenTelemetry/OTLP); only memory +
  sqlite storage exist (PostgreSQL/ClickHouse marked planned); state-diff/
  time-travel marked planned. Left the separate engine→Zeal pipeline pages
  (event-types.md, architecture.md) untouched.
…s, mongodb)

Only the in-memory backend actually worked: the 'storage' feature that gates
SqliteStorage was never declared, so SQLite silently fell back to an error stub.
Make durable storage real and add two more integrations.

- features: declare 'storage' (in default, enables SQLite); add 'postgres'
  (sqlx/postgres), 'mongodb' (mongodb+bson), and 'all-backends'. Backend chosen
  by storage.backend; an uncompiled backend returns a clear error.
- sqlite: fix three real bugs — (1) open with mode=rwc so a fresh DB file is
  created (plain sqlite:<path> errored on a missing file); (2) write-through in
  store_trace instead of a non-atomic write buffer that raced reads and deletes
  (a just-stored trace was invisible for ~1s; a delete could be undone by a
  later flush); (3) delete_trace now also evicts the read cache. Reads are
  buffer-aware as a defensive measure.
- postgres: new PostgresStorage (sqlx PgPool) mirroring the blob+denormalized
  model; ON CONFLICT upsert, parameterized queries, auto-created schema.
- mongodb: new MongoStorage (document per trace, _id=trace_id, indexed query
  fields).
- tests: storage_backends.rs — sqlite roundtrip runs in CI; postgres/mongodb
  gated on REFLOW_TEST_POSTGRES_URL / REFLOW_TEST_MONGODB_URL.
- docs: storage-backends + deployment updated — memory/sqlite default,
  postgres/mongodb behind features, clickhouse still planned.
TimescaleDB speaks the Postgres protocol, so the 'postgres' backend already
works against it. Add a dedicated 'timescale'/'timescaledb' backend that reuses
the [storage.postgres] config and converts the traces table into a hypertable
partitioned on start_time — the natural fit for time-series traces.

- PostgresStorage gains a timescale mode (new_timescale): composite key
  (trace_id, start_time) — a hypertable's unique key must include the partition
  column — best-effort CREATE EXTENSION + create_hypertable (7-day chunks,
  if_not_exists/migrate_data), and a trace_id index for point lookups. Degrades
  gracefully to a plain table (with a warning) if the extension is absent.
- upsert keys on (trace_id, start_time) in timescale mode and doesn't update the
  partition column; start_time is set once per trace so the key is stable.
- backend selection: 'timescale'/'timescaledb' arm; gated test on
  REFLOW_TEST_TIMESCALE_URL; docs section on hypertable + compression/retention.
Drop the ClickHouse backend from the docs entirely — it was only ever marked
planned. Remove the status-table row, the bullet, the full design-only section,
and the comparison-table column / migration example, replacing them with the
backends that actually exist (TimescaleDB, MongoDB).
Add an OTLP/HTTP export bridge in the collector so finalized traces ship to any
OpenTelemetry backend — Monoscope, Jaeger, Tempo, Honeycomb, an OTel Collector.

- otlp module: hand-rolled OTLP/JSON (ExportTraceServiceRequest) rather than the
  churn-prone OTel SDK — the wire format is stable and reflow's model is already
  span-shaped. FlowTrace maps to one trace (the 128-bit UUID is used directly as
  the OTel trace id) + a synthetic root span + one child span per TraceEvent,
  parented by causality.parent_event_id; DataFlow/checksum/ports become
  attributes, execution_time_ns the duration, ActorFailed an ERROR status. Ids
  are hex-encoded and timestamps decimal unix-nanos per the OTLP/JSON encoding.
- exporter (feature 'otlp', reqwest): POSTs to the endpoint's /v1/traces; a stub
  when the feature is off so the server holds Option<OtlpExporter> always.
- config: an [otlp] block (enabled, endpoint, service_name). The server builds
  the exporter when enabled; finalize_trace exports best-effort, fire-and-forget,
  never blocking, cloning only when an exporter is configured.
- tests: mapping unit test (trace/span id hex, parenting, checksum attr,
  duration); live POST gated on REFLOW_TEST_OTLP_ENDPOINT + --features otlp.
- docs: observability/otlp-export.md (Monoscope-focused) + TOC + overview link.
Split the database backends into their own practical integration-guide
chapters, grounded in the implemented config/schema/features:

- observability/storage/sqlite.md
- observability/storage/postgres.md
- observability/storage/timescaledb.md
- observability/storage/mongodb.md

Each covers: build feature, running the DB (docker), the exact [storage.*]
config block and all fields, the auto-created schema/document model,
verification (incl. the gated REFLOW_TEST_* integration tests), operations
(retention/compression/sharding/backups), and troubleshooting with the real
error messages.

Nest them under Storage Backends in SUMMARY.md and add an "Integration guides"
index to storage-backends.md (now the conceptual overview; the chapters are the
walkthroughs).
@darmie darmie merged commit d402974 into main Jun 18, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant