Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,15 @@ pub struct NodeContext {
/// Channel for the node to emit structured view data for frontend consumption.
/// Like stats_tx, this is optional and best-effort.
pub view_data_tx: Option<mpsc::Sender<NodeViewDataUpdate>>,
/// Optional sender for engine-level control messages.
///
/// Allows nodes to send [`EngineControlMessage`] to the engine actor,
/// enabling cross-node control (e.g. sending `UpdateParams` to a sibling
/// node by name via [`EngineControlMessage::TuneNode`]).
///
/// Only provided in dynamic pipelines. `None` in oneshot/static
/// pipelines where the graph is fixed at build time.
pub engine_control_tx: Option<mpsc::Sender<crate::control::EngineControlMessage>>,
}

impl NodeContext {
Expand All @@ -348,6 +357,36 @@ impl NodeContext {
})
}

/// Send an `UpdateParams` control message to a sibling node by name.
///
/// This is a convenience wrapper around [`EngineControlMessage::TuneNode`]
/// that routes through the engine actor's control channel — the same path
/// the WebSocket/REST API uses.
///
/// Only works in dynamic pipelines (where `engine_control_tx` is `Some`).
///
/// # Errors
///
/// Returns a [`StreamKitError::Runtime`] if the engine control channel is
/// unavailable (oneshot pipeline) or closed (engine shut down).
pub async fn tune_sibling(
&self,
target_node_id: &str,
params: serde_json::Value,
) -> Result<(), StreamKitError> {
let tx = self.engine_control_tx.as_ref().ok_or_else(|| {
StreamKitError::Runtime(
"engine_control_tx not available (oneshot pipeline?)".to_string(),
)
})?;
tx.send(crate::control::EngineControlMessage::TuneNode {
node_id: target_node_id.to_string(),
message: crate::control::NodeControlMessage::UpdateParams(params),
})
.await
.map_err(|_| StreamKitError::Runtime("engine control channel closed".to_string()))
}

/// Receives a packet from the given receiver, respecting the cancellation token if present.
/// Returns None if cancelled or if the channel is closed.
///
Expand Down
5 changes: 5 additions & 0 deletions crates/engine/src/dynamic_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ pub struct DynamicEngine {
pub(super) node_packets_errored_counter: opentelemetry::metrics::Counter<u64>,
// Node state metric (1=running, 0=not running)
pub(super) node_state_gauge: opentelemetry::metrics::Gauge<u64>,
/// Clone of the engine's own control sender, handed to every node via
/// [`NodeContext::engine_control_tx`] so that nodes can emit
/// [`EngineControlMessage::TuneNode`] to sibling nodes.
pub(super) engine_control_tx: mpsc::Sender<EngineControlMessage>,
}
impl DynamicEngine {
const fn node_state_name(state: &NodeState) -> &'static str {
Expand Down Expand Up @@ -639,6 +643,7 @@ impl DynamicEngine {
video_pool: Some(self.video_pool.clone()),
pipeline_mode: streamkit_core::PipelineMode::Dynamic,
view_data_tx: Some(channels.view_data.clone()),
engine_control_tx: Some(self.engine_control_tx.clone()),
};

// 5. Spawn Node
Expand Down
3 changes: 2 additions & 1 deletion crates/engine/src/graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ pub async fn wire_and_spawn_graph(
audio_pool: audio_pool.clone(),
video_pool: video_pool.clone(),
pipeline_mode: streamkit_core::PipelineMode::Oneshot,
view_data_tx: None, // Stateless pipelines don't emit view data
view_data_tx: None, // Stateless pipelines don't emit view data
engine_control_tx: None, // Stateless pipelines don't support cross-node control
};

tracing::debug!("Starting task for node '{}'", name);
Expand Down
2 changes: 2 additions & 0 deletions crates/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl Engine {
#[cfg(feature = "dynamic")]
pub fn start_dynamic_actor(&self, config: DynamicEngineConfig) -> DynamicEngineHandle {
let (control_tx, control_rx) = mpsc::channel(DEFAULT_ENGINE_CONTROL_CAPACITY);
let engine_control_tx = control_tx.clone();
let (query_tx, query_rx) = mpsc::channel(DEFAULT_ENGINE_QUERY_CAPACITY);

let node_input_capacity = config.node_input_capacity.unwrap_or(DEFAULT_NODE_INPUT_CAPACITY);
Expand Down Expand Up @@ -236,6 +237,7 @@ impl Engine {
.u64_gauge("node.state")
.with_description("Node state (1=running, 0=stopped/failed)")
.build(),
engine_control_tx,
};

let engine_task = tokio::spawn(dynamic_engine.run());
Expand Down
2 changes: 2 additions & 0 deletions crates/engine/src/tests/connection_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::sync::mpsc;
fn create_test_engine() -> DynamicEngine {
let (control_tx, control_rx) = mpsc::channel(32);
let (query_tx, query_rx) = mpsc::channel(32);
let engine_control_tx = control_tx.clone();
drop(control_tx);
drop(query_tx);

Expand Down Expand Up @@ -57,6 +58,7 @@ fn create_test_engine() -> DynamicEngine {
node_state_gauge: meter.u64_gauge("test.state").build(),
runtime_schemas: HashMap::new(),
runtime_schema_subscribers: Vec::new(),
engine_control_tx,
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/engine/src/tests/pipeline_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tokio::sync::mpsc;
fn create_test_engine() -> DynamicEngine {
let (control_tx, control_rx) = mpsc::channel(32);
let (query_tx, query_rx) = mpsc::channel(32);
let engine_control_tx = control_tx.clone();
drop(control_tx);
drop(query_tx);

Expand Down Expand Up @@ -58,6 +59,7 @@ fn create_test_engine() -> DynamicEngine {
node_state_gauge: meter.u64_gauge("test.state").build(),
runtime_schemas: HashMap::new(),
runtime_schema_subscribers: Vec::new(),
engine_control_tx,
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/nodes/src/audio/filters/resampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ mod tests {
video_pool: None,
pipeline_mode: streamkit_core::PipelineMode::Dynamic,
view_data_tx: None,
engine_control_tx: None,
};

// Create node that downsamples from 48kHz to 24kHz
Expand Down Expand Up @@ -856,6 +857,7 @@ mod tests {
video_pool: None,
pipeline_mode: streamkit_core::PipelineMode::Dynamic,
view_data_tx: None,
engine_control_tx: None,
};

let config = AudioResamplerConfig {
Expand Down
1 change: 1 addition & 0 deletions crates/nodes/src/core/file_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ mod tests {
video_pool: None,
pipeline_mode: streamkit_core::PipelineMode::Dynamic,
view_data_tx: None,
engine_control_tx: None,
};

// Create and run node
Expand Down
2 changes: 2 additions & 0 deletions crates/nodes/src/core/file_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ mod tests {
video_pool: None,
pipeline_mode: streamkit_core::PipelineMode::Dynamic,
view_data_tx: None,
engine_control_tx: None,
};

// Create and run node
Expand Down Expand Up @@ -292,6 +293,7 @@ mod tests {
video_pool: None,
pipeline_mode: streamkit_core::PipelineMode::Dynamic,
view_data_tx: None,
engine_control_tx: None,
};

// Create and run node with small chunk size for testing
Expand Down
4 changes: 4 additions & 0 deletions crates/nodes/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod json_serialize;
#[cfg(feature = "object_store")]
pub mod object_store_write;
pub mod pacer;
pub mod param_bridge;
mod passthrough;
#[cfg(feature = "script")]
pub mod script;
Expand Down Expand Up @@ -193,6 +194,9 @@ pub fn register_core_nodes(registry: &mut NodeRegistry, constraints: &GlobalNode
// --- Register TelemetryOut Node ---
telemetry_out::register(registry);

// --- Register ParamBridge Node ---
param_bridge::register(registry);

// --- Register ObjectStoreWriteNode ---
#[cfg(feature = "object_store")]
{
Expand Down
1 change: 1 addition & 0 deletions crates/nodes/src/core/object_store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ mod tests {
video_pool: None,
pipeline_mode: streamkit_core::PipelineMode::Dynamic,
view_data_tx: None,
engine_control_tx: None,
};

// No credentials provided — should fail during init
Expand Down
1 change: 1 addition & 0 deletions crates/nodes/src/core/pacer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ mod tests {
video_pool: None,
pipeline_mode: streamkit_core::PipelineMode::Dynamic,
view_data_tx: None,
engine_control_tx: None,
};

// Create node with very fast speed to minimize test time
Expand Down
Loading
Loading