Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
7f8c12b
tracing(phase1): persist + correlate in-process flow traces
darmie Jun 17, 2026
c5e3b07
tracing(phase1): handle compressed reply frames + e2e test
darmie Jun 17, 2026
3e94aeb
tracing(phase4): honest message fidelity — checksums + metrics
darmie Jun 17, 2026
f9e7e9d
tracing(phase2): unified cross-process traces via shared collector
darmie Jun 17, 2026
87501a5
tracing(phase3a): local trace-event tap on the network
darmie Jun 17, 2026
68639c8
tracing(phase3b): C ABI tracing surface for FFI SDKs
darmie Jun 17, 2026
f84902f
tracing(phase3c): first-class tracing in the Python SDK
darmie Jun 17, 2026
e83cf52
tracing(phase3c): first-class tracing in the Node SDK
darmie Jun 17, 2026
51f32c1
tracing(phase3d): Go SDK wrapper + regenerated C header
darmie Jun 17, 2026
2cadcd9
tracing(phase3d): C++ SDK TraceStream wrapper
darmie Jun 17, 2026
77db241
tracing(phase3d): JVM SDK TraceStream (JNI)
darmie Jun 17, 2026
f867491
tracing(review): bound the local trace tap to prevent unbounded growth
darmie Jun 17, 2026
ad72b56
docs(observability): reconcile with the now-real tracing API
darmie Jun 17, 2026
4ed44fb
tracing(storage): real DB backend integrations (sqlite fixed, postgre…
darmie Jun 17, 2026
e10933e
tracing(storage): TimescaleDB backend (hypertable on start_time)
darmie Jun 17, 2026
51eca2c
docs(storage): retire ClickHouse
darmie Jun 18, 2026
1c8340b
tracing(otlp): export finalized traces to OpenTelemetry / Monoscope
darmie Jun 18, 2026
a366a13
docs(storage): dedicated DB integration-guide chapters
darmie Jun 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
716 changes: 705 additions & 11 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion crates/reflow_actor/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,8 @@ impl Message {
.map_err(|e| MessageError::Compression(e.to_string()))
}

fn type_name(&self) -> &'static str {
/// Stable variant name (e.g. `"Integer"`), used for trace message types.
pub fn type_name(&self) -> &'static str {
match self {
Message::Flow => "Flow",
Message::Event(_) => "Event",
Expand Down
3 changes: 3 additions & 0 deletions crates/reflow_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ wasm = []
[dev-dependencies]
wasm-bindgen-test = "0.3.50"
tracing-subscriber = "0.3"
# Embed the tracing collector in distributed e2e tests to assert that a flow
# spanning two processes is stitched into one trace on a shared collector.
reflow_tracing = { path = "../reflow_tracing" }

[[example]]
name = "distributed_example"
Expand Down
58 changes: 37 additions & 21 deletions crates/reflow_network/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,19 @@ impl Connector {
});
from_actor_load_count.dec();

// Send tracing event if tracing is enabled
// Send tracing event if tracing is enabled. The message itself
// is the content; the integration computes the checksum/size
// (and optionally captures content) per the configured knobs.
if let Some(ref tracing) = tracing_integration {
let message_size = std::mem::size_of_val(&msg);
use reflow_tracing_protocol::PerformanceMetrics;
let message_type = msg.type_name();
let _ = tracing
.trace_message_sent(
from_actor_id.clone(),
_from_port.clone(),
format!("{:?}", std::mem::discriminant(&msg)),
message_size,
message_type,
&msg,
PerformanceMetrics::default(),
)
.await;

Expand All @@ -158,8 +162,9 @@ impl Connector {
_from_port.clone(),
to_actor_id.clone(),
to_port.clone(),
format!("{:?}", std::mem::discriminant(&msg)),
message_size,
message_type,
&msg,
PerformanceMetrics::default(),
)
.await;
}
Expand Down Expand Up @@ -227,8 +232,9 @@ impl Connector {
continue;
};

let message_size = std::mem::size_of_val(&msg);
let msg_discriminant = format!("{:?}", std::mem::discriminant(&msg));
// Capture the message for tracing before it is moved into the
// downstream channel (only when tracing is active).
let traced_msg = tracing_integration.as_ref().map(|_| msg.clone());

let encodable = if let Message::Bytes(_) = &msg {
crate::message::EncodableValue::from(serde_json::Value::String(
Expand Down Expand Up @@ -259,13 +265,16 @@ impl Connector {
)
});

if let Some(ref tracing) = tracing_integration {
if let (Some(tracing), Some(content)) = (&tracing_integration, &traced_msg) {
use reflow_tracing_protocol::PerformanceMetrics;
let message_type = content.type_name();
let _ = tracing
.trace_message_sent(
from_actor_id.clone(),
from_port.clone(),
msg_discriminant.clone(),
message_size,
message_type,
content,
PerformanceMetrics::default(),
)
.await;
let _ = tracing
Expand All @@ -274,8 +283,9 @@ impl Connector {
from_port.clone(),
to_actor_id.clone(),
to_port.clone(),
msg_discriminant,
message_size,
message_type,
content,
PerformanceMetrics::default(),
)
.await;
}
Expand Down Expand Up @@ -333,9 +343,9 @@ impl Connector {
continue;
};

// Capture tracing info from &msg before moving
let message_size = std::mem::size_of_val(&msg);
let msg_discriminant = format!("{:?}", std::mem::discriminant(&msg));
// Capture the message for tracing before it is moved
// into the downstream channel (only when tracing is active).
let traced_msg = tracing_integration.as_ref().map(|_| msg.clone());

// Emit MessageSent event — skip expensive serialization for binary blobs
let encodable = if let Message::Bytes(_) = &msg {
Expand Down Expand Up @@ -376,13 +386,18 @@ impl Connector {
});

// Send tracing event if tracing is enabled
if let Some(ref tracing) = tracing_integration {
if let (Some(tracing), Some(content)) =
(&tracing_integration, &traced_msg)
{
use reflow_tracing_protocol::PerformanceMetrics;
let message_type = content.type_name();
let _ = tracing
.trace_message_sent(
from_actor_id.clone(),
from_port.clone(),
msg_discriminant.clone(),
message_size,
message_type,
content,
PerformanceMetrics::default(),
)
.await;

Expand All @@ -392,8 +407,9 @@ impl Connector {
from_port.clone(),
to_actor_id.clone(),
to_port.clone(),
msg_discriminant,
message_size,
message_type,
content,
PerformanceMetrics::default(),
)
.await;
}
Expand Down
40 changes: 40 additions & 0 deletions crates/reflow_network/src/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ mod tests {
target_port: "input".to_string(),
payload: crate::message::Message::String(std::sync::Arc::new("hello".to_string())),
timestamp: chrono::Utc::now(),
trace_context: None,
};
let result = router.handle_incoming_message(msg).await;
assert!(result.is_err());
Expand All @@ -137,6 +138,45 @@ mod tests {
let actors = router.get_local_actor_list();
assert!(actors.is_empty());
}

#[test]
fn remote_message_trace_context_is_wire_backward_compatible() {
use crate::router::{RemoteMessage, TraceContext};
use reflow_tracing_protocol::TraceId;

// An older peer sends no `trace_context`; it must deserialize to None
// rather than failing.
let legacy = r#"{
"message_id":"m1","source_network":"a","source_actor":"s",
"target_network":"b","target_actor":"t","target_port":"in",
"payload":{"type":"Flow"},"timestamp":"2020-01-01T00:00:00Z"
}"#;
let msg: RemoteMessage =
serde_json::from_str(legacy).expect("deserialize without trace_context");
assert!(msg.trace_context.is_none());

// A context round-trips and preserves the propagated trace id.
let trace_id = TraceId::new();
let with_ctx = RemoteMessage {
message_id: "m2".into(),
source_network: "a".into(),
source_actor: "s".into(),
target_network: "b".into(),
target_actor: "t".into(),
target_port: "in".into(),
payload: crate::message::Message::Flow,
timestamp: chrono::Utc::now(),
trace_context: Some(TraceContext {
trace_id: trace_id.clone(),
flow_id: None,
parent_span_id: "span-1".into(),
parent_event_id: None,
}),
};
let encoded = serde_json::to_string(&with_ctx).unwrap();
let decoded: RemoteMessage = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded.trace_context.unwrap().trace_id, trace_id);
}
}

// === 5.3: Subgraph composition tests ===
Expand Down
82 changes: 70 additions & 12 deletions crates/reflow_network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ use std::sync::{Arc, Mutex};
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
#[cfg_attr(target_arch = "wasm32", tsify(into_wasm_abi))]
#[cfg_attr(target_arch = "wasm32", tsify(from_wasm_abi))]
// Container-level default so callers can supply a partial config (e.g. just
// `tracing`) and have the rest fall back to defaults — important for SDKs that
// enable tracing through a config object.
#[serde(default)]
pub struct NetworkConfig {
pub compression: CompressionConfig,
pub tracing: TracingConfig,
Expand Down Expand Up @@ -156,6 +160,12 @@ pub struct Network {
event_handle: Vec<JsValue>,
compression_config: CompressionConfig,
pub(crate) tracing_integration: Option<TracingIntegration>,
/// Local tap for trace events, so SDKs can consume traces without a
/// collector. Fed by `tracing_integration` when tracing is enabled.
trace_event_emitter: (
flume::Sender<reflow_tracing_protocol::TraceEvent>,
flume::Receiver<reflow_tracing_protocol::TraceEvent>,
),
}

unsafe impl Send for Network {}
Expand Down Expand Up @@ -366,12 +376,26 @@ impl Network {
}
}

/// Capacity of the per-network local trace-event tap. Bounds memory when a
/// consumer lags or never drains; excess events are dropped best-effort.
const TRACE_TAP_CAPACITY: usize = 4096;

impl Network {
pub fn new(config: NetworkConfig) -> Self {
// Initialize tracing if enabled
// Local trace-event tap. Bounded so it can never grow without bound if
// nothing drains it; the tap is installed lazily (see
// `get_trace_receiver`) so when no local consumer exists, no events are
// buffered at all. `try_send` drops on a full buffer — best-effort,
// never blocks the data plane.
let trace_event_emitter = flume::bounded(TRACE_TAP_CAPACITY);

// Initialize tracing if enabled. The local tap is NOT wired here — it is
// attached on first `get_trace_receiver()` call so that enabling tracing
// purely for a remote collector costs no local buffering.
let tracing_integration = if config.tracing.enabled {
let client = TracingClient::new(config.tracing.clone());
Some(TracingIntegration::new(client))
let integration = TracingIntegration::new(client);
Some(integration)
} else {
None
};
Expand All @@ -390,6 +414,7 @@ impl Network {
event_handle: Vec::new(),
compression_config: config.compression,
tracing_integration,
trace_event_emitter,
}
}

Expand Down Expand Up @@ -810,26 +835,39 @@ impl Network {
timestamp,
});

// Trace the message being sent
// Trace the message being sent. The message is the content; the
// integration computes checksum/size per the configured knobs.
if let Some(ref tracing) = self.tracing_integration {
let message_type = format!("{:?}", std::mem::discriminant(&data));
let size_bytes = serde_json::to_string(&data).unwrap_or_default().len();
use reflow_tracing_protocol::PerformanceMetrics;
let tracing_clone = tracing.clone();
let id_clone = id.to_string();
let port_clone = port.to_string();
let content = data.clone();
#[cfg(not(target_arch = "wasm32"))]
{
tokio::runtime::Handle::current().spawn(async move {
let _ = tracing_clone
.trace_message_sent(&id_clone, &port_clone, &message_type, size_bytes)
.trace_message_sent(
&id_clone,
&port_clone,
content.type_name(),
&content,
PerformanceMetrics::default(),
)
.await;
});
}
#[cfg(target_arch = "wasm32")]
{
spawn_local(async move {
let _ = tracing_clone
.trace_message_sent(&id_clone, &port_clone, &message_type, size_bytes)
.trace_message_sent(
&id_clone,
&port_clone,
content.type_name(),
&content,
PerformanceMetrics::default(),
)
.await;
});
}
Expand Down Expand Up @@ -1367,11 +1405,15 @@ impl Network {
{
let tracing_integration = self.tracing_integration.clone();
tokio::runtime::Handle::current().spawn(async move {
// Shutdown tracing first to flush any pending events
if let Some(ref tracing) = tracing_integration
&& let Err(e) = tracing.client().shutdown().await
{
tracing::warn!("Failed to shutdown tracing client: {}", e);
if let Some(ref tracing) = tracing_integration {
// Finalize the session trace so it's persisted as Completed,
// then flush and close the client.
let _ = tracing
.end_flow_trace(reflow_tracing_protocol::ExecutionStatus::Completed)
.await;
if let Err(e) = tracing.client().shutdown().await {
tracing::warn!("Failed to shutdown tracing client: {}", e);
}
}
});
}
Expand Down Expand Up @@ -1498,6 +1540,22 @@ impl Network {
pub fn get_event_receiver(&self) -> flume::Receiver<NetworkEvent> {
self.network_event_emitter.1.clone()
}

/// Subscribe to this network's live trace events locally — no tracing
/// collector required. Each `TraceEvent` recorded while tracing is enabled
/// is delivered here in addition to being shipped to the configured server.
/// Returns an empty (never-fed) receiver when tracing is disabled.
///
/// Installs the local tap on first call (idempotent), so enabling tracing
/// solely for a collector buffers nothing locally. Call before `start()`
/// for full coverage. The tap is bounded ([`TRACE_TAP_CAPACITY`]); if a
/// consumer stops draining, new events are dropped rather than retained.
pub fn get_trace_receiver(&self) -> flume::Receiver<reflow_tracing_protocol::TraceEvent> {
if let Some(ref tracing) = self.tracing_integration {
tracing.set_local_tap(self.trace_event_emitter.0.clone());
}
self.trace_event_emitter.1.clone()
}
}

/// GraphNetwork
Expand Down
Loading
Loading