diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index df438925..e88d9339 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -333,6 +333,15 @@ pub struct NodeContext { /// Channel for the node to emit structured view data for frontend consumption. /// Like stats_tx, this is optional and best-effort. pub view_data_tx: Option>, + /// Optional sender for engine-level control messages. + /// + /// Allows nodes to send [`EngineControlMessage`] to the engine actor, + /// enabling cross-node control (e.g. sending `UpdateParams` to a sibling + /// node by name via [`EngineControlMessage::TuneNode`]). + /// + /// Only provided in dynamic pipelines. `None` in oneshot/static + /// pipelines where the graph is fixed at build time. + pub engine_control_tx: Option>, } impl NodeContext { @@ -348,6 +357,36 @@ impl NodeContext { }) } + /// Send an `UpdateParams` control message to a sibling node by name. + /// + /// This is a convenience wrapper around [`EngineControlMessage::TuneNode`] + /// that routes through the engine actor's control channel — the same path + /// the WebSocket/REST API uses. + /// + /// Only works in dynamic pipelines (where `engine_control_tx` is `Some`). + /// + /// # Errors + /// + /// Returns a [`StreamKitError::Runtime`] if the engine control channel is + /// unavailable (oneshot pipeline) or closed (engine shut down). 
+ pub async fn tune_sibling( + &self, + target_node_id: &str, + params: serde_json::Value, + ) -> Result<(), StreamKitError> { + let tx = self.engine_control_tx.as_ref().ok_or_else(|| { + StreamKitError::Runtime( + "engine_control_tx not available (oneshot pipeline?)".to_string(), + ) + })?; + tx.send(crate::control::EngineControlMessage::TuneNode { + node_id: target_node_id.to_string(), + message: crate::control::NodeControlMessage::UpdateParams(params), + }) + .await + .map_err(|_| StreamKitError::Runtime("engine control channel closed".to_string())) + } + /// Receives a packet from the given receiver, respecting the cancellation token if present. /// Returns None if cancelled or if the channel is closed. /// diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index c0777001..1c14486c 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -124,6 +124,10 @@ pub struct DynamicEngine { pub(super) node_packets_errored_counter: opentelemetry::metrics::Counter, // Node state metric (1=running, 0=not running) pub(super) node_state_gauge: opentelemetry::metrics::Gauge, + /// Clone of the engine's own control sender, handed to every node via + /// [`NodeContext::engine_control_tx`] so that nodes can emit + /// [`EngineControlMessage::TuneNode`] to sibling nodes. + pub(super) engine_control_tx: mpsc::Sender, } impl DynamicEngine { const fn node_state_name(state: &NodeState) -> &'static str { @@ -639,6 +643,7 @@ impl DynamicEngine { video_pool: Some(self.video_pool.clone()), pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: Some(channels.view_data.clone()), + engine_control_tx: Some(self.engine_control_tx.clone()), }; // 5. 
Spawn Node diff --git a/crates/engine/src/graph_builder.rs b/crates/engine/src/graph_builder.rs index a3c542a6..9d3709ea 100644 --- a/crates/engine/src/graph_builder.rs +++ b/crates/engine/src/graph_builder.rs @@ -380,7 +380,8 @@ pub async fn wire_and_spawn_graph( audio_pool: audio_pool.clone(), video_pool: video_pool.clone(), pipeline_mode: streamkit_core::PipelineMode::Oneshot, - view_data_tx: None, // Stateless pipelines don't emit view data + view_data_tx: None, // Stateless pipelines don't emit view data + engine_control_tx: None, // Stateless pipelines don't support cross-node control }; tracing::debug!("Starting task for node '{}'", name); diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index 83fba63e..e7405516 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -155,6 +155,7 @@ impl Engine { #[cfg(feature = "dynamic")] pub fn start_dynamic_actor(&self, config: DynamicEngineConfig) -> DynamicEngineHandle { let (control_tx, control_rx) = mpsc::channel(DEFAULT_ENGINE_CONTROL_CAPACITY); + let engine_control_tx = control_tx.clone(); let (query_tx, query_rx) = mpsc::channel(DEFAULT_ENGINE_QUERY_CAPACITY); let node_input_capacity = config.node_input_capacity.unwrap_or(DEFAULT_NODE_INPUT_CAPACITY); @@ -236,6 +237,7 @@ impl Engine { .u64_gauge("node.state") .with_description("Node state (1=running, 0=stopped/failed)") .build(), + engine_control_tx, }; let engine_task = tokio::spawn(dynamic_engine.run()); diff --git a/crates/engine/src/tests/connection_types.rs b/crates/engine/src/tests/connection_types.rs index 45f62e94..f69748a0 100644 --- a/crates/engine/src/tests/connection_types.rs +++ b/crates/engine/src/tests/connection_types.rs @@ -18,6 +18,7 @@ use tokio::sync::mpsc; fn create_test_engine() -> DynamicEngine { let (control_tx, control_rx) = mpsc::channel(32); let (query_tx, query_rx) = mpsc::channel(32); + let engine_control_tx = control_tx.clone(); drop(control_tx); drop(query_tx); @@ -57,6 +58,7 @@ fn 
create_test_engine() -> DynamicEngine { node_state_gauge: meter.u64_gauge("test.state").build(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + engine_control_tx, } } diff --git a/crates/engine/src/tests/pipeline_activation.rs b/crates/engine/src/tests/pipeline_activation.rs index 7792d7e1..790c7baa 100644 --- a/crates/engine/src/tests/pipeline_activation.rs +++ b/crates/engine/src/tests/pipeline_activation.rs @@ -19,6 +19,7 @@ use tokio::sync::mpsc; fn create_test_engine() -> DynamicEngine { let (control_tx, control_rx) = mpsc::channel(32); let (query_tx, query_rx) = mpsc::channel(32); + let engine_control_tx = control_tx.clone(); drop(control_tx); drop(query_tx); @@ -58,6 +59,7 @@ fn create_test_engine() -> DynamicEngine { node_state_gauge: meter.u64_gauge("test.state").build(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + engine_control_tx, } } diff --git a/crates/nodes/src/audio/filters/resampler.rs b/crates/nodes/src/audio/filters/resampler.rs index 0b093328..4935bfac 100644 --- a/crates/nodes/src/audio/filters/resampler.rs +++ b/crates/nodes/src/audio/filters/resampler.rs @@ -776,6 +776,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create node that downsamples from 48kHz to 24kHz @@ -856,6 +857,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; let config = AudioResamplerConfig { diff --git a/crates/nodes/src/core/file_read.rs b/crates/nodes/src/core/file_read.rs index 1f1a6844..e05d76df 100644 --- a/crates/nodes/src/core/file_read.rs +++ b/crates/nodes/src/core/file_read.rs @@ -246,6 +246,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node diff --git a/crates/nodes/src/core/file_write.rs 
b/crates/nodes/src/core/file_write.rs index eab8f075..4b301540 100644 --- a/crates/nodes/src/core/file_write.rs +++ b/crates/nodes/src/core/file_write.rs @@ -208,6 +208,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node @@ -292,6 +293,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node with small chunk size for testing diff --git a/crates/nodes/src/core/mod.rs b/crates/nodes/src/core/mod.rs index 98c0e0a6..2d75426e 100644 --- a/crates/nodes/src/core/mod.rs +++ b/crates/nodes/src/core/mod.rs @@ -13,6 +13,7 @@ pub mod json_serialize; #[cfg(feature = "object_store")] pub mod object_store_write; pub mod pacer; +pub mod param_bridge; mod passthrough; #[cfg(feature = "script")] pub mod script; @@ -193,6 +194,9 @@ pub fn register_core_nodes(registry: &mut NodeRegistry, constraints: &GlobalNode // --- Register TelemetryOut Node --- telemetry_out::register(registry); + // --- Register ParamBridge Node --- + param_bridge::register(registry); + // --- Register ObjectStoreWriteNode --- #[cfg(feature = "object_store")] { diff --git a/crates/nodes/src/core/object_store_write.rs b/crates/nodes/src/core/object_store_write.rs index 680625eb..aa8a61c4 100644 --- a/crates/nodes/src/core/object_store_write.rs +++ b/crates/nodes/src/core/object_store_write.rs @@ -799,6 +799,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // No credentials provided — should fail during init diff --git a/crates/nodes/src/core/pacer.rs b/crates/nodes/src/core/pacer.rs index 94a8a6ce..2f8a75fe 100644 --- a/crates/nodes/src/core/pacer.rs +++ b/crates/nodes/src/core/pacer.rs @@ -507,6 +507,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + 
engine_control_tx: None, }; // Create node with very fast speed to minimize test time diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs new file mode 100644 index 00000000..02970442 --- /dev/null +++ b/crates/nodes/src/core/param_bridge.rs @@ -0,0 +1,681 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +//! Parameter bridge node +//! +//! Accepts packets on its input and converts them into `UpdateParams` control +//! messages sent to a configured sibling node via +//! [`NodeContext::tune_sibling()`]. This enables cross-node control within the +//! pipeline graph — the same mechanism the WebSocket/REST API uses, but +//! initiated from inside the data flow. +//! +//! Three mapping modes are supported: +//! +//! - **Auto** — smart per-packet-type mapping (e.g. `Transcription.text` → +//! `{ "properties": { "text": "..." } }`). +//! - **Template** — a user-supplied JSON template with `{{ field }}` placeholders +//! replaced by values extracted from the incoming packet. +//! - **Raw** — forward the packet payload as-is (useful after a `core::script` +//! node that already produced the desired JSON shape). +//! +//! This is a terminal node (no output pins) and is designed for `best_effort` +//! side branches so it never stalls the main data flow. + +use async_trait::async_trait; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use streamkit_core::control::NodeControlMessage; +use streamkit_core::telemetry::TelemetryEmitter; +use streamkit_core::types::{Packet, PacketType}; +use streamkit_core::{ + state_helpers, InputPin, NodeContext, OutputPin, PinCardinality, ProcessorNode, StreamKitError, +}; + +/// How the bridge maps incoming packets to `UpdateParams` JSON. 
+#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum MappingMode { + /// Smart per-packet-type mapping. + /// + /// `Transcription` and `Text` packets are wrapped in + /// `{ "properties": { "text": "..." } }` — a shape that targets Slint + /// plugin nodes out of the box. `Custom` packets forward their `data` + /// field as-is (assumed to already be the correct `UpdateParams` shape). + /// + /// If you need a different output shape (e.g. targeting a compositor's + /// `text_overlays`), use `template` mode instead. + #[default] + Auto, + /// User-provided JSON template with `{{ text }}` placeholders. + Template, + /// Forward the extracted payload as-is (no transformation). + Raw, +} + +/// Configuration for the `core::param_bridge` node. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct ParamBridgeConfig { + /// The `node_id` of the sibling node to send `UpdateParams` to. + pub target_node: String, + + /// Mapping strategy. + #[serde(default)] + pub mode: MappingMode, + + /// JSON template used when `mode` is `template`. + /// + /// Placeholders like `{{ text }}` (or `{{text}}`) are replaced with values + /// extracted from the incoming packet. + /// + /// Currently only `{{ text }}` is supported. Future extensions could add + /// `{{ language }}`, `{{ confidence }}`, or arbitrary field paths. + #[serde(default)] + pub template: Option, + + /// Optional debounce window in milliseconds. + /// + /// When set, rapid `UpdateParams` messages are coalesced: only the most + /// recent value is sent after the window expires. This is useful for + /// targets like subtitles where intermediate transcription segments are + /// superseded by newer ones. + #[serde(default)] + pub debounce_ms: Option, +} + +pub struct ParamBridgeNode { + config: ParamBridgeConfig, +} + +impl ParamBridgeNode { + /// Creates a new `ParamBridgeNode` from configuration. 
+ /// + /// # Errors + /// + /// Returns an error if the configuration parameters cannot be parsed. + pub fn new(params: Option<&serde_json::Value>) -> Result { + let config: ParamBridgeConfig = if let Some(p) = params { + serde_json::from_value(p.clone()) + .map_err(|e| StreamKitError::Configuration(format!("Invalid config: {e}")))? + } else { + return Err(StreamKitError::Configuration( + "param_bridge requires at least `target_node` in params".to_string(), + )); + }; + + if matches!(config.mode, MappingMode::Template) && config.template.is_none() { + return Err(StreamKitError::Configuration( + "param_bridge: `template` is required when mode is `template`".to_string(), + )); + } + + Ok(Self { config }) + } + + pub fn input_pins() -> Vec { + vec![InputPin { + name: "in".to_string(), + accepts_types: vec![PacketType::Any], + cardinality: PinCardinality::One, + }] + } +} + +/// Extract the text content from a packet (for auto/template modes). +fn extract_text(packet: &Packet) -> Option { + match packet { + Packet::Transcription(t) => Some(t.text.clone()), + Packet::Text(t) => Some(t.to_string()), + _ => None, + } +} + +/// Build `UpdateParams` JSON using the auto-mapping strategy. +fn auto_map(packet: &Packet) -> Option { + match packet { + Packet::Transcription(t) => Some(serde_json::json!({ "properties": { "text": t.text } })), + Packet::Text(t) => Some(serde_json::json!({ "properties": { "text": t.as_ref() } })), + Packet::Custom(c) => Some(c.data.clone()), + _ => { + tracing::debug!(packet_type = %packet_type_label(packet), "param_bridge auto: unsupported packet type, skipping"); + None + }, + } +} + +/// Replace `{{ text }}` (and `{{text}}`) placeholders in a JSON value tree. +/// +/// Currently only the `text` placeholder is supported. To add more fields +/// (e.g. `{{ language }}`, `{{ confidence }}`), extend the replacement list +/// here and extract the additional values in [`extract_text`] or a new +/// dedicated extraction helper. 
+fn apply_template(template: &JsonValue, text: &str) -> JsonValue { + match template { + JsonValue::String(s) => { + let normalized = s.replace("{{ text }}", "{{text}}"); + JsonValue::String(normalized.replace("{{text}}", text)) + }, + JsonValue::Array(arr) => { + JsonValue::Array(arr.iter().map(|v| apply_template(v, text)).collect()) + }, + JsonValue::Object(map) => JsonValue::Object( + map.iter().map(|(k, v)| (k.clone(), apply_template(v, text))).collect(), + ), + other => other.clone(), + } +} + +/// Extract the raw JSON payload from a packet (for raw mode). +/// +/// **Note:** `Transcription` packets serialize the full `TranscriptionData` +/// struct (including per-segment timing and confidence). For transcriptions +/// with many segments this can produce a non-trivial JSON tree — prefer +/// `auto` or `template` mode for the subtitle use case. +fn raw_payload(packet: &Packet) -> Option { + match packet { + Packet::Custom(c) => Some(c.data.clone()), + Packet::Transcription(t) => serde_json::to_value(t.as_ref()).ok(), + Packet::Text(t) => Some(serde_json::json!({ "text": t.as_ref() })), + _ => { + tracing::debug!(packet_type = %packet_type_label(packet), "param_bridge raw: unsupported packet type, skipping"); + None + }, + } +} + +const fn packet_type_label(packet: &Packet) -> &'static str { + match packet { + Packet::Audio(_) => "Audio", + Packet::Video(_) => "Video", + Packet::Text(_) => "Text", + Packet::Transcription(_) => "Transcription", + Packet::Custom(_) => "Custom", + Packet::Binary { .. 
} => "Binary", + } +} + +#[async_trait] +impl ProcessorNode for ParamBridgeNode { + fn input_pins(&self) -> Vec { + Self::input_pins() + } + + fn output_pins(&self) -> Vec { + vec![] + } + + async fn run(self: Box, mut context: NodeContext) -> Result<(), StreamKitError> { + let node_id = context.output_sender.node_name().to_string(); + let target = &self.config.target_node; + + state_helpers::emit_initializing(&context.state_tx, &node_id); + + if context.engine_control_tx.is_none() { + tracing::error!( + node = %node_id, + "param_bridge requires engine_control_tx (only available in dynamic pipelines)" + ); + state_helpers::emit_failed( + &context.state_tx, + &node_id, + "engine_control_tx not available (oneshot pipeline?)", + ); + return Err(StreamKitError::Runtime( + "engine_control_tx not available (oneshot pipeline?)".to_string(), + )); + } + + let telemetry = TelemetryEmitter::new( + node_id.clone(), + context.session_id.clone(), + context.telemetry_tx.clone(), + ); + + // Take control_rx out of context so we can select on it alongside + // recv_with_cancellation (which borrows context immutably). The + // dummy channel is a one-time allocation that is never read — + // other nodes avoid this because they don't use + // recv_with_cancellation. + let mut control_rx = { + let (_, rx) = tokio::sync::mpsc::channel(1); + std::mem::replace(&mut context.control_rx, rx) + }; + + let mut input_rx = context.take_input("in")?; + state_helpers::emit_running(&context.state_tx, &node_id); + + let debounce = self.config.debounce_ms.map(tokio::time::Duration::from_millis); + + tracing::info!( + node = %node_id, + target_node = %target, + mode = ?self.config.mode, + debounce_ms = ?self.config.debounce_ms, + "param_bridge started" + ); + + // When debouncing is enabled we store the most recent params (and the + // pre-mapping text preview for telemetry) and only send after the + // window elapses without a new packet arriving. 
+ let mut pending_params: Option<(JsonValue, Option)> = None; + + // Dedup: skip UpdateParams that are identical to the last-sent value. + // This avoids redundant Slint re-renders when Whisper emits duplicate + // segments during VAD boundary refinement. + let mut last_sent: Option = None; + let sleep = tokio::time::sleep(tokio::time::Duration::MAX); + tokio::pin!(sleep); + + loop { + tokio::select! { + biased; + + Some(ctrl) = control_rx.recv() => { + match ctrl { + NodeControlMessage::Shutdown => { + tracing::info!(node = %node_id, "param_bridge received shutdown"); + break; + }, + NodeControlMessage::UpdateParams(_) | NodeControlMessage::Start => {}, + } + } + + packet = context.recv_with_cancellation(&mut input_rx) => { + let Some(packet) = packet else { + break; + }; + + // Extract text preview for telemetry — done before mapping + // so it's independent of the target-specific JSON shape. + let text_preview = extract_text(&packet); + + let params = match &self.config.mode { + MappingMode::Auto => auto_map(&packet), + MappingMode::Template => { + let Some(ref text) = text_preview else { + tracing::debug!(packet_type = %packet_type_label(&packet), "param_bridge template: unsupported packet type, skipping"); + continue; + }; + self.config.template.as_ref().map(|tmpl| apply_template(tmpl, text)) + }, + MappingMode::Raw => raw_payload(&packet), + }; + + let Some(params) = params else { + continue; + }; + + if let Some(d) = debounce { + pending_params = Some((params, text_preview)); + sleep.as_mut().reset(tokio::time::Instant::now() + d); + } else { + // Dedup: skip if identical to last sent params. + if last_sent.as_ref() == Some(¶ms) { + continue; + } + last_sent = Some(params.clone()); + Self::send_params(&context, &telemetry, &node_id, target, params, text_preview.as_deref()).await; + } + } + + () = &mut sleep, if pending_params.is_some() => { + if let Some((params, text_preview)) = pending_params.take() { + // Dedup: skip if identical to last sent params. 
+ if last_sent.as_ref() != Some(¶ms) { + last_sent = Some(params.clone()); + Self::send_params(&context, &telemetry, &node_id, target, params, text_preview.as_deref()).await; + } + } + // Reset sleep to far future so it doesn't fire again. + // Cannot use Duration::MAX — Instant + Duration::MAX overflows. + sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(365 * 24 * 3600)); + } + } + } + + // Flush any pending debounced params before shutting down. + if let Some((params, text_preview)) = pending_params.take() { + if last_sent.as_ref() != Some(¶ms) { + Self::send_params( + &context, + &telemetry, + &node_id, + target, + params, + text_preview.as_deref(), + ) + .await; + } + } + + state_helpers::emit_stopped(&context.state_tx, &node_id, "input_closed"); + tracing::info!(node = %node_id, "param_bridge stopped"); + Ok(()) + } +} + +impl ParamBridgeNode { + async fn send_params( + context: &NodeContext, + telemetry: &TelemetryEmitter, + node_id: &str, + target: &str, + params: JsonValue, + text_preview: Option<&str>, + ) { + tracing::debug!( + node = %node_id, + target_node = %target, + "param_bridge sending UpdateParams" + ); + + // Emit telemetry so the stream view can display forwarded text. + // text_preview is extracted from the packet before mapping, so it + // works regardless of the target node's expected JSON shape. 
+ if let Some(text) = text_preview { + telemetry.emit( + "stt.result", + serde_json::json!({ + "text_preview": text, + "target_node": target, + }), + ); + } + + if let Err(e) = context.tune_sibling(target, params).await { + tracing::warn!( + node = %node_id, + target_node = %target, + error = %e, + "param_bridge failed to send UpdateParams" + ); + } + } +} + +pub fn register(registry: &mut streamkit_core::NodeRegistry) { + use schemars::schema_for; + use streamkit_core::registry::StaticPins; + + let schema = match serde_json::to_value(schema_for!(ParamBridgeConfig)) { + Ok(v) => v, + Err(e) => { + tracing::error!(error = %e, "Failed to serialize ParamBridgeConfig schema"); + return; + }, + }; + + registry.register_static_with_description( + "core::param_bridge", + |params| Ok(Box::new(ParamBridgeNode::new(params)?)), + schema, + StaticPins { inputs: ParamBridgeNode::input_pins(), outputs: vec![] }, + vec!["core".to_string(), "control".to_string()], + false, + "Bridges data-plane packets to control-plane UpdateParams messages. \ + Accepts any packet type and sends a mapped UpdateParams to a configured \ + target node, enabling cross-node control within the pipeline graph. 
\ + Supports auto, template, and raw mapping modes.", + ); +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use std::sync::Arc; + + use super::*; + use serde_json::json; + use streamkit_core::types::{ + CustomEncoding, CustomPacketData, TranscriptionData, TranscriptionSegment, + }; + + // ── extract_text ──────────────────────────────────────────────── + + #[test] + fn extract_text_from_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: "hello world".into(), + segments: vec![], + language: None, + metadata: None, + })); + assert_eq!(extract_text(&pkt), Some("hello world".into())); + } + + #[test] + fn extract_text_from_text_packet() { + let pkt = Packet::Text("some text".into()); + assert_eq!(extract_text(&pkt), Some("some text".into())); + } + + #[test] + fn extract_text_from_empty_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: String::new(), + segments: vec![], + language: None, + metadata: None, + })); + assert_eq!(extract_text(&pkt), Some(String::new())); + } + + #[test] + fn extract_text_returns_none_for_custom() { + let pkt = Packet::Custom(Arc::new(CustomPacketData { + type_id: "test".into(), + encoding: CustomEncoding::Json, + data: json!({"key": "value"}), + metadata: None, + })); + assert_eq!(extract_text(&pkt), None); + } + + // ── auto_map ──────────────────────────────────────────────────── + + #[test] + fn auto_map_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: "hi".into(), + segments: vec![], + language: None, + metadata: None, + })); + let result = auto_map(&pkt).unwrap(); + assert_eq!(result, json!({ "properties": { "text": "hi" } })); + } + + #[test] + fn auto_map_text() { + let pkt = Packet::Text("hello".into()); + let result = auto_map(&pkt).unwrap(); + assert_eq!(result, json!({ "properties": { "text": "hello" } })); + } + + #[test] + fn auto_map_custom_forwards_data() { + let data = json!({"props": {"color": 
"red"}}); + let pkt = Packet::Custom(Arc::new(CustomPacketData { + type_id: "test".into(), + encoding: CustomEncoding::Json, + data: data.clone(), + metadata: None, + })); + assert_eq!(auto_map(&pkt).unwrap(), data); + } + + #[test] + fn auto_map_returns_none_for_unsupported() { + // Binary is unsupported in auto mode. + let pkt = Packet::Binary { data: bytes::Bytes::new(), content_type: None, metadata: None }; + assert!(auto_map(&pkt).is_none()); + } + + // ── apply_template ────────────────────────────────────────────── + + #[test] + fn apply_template_string_replacement() { + let tmpl = json!("prefix: {{ text }}"); + let result = apply_template(&tmpl, "hello"); + assert_eq!(result, json!("prefix: hello")); + } + + #[test] + fn apply_template_no_whitespace_placeholder() { + let tmpl = json!("prefix: {{text}}"); + let result = apply_template(&tmpl, "hello"); + assert_eq!(result, json!("prefix: hello")); + } + + #[test] + fn apply_template_nested_object() { + let tmpl = json!({ + "properties": { + "text": "{{ text }}", + "visible": true + } + }); + let result = apply_template(&tmpl, "subtitle line"); + assert_eq!( + result, + json!({ + "properties": { + "text": "subtitle line", + "visible": true + } + }) + ); + } + + #[test] + fn apply_template_array() { + let tmpl = json!(["{{ text }}", "static"]); + let result = apply_template(&tmpl, "dynamic"); + assert_eq!(result, json!(["dynamic", "static"])); + } + + #[test] + fn apply_template_no_placeholder() { + let tmpl = json!({"key": "no placeholder here"}); + let result = apply_template(&tmpl, "ignored"); + assert_eq!(result, json!({"key": "no placeholder here"})); + } + + #[test] + fn apply_template_empty_text() { + let tmpl = json!("{{ text }}"); + let result = apply_template(&tmpl, ""); + assert_eq!(result, json!("")); + } + + #[test] + fn apply_template_preserves_non_string_values() { + let tmpl = json!({"count": 42, "flag": true, "text": "{{ text }}"}); + let result = apply_template(&tmpl, "hello"); + 
assert_eq!(result, json!({"count": 42, "flag": true, "text": "hello"})); + } + + #[test] + fn apply_template_text_containing_placeholder_literal() { + // Regression: if substituted text contains "{{text}}", the second + // replace pass must NOT re-replace it. + let tmpl = json!("{{ text }}"); + let result = apply_template(&tmpl, "contains {{text}} marker"); + assert_eq!(result, json!("contains {{text}} marker")); + } + + // ── raw_payload ───────────────────────────────────────────────── + + #[test] + fn raw_payload_custom() { + let data = json!({"properties": {"text": "direct"}}); + let pkt = Packet::Custom(Arc::new(CustomPacketData { + type_id: "test".into(), + encoding: CustomEncoding::Json, + data: data.clone(), + metadata: None, + })); + assert_eq!(raw_payload(&pkt).unwrap(), data); + } + + #[test] + fn raw_payload_text() { + let pkt = Packet::Text("raw text".into()); + assert_eq!(raw_payload(&pkt).unwrap(), json!({"text": "raw text"})); + } + + #[test] + fn raw_payload_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: "hello".into(), + segments: vec![TranscriptionSegment { + text: "hello".into(), + start_time_ms: 0, + end_time_ms: 1000, + confidence: Some(0.95), + }], + language: Some("en".into()), + metadata: None, + })); + let result = raw_payload(&pkt).unwrap(); + assert_eq!(result["text"], "hello"); + assert_eq!(result["language"], "en"); + } + + #[test] + fn raw_payload_returns_none_for_unsupported() { + let pkt = Packet::Binary { data: bytes::Bytes::new(), content_type: None, metadata: None }; + assert!(raw_payload(&pkt).is_none()); + } + + // ── ParamBridgeNode::new (config validation) ──────────────────── + + #[test] + fn config_requires_params() { + assert!(ParamBridgeNode::new(None).is_err()); + } + + #[test] + fn config_requires_target_node() { + let params = json!({"mode": "auto"}); + assert!(ParamBridgeNode::new(Some(¶ms)).is_err()); + } + + #[test] + fn config_template_mode_requires_template() { + let 
params = json!({"target_node": "foo", "mode": "template"}); + assert!(ParamBridgeNode::new(Some(¶ms)).is_err()); + } + + #[test] + fn config_template_mode_with_template_ok() { + let params = json!({ + "target_node": "sub", + "mode": "template", + "template": {"properties": {"text": "{{ text }}"}} + }); + assert!(ParamBridgeNode::new(Some(¶ms)).is_ok()); + } + + #[test] + fn config_auto_mode_defaults() { + let params = json!({"target_node": "target"}); + let node = ParamBridgeNode::new(Some(¶ms)).unwrap(); + assert!(matches!(node.config.mode, MappingMode::Auto)); + } + + #[test] + fn config_rejects_unknown_fields() { + let params = json!({"target_node": "foo", "unknown_field": true}); + assert!(ParamBridgeNode::new(Some(¶ms)).is_err()); + } + + #[test] + fn config_debounce_ms() { + let params = json!({"target_node": "t", "debounce_ms": 100}); + let node = ParamBridgeNode::new(Some(¶ms)).unwrap(); + assert_eq!(node.config.debounce_ms, Some(100)); + } +} diff --git a/crates/nodes/src/test_utils.rs b/crates/nodes/src/test_utils.rs index 1fad8882..b47827d0 100644 --- a/crates/nodes/src/test_utils.rs +++ b/crates/nodes/src/test_utils.rs @@ -42,6 +42,7 @@ pub fn create_test_context( video_pool: None, pipeline_mode: streamkit_core::node::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; (context, mock_sender, state_rx) @@ -85,6 +86,7 @@ pub fn create_test_context_with_pin_mgmt( video_pool: None, pipeline_mode: streamkit_core::node::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; (context, mock_sender, state_rx, pin_mgmt_tx) diff --git a/crates/nodes/src/transport/http.rs b/crates/nodes/src/transport/http.rs index 9cedac26..0093a021 100644 --- a/crates/nodes/src/transport/http.rs +++ b/crates/nodes/src/transport/http.rs @@ -393,6 +393,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node with small chunk size for testing 
diff --git a/crates/nodes/src/video/compositor/config.rs b/crates/nodes/src/video/compositor/config.rs index 6d4e3648..5e7948fa 100644 --- a/crates/nodes/src/video/compositor/config.rs +++ b/crates/nodes/src/video/compositor/config.rs @@ -150,6 +150,13 @@ pub struct TextOverlayConfig { /// When omitted, the default system font (DejaVu Sans) is used. #[serde(default)] pub font_name: Option, + /// Enable word wrapping within the overlay's bounding rectangle. + /// + /// When `true`, text is wrapped at the width specified by + /// `transform.rect.width`. When `false` (the default), text only + /// breaks on explicit newlines — matching the historical behaviour. + #[serde(default)] + pub word_wrap: bool, } pub(crate) const fn default_opacity() -> f32 { diff --git a/crates/nodes/src/video/compositor/overlay.rs b/crates/nodes/src/video/compositor/overlay.rs index 74e9407c..2fe879ec 100644 --- a/crates/nodes/src/video/compositor/overlay.rs +++ b/crates/nodes/src/video/compositor/overlay.rs @@ -545,9 +545,9 @@ pub fn rasterize_text_overlay( }; let font_size = config.font_size.max(1) as f32; - // No word wrapping — text only breaks on explicit newlines. - // Passing 0 tells wrap_text_lines to split on '\n' only. - let wrap_width = 0; + // Word-wrap when enabled: use the overlay's bounding rect width. + // Otherwise only break on explicit newlines (wrap_width = 0). + let wrap_width = if config.word_wrap { config.transform.rect.width } else { 0 }; // Measure actual text dimensions so the bitmap is large enough to hold // the full rendered string without clipping. 
When a wrap width is set diff --git a/crates/nodes/src/video/compositor/tests.rs b/crates/nodes/src/video/compositor/tests.rs index c40bf05b..70aa9969 100644 --- a/crates/nodes/src/video/compositor/tests.rs +++ b/crates/nodes/src/video/compositor/tests.rs @@ -286,6 +286,7 @@ fn test_rasterize_text_overlay_produces_pixels() { color: [255, 255, 0, 255], font_size: 24, font_name: None, + word_wrap: false, }; let overlay = rasterize_text_overlay(&cfg, 7680, 10_000); // Bitmap is sized to the measured text extent, not the config rect. @@ -2232,6 +2233,7 @@ fn test_text_overlay_cache_reuses_arc_on_unchanged_config() { color: [255, 255, 255, 255], font_size: 24, font_name: None, + word_wrap: false, }; let limits = GlobalCompositorConfig::default(); let mut config = @@ -2350,6 +2352,7 @@ fn test_text_overlay_cache_handles_length_changes() { color: [255, 255, 255, 255], font_size: 24, font_name: None, + word_wrap: false, }; let limits = GlobalCompositorConfig::default(); let mut stats = NodeStatsTracker::new("test".to_string(), None); @@ -2509,6 +2512,7 @@ async fn test_compositor_output_format_runtime_change() { video_pool: None, pipeline_mode: streamkit_core::node::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Start with no output_format (RGBA8). diff --git a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml new file mode 100644 index 00000000..87050b88 --- /dev/null +++ b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml @@ -0,0 +1,187 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +# Live subtitle demo: webcam PiP over colorbars with real-time Parakeet +# transcription rendered as a Slint subtitle overlay. 
+# +# Data flow (subtitles): +# mic → opus_decoder → resampler → parakeet → [best_effort] → +# param_bridge → UpdateParams → slint subtitle → compositor layer +# +# parakeet → [best_effort] → telemetry_out (stream view) +# +# Requires: +# - plugin::native::parakeet loaded (with Parakeet TDT model) +# - plugin::native::slint loaded +# - Silero VAD model: models/silero_vad.onnx +# just build-plugin-native-parakeet && just build-plugin-native-slint && just copy-plugins-native +# just download-parakeet-models + +name: Webcam PiP + Live Subtitles (MoQ) +description: >- + Composites the user's webcam as picture-in-picture over colorbars with + real-time Parakeet TDT transcription displayed as subtitles via a Slint + overlay. Demonstrates the param_bridge node for cross-node control + messaging. ~10x faster than Whisper on CPU. +mode: dynamic +client: + gateway_path: /moq/video + publish: + broadcast: input + tracks: + - kind: audio + source: microphone + - kind: video + source: camera + watch: + broadcast: output + audio: true + video: true + +nodes: + # --- Background --- + + colorbars_bg: + kind: video::colorbars + params: + width: 1280 + height: 720 + fps: 30 + pixel_format: rgba8 + + # --- MoQ transport (publish + subscribe) --- + + moq_peer: + kind: transport::moq::peer + params: + gateway_path: /moq/video + input_broadcasts: + - input + output_broadcast: output + allow_reconnect: true + needs: + in: opus_encoder + in_1: vp9_encoder + + # --- Audio → STT path --- + + opus_decoder: + kind: audio::opus::decoder + needs: + in: moq_peer.audio/data + + resampler: + kind: audio::resampler + params: + target_sample_rate: 16000 + needs: opus_decoder + + parakeet: + kind: plugin::native::parakeet + params: + model_dir: models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v2-int8 + num_threads: 4 + use_vad: true + vad_model_path: models/silero_vad.onnx + vad_threshold: 0.5 + min_silence_duration_ms: 700 + needs: resampler + + # Surface STT results in the stream view telemetry timeline. 
+ stt_telemetry: + kind: core::telemetry_out + params: + packet_types: ["Transcription"] + max_events_per_sec: 20 + needs: + node: parakeet + mode: best_effort + + # --- Subtitle rendering (Slint) --- + + subtitles: + kind: plugin::native::slint + params: + width: 1280 + height: 120 + fps: 30 + slint_file: samples/slint/system/subtitle.slint + properties: + text: "" + show: true + + subtitle_bridge: + kind: core::param_bridge + params: + target_node: subtitles + mode: template + template: + properties: + text: "{{ text }}" + debounce_ms: 100 + needs: + in: + node: parakeet + mode: best_effort + + # --- Video decode + compositing --- + + vp9_decoder: + kind: video::vp9::decoder + needs: + in: moq_peer.video/hd + + compositor: + kind: video::compositor + params: + width: 1280 + height: 720 + num_inputs: 3 + layers: + in_0: + opacity: 1.0 + z_index: 0 + in_1: + rect: + x: 880 + y: 20 + width: 380 + height: 285 + opacity: 0.9 + z_index: 1 + crop_zoom: 2.2 + crop_x: 0.5 + crop_y: 0.5 + in_2: + rect: + x: 0 + y: 600 + width: 1280 + height: 120 + opacity: 1.0 + z_index: 2 + needs: + - colorbars_bg + - vp9_decoder + - subtitles + + # --- Encode + output --- + + pixel_convert: + kind: video::pixel_convert + params: + output_format: nv12 + needs: compositor + + vp9_encoder: + kind: video::vp9::encoder + params: + keyframe_interval: 30 + needs: pixel_convert + + # --- Audio loopback --- + + opus_encoder: + kind: audio::opus::encoder + needs: opus_decoder diff --git a/samples/slint/system/subtitle.slint b/samples/slint/system/subtitle.slint new file mode 100644 index 00000000..2cf06fff --- /dev/null +++ b/samples/slint/system/subtitle.slint @@ -0,0 +1,61 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +// Subtitle overlay for real-time transcription display. +// +// Renders a semi-transparent dark panel at the bottom of the frame. 
+// The text fades in with a slight upward slide when transcription +// arrives, and fades out when text is cleared. The backdrop itself +// appears/disappears instantly — the transition emphasis is on the +// text content. +// +// Properties updated at runtime via param_bridge → UpdateParams: +// text, show + +export component Subtitle inherits Window { + in property <string> text: ""; + in property <bool> show: true; + + // Active when show is true and there is text to display. + property <bool> active: show && text != ""; + + width: 1280px; + height: 120px; + background: transparent; + + Rectangle { + width: 100%; + height: 100%; + background: transparent; + + // Dark backdrop — appears/disappears with the panel (no animation). + Rectangle { + x: 40px; + y: 10px; + width: root.width - 80px; + height: root.height - 20px; + background: #000000cc; + border-radius: 8px; + opacity: root.active ? 1.0 : 0.0; + + // Subtitle text — fades in with a slight slide-up. + Text { + x: 20px; + y: root.active ? 10px : 20px; + width: parent.width - 40px; + height: parent.height - 20px; + text: root.text; + color: #ffffffee; + font-size: 24px; + font-weight: 600; + wrap: word-wrap; + vertical-alignment: center; + horizontal-alignment: center; + opacity: root.active ? 1.0 : 0.0; + animate opacity { duration: 400ms; easing: ease-in-out; } + animate y { duration: 400ms; easing: ease-out; } + } + } + } +}