From 19b8a7c7ea1e07c192e0d57653063b511606a063 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Thu, 9 Apr 2026 17:09:46 +0000 Subject: [PATCH 01/14] feat(engine,nodes): add cross-node control messaging via param_bridge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces a generalizable pattern for cross-node control messaging within pipeline graphs, enabling any node to send UpdateParams to sibling nodes by name. Phase 1 — Engine control channel in NodeContext: - Add engine_control_tx: Option<mpsc::Sender<EngineControlMessage>> field to NodeContext, wired in DynamicEngine::initialize_node() - Add tune_sibling() convenience method for sending TuneNode messages - Set to None in oneshot/stateless pipelines (not supported) Phase 2 — core::param_bridge node: - Terminal node that bridges data-plane packets to control-plane UpdateParams messages on a configured target node - Three mapping modes: * Auto: smart per-packet-type (Transcription/Text → properties.text, Custom → forward data as-is) * Template: user-supplied JSON with {{ text }} placeholders * Raw: forward extracted payload unchanged - Designed for best_effort side branches to never stall main data flow Phase 3 — Compositor word-wrap: - Add word_wrap: bool field to TextOverlayConfig (default false) - When true, uses transform.rect.width as wrap boundary - Backward compatible — existing overlays unchanged Phase 4 — Demo pipeline + Slint subtitle component: - samples/slint/system/subtitle.slint: semi-transparent panel with word-wrapped text and fade animation - samples/pipelines/dynamic/video_moq_webcam_subtitles.yml: webcam PiP with Whisper STT → param_bridge → Slint subtitle overlay Data flow: mic → opus_decoder → resampler → whisper → [best_effort] → param_bridge → UpdateParams → slint → compositor layer Signed-off-by: Devin AI Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/core/src/node.rs | 38 +++ crates/engine/src/dynamic_actor.rs | 5 +
crates/engine/src/graph_builder.rs | 3 +- crates/engine/src/lib.rs | 2 + crates/engine/src/tests/connection_types.rs | 2 + .../engine/src/tests/pipeline_activation.rs | 2 + crates/nodes/src/audio/filters/resampler.rs | 2 + crates/nodes/src/core/file_read.rs | 1 + crates/nodes/src/core/file_write.rs | 2 + crates/nodes/src/core/mod.rs | 4 + crates/nodes/src/core/object_store_write.rs | 1 + crates/nodes/src/core/pacer.rs | 1 + crates/nodes/src/core/param_bridge.rs | 266 ++++++++++++++++++ crates/nodes/src/test_utils.rs | 2 + crates/nodes/src/transport/http.rs | 1 + crates/nodes/src/video/compositor/config.rs | 7 + crates/nodes/src/video/compositor/overlay.rs | 6 +- crates/nodes/src/video/compositor/tests.rs | 4 + .../dynamic/video_moq_webcam_subtitles.yml | 167 +++++++++++ samples/slint/system/subtitle.slint | 54 ++++ 20 files changed, 566 insertions(+), 4 deletions(-) create mode 100644 crates/nodes/src/core/param_bridge.rs create mode 100644 samples/pipelines/dynamic/video_moq_webcam_subtitles.yml create mode 100644 samples/slint/system/subtitle.slint diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index df438925..d633ce80 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -333,6 +333,15 @@ pub struct NodeContext { /// Channel for the node to emit structured view data for frontend consumption. /// Like stats_tx, this is optional and best-effort. pub view_data_tx: Option>, + /// Optional sender for engine-level control messages. + /// + /// Allows nodes to send [`EngineControlMessage`] to the engine actor, + /// enabling cross-node control (e.g. sending `UpdateParams` to a sibling + /// node by name via [`EngineControlMessage::TuneNode`]). + /// + /// Only provided in dynamic pipelines. `None` in oneshot/static + /// pipelines where the graph is fixed at build time. 
+ pub engine_control_tx: Option>, } impl NodeContext { @@ -348,6 +357,35 @@ impl NodeContext { }) } + /// Send an `UpdateParams` control message to a sibling node by name. + /// + /// This is a convenience wrapper around [`EngineControlMessage::TuneNode`] + /// that routes through the engine actor's control channel — the same path + /// the WebSocket/REST API uses. + /// + /// Only works in dynamic pipelines (where `engine_control_tx` is `Some`). + /// + /// # Errors + /// + /// Returns an error string if the engine control channel is unavailable + /// (oneshot pipeline) or closed (engine shut down). + pub async fn tune_sibling( + &self, + target_node_id: &str, + params: serde_json::Value, + ) -> Result<(), String> { + let tx = self + .engine_control_tx + .as_ref() + .ok_or_else(|| "engine_control_tx not available (oneshot pipeline?)".to_string())?; + tx.send(crate::control::EngineControlMessage::TuneNode { + node_id: target_node_id.to_string(), + message: crate::control::NodeControlMessage::UpdateParams(params), + }) + .await + .map_err(|_| "Engine control channel closed".to_string()) + } + /// Receives a packet from the given receiver, respecting the cancellation token if present. /// Returns None if cancelled or if the channel is closed. /// diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index c0777001..1c14486c 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -124,6 +124,10 @@ pub struct DynamicEngine { pub(super) node_packets_errored_counter: opentelemetry::metrics::Counter, // Node state metric (1=running, 0=not running) pub(super) node_state_gauge: opentelemetry::metrics::Gauge, + /// Clone of the engine's own control sender, handed to every node via + /// [`NodeContext::engine_control_tx`] so that nodes can emit + /// [`EngineControlMessage::TuneNode`] to sibling nodes. 
+ pub(super) engine_control_tx: mpsc::Sender, } impl DynamicEngine { const fn node_state_name(state: &NodeState) -> &'static str { @@ -639,6 +643,7 @@ impl DynamicEngine { video_pool: Some(self.video_pool.clone()), pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: Some(channels.view_data.clone()), + engine_control_tx: Some(self.engine_control_tx.clone()), }; // 5. Spawn Node diff --git a/crates/engine/src/graph_builder.rs b/crates/engine/src/graph_builder.rs index a3c542a6..9d3709ea 100644 --- a/crates/engine/src/graph_builder.rs +++ b/crates/engine/src/graph_builder.rs @@ -380,7 +380,8 @@ pub async fn wire_and_spawn_graph( audio_pool: audio_pool.clone(), video_pool: video_pool.clone(), pipeline_mode: streamkit_core::PipelineMode::Oneshot, - view_data_tx: None, // Stateless pipelines don't emit view data + view_data_tx: None, // Stateless pipelines don't emit view data + engine_control_tx: None, // Stateless pipelines don't support cross-node control }; tracing::debug!("Starting task for node '{}'", name); diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index 83fba63e..e7405516 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -155,6 +155,7 @@ impl Engine { #[cfg(feature = "dynamic")] pub fn start_dynamic_actor(&self, config: DynamicEngineConfig) -> DynamicEngineHandle { let (control_tx, control_rx) = mpsc::channel(DEFAULT_ENGINE_CONTROL_CAPACITY); + let engine_control_tx = control_tx.clone(); let (query_tx, query_rx) = mpsc::channel(DEFAULT_ENGINE_QUERY_CAPACITY); let node_input_capacity = config.node_input_capacity.unwrap_or(DEFAULT_NODE_INPUT_CAPACITY); @@ -236,6 +237,7 @@ impl Engine { .u64_gauge("node.state") .with_description("Node state (1=running, 0=stopped/failed)") .build(), + engine_control_tx, }; let engine_task = tokio::spawn(dynamic_engine.run()); diff --git a/crates/engine/src/tests/connection_types.rs b/crates/engine/src/tests/connection_types.rs index 45f62e94..f69748a0 100644 --- 
a/crates/engine/src/tests/connection_types.rs +++ b/crates/engine/src/tests/connection_types.rs @@ -18,6 +18,7 @@ use tokio::sync::mpsc; fn create_test_engine() -> DynamicEngine { let (control_tx, control_rx) = mpsc::channel(32); let (query_tx, query_rx) = mpsc::channel(32); + let engine_control_tx = control_tx.clone(); drop(control_tx); drop(query_tx); @@ -57,6 +58,7 @@ fn create_test_engine() -> DynamicEngine { node_state_gauge: meter.u64_gauge("test.state").build(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + engine_control_tx, } } diff --git a/crates/engine/src/tests/pipeline_activation.rs b/crates/engine/src/tests/pipeline_activation.rs index 7792d7e1..790c7baa 100644 --- a/crates/engine/src/tests/pipeline_activation.rs +++ b/crates/engine/src/tests/pipeline_activation.rs @@ -19,6 +19,7 @@ use tokio::sync::mpsc; fn create_test_engine() -> DynamicEngine { let (control_tx, control_rx) = mpsc::channel(32); let (query_tx, query_rx) = mpsc::channel(32); + let engine_control_tx = control_tx.clone(); drop(control_tx); drop(query_tx); @@ -58,6 +59,7 @@ fn create_test_engine() -> DynamicEngine { node_state_gauge: meter.u64_gauge("test.state").build(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + engine_control_tx, } } diff --git a/crates/nodes/src/audio/filters/resampler.rs b/crates/nodes/src/audio/filters/resampler.rs index 0b093328..4935bfac 100644 --- a/crates/nodes/src/audio/filters/resampler.rs +++ b/crates/nodes/src/audio/filters/resampler.rs @@ -776,6 +776,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create node that downsamples from 48kHz to 24kHz @@ -856,6 +857,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; let config = AudioResamplerConfig { diff --git a/crates/nodes/src/core/file_read.rs 
b/crates/nodes/src/core/file_read.rs index 1f1a6844..e05d76df 100644 --- a/crates/nodes/src/core/file_read.rs +++ b/crates/nodes/src/core/file_read.rs @@ -246,6 +246,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node diff --git a/crates/nodes/src/core/file_write.rs b/crates/nodes/src/core/file_write.rs index eab8f075..4b301540 100644 --- a/crates/nodes/src/core/file_write.rs +++ b/crates/nodes/src/core/file_write.rs @@ -208,6 +208,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node @@ -292,6 +293,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node with small chunk size for testing diff --git a/crates/nodes/src/core/mod.rs b/crates/nodes/src/core/mod.rs index 98c0e0a6..2d75426e 100644 --- a/crates/nodes/src/core/mod.rs +++ b/crates/nodes/src/core/mod.rs @@ -13,6 +13,7 @@ pub mod json_serialize; #[cfg(feature = "object_store")] pub mod object_store_write; pub mod pacer; +pub mod param_bridge; mod passthrough; #[cfg(feature = "script")] pub mod script; @@ -193,6 +194,9 @@ pub fn register_core_nodes(registry: &mut NodeRegistry, constraints: &GlobalNode // --- Register TelemetryOut Node --- telemetry_out::register(registry); + // --- Register ParamBridge Node --- + param_bridge::register(registry); + // --- Register ObjectStoreWriteNode --- #[cfg(feature = "object_store")] { diff --git a/crates/nodes/src/core/object_store_write.rs b/crates/nodes/src/core/object_store_write.rs index 680625eb..aa8a61c4 100644 --- a/crates/nodes/src/core/object_store_write.rs +++ b/crates/nodes/src/core/object_store_write.rs @@ -799,6 +799,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + 
engine_control_tx: None, }; // No credentials provided — should fail during init diff --git a/crates/nodes/src/core/pacer.rs b/crates/nodes/src/core/pacer.rs index 94a8a6ce..2f8a75fe 100644 --- a/crates/nodes/src/core/pacer.rs +++ b/crates/nodes/src/core/pacer.rs @@ -507,6 +507,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create node with very fast speed to minimize test time diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs new file mode 100644 index 00000000..7ebfbe93 --- /dev/null +++ b/crates/nodes/src/core/param_bridge.rs @@ -0,0 +1,266 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +//! Parameter bridge node +//! +//! Accepts packets on its input and converts them into `UpdateParams` control +//! messages sent to a configured sibling node via +//! [`NodeContext::tune_sibling()`]. This enables cross-node control within the +//! pipeline graph — the same mechanism the WebSocket/REST API uses, but +//! initiated from inside the data flow. +//! +//! Three mapping modes are supported: +//! +//! - **Auto** — smart per-packet-type mapping (e.g. `Transcription.text` → +//! `{ "properties": { "text": "..." } }`). +//! - **Template** — a user-supplied JSON template with `{{ field }}` placeholders +//! replaced by values extracted from the incoming packet. +//! - **Raw** — forward the packet payload as-is (useful after a `core::script` +//! node that already produced the desired JSON shape). +//! +//! This is a terminal node (no output pins) and is designed for `best_effort` +//! side branches so it never stalls the main data flow. 
+ +use async_trait::async_trait; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use streamkit_core::types::{Packet, PacketType}; +use streamkit_core::{ + state_helpers, InputPin, NodeContext, OutputPin, PinCardinality, ProcessorNode, StreamKitError, +}; + +/// How the bridge maps incoming packets to `UpdateParams` JSON. +#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum MappingMode { + /// Smart per-packet-type mapping: + /// - `Transcription` → `{ "properties": { "text": "" } }` + /// - `Text` → `{ "properties": { "text": "" } }` + /// - `Custom` → forward `custom.data` as-is + #[default] + Auto, + /// User-provided JSON template with `{{ field }}` placeholders. + Template, + /// Forward the extracted payload as-is (no transformation). + Raw, +} + +/// Configuration for the `core::param_bridge` node. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct ParamBridgeConfig { + /// The `node_id` of the sibling node to send `UpdateParams` to. + pub target_node: String, + + /// Mapping strategy. + #[serde(default)] + pub mode: MappingMode, + + /// JSON template used when `mode` is `template`. + /// Placeholders like `{{ text }}` are replaced with values extracted + /// from the incoming packet (currently supports `{{ text }}`). + #[serde(default)] + pub template: Option, +} + +pub struct ParamBridgeNode { + config: ParamBridgeConfig, +} + +impl ParamBridgeNode { + /// Creates a new `ParamBridgeNode` from configuration. + /// + /// # Errors + /// + /// Returns an error if the configuration parameters cannot be parsed. + pub fn new(params: Option<&serde_json::Value>) -> Result { + let config: ParamBridgeConfig = if let Some(p) = params { + serde_json::from_value(p.clone()) + .map_err(|e| StreamKitError::Configuration(format!("Invalid config: {e}")))? 
+ } else { + return Err(StreamKitError::Configuration( + "param_bridge requires at least `target_node` in params".to_string(), + )); + }; + + if matches!(config.mode, MappingMode::Template) && config.template.is_none() { + return Err(StreamKitError::Configuration( + "param_bridge: `template` is required when mode is `template`".to_string(), + )); + } + + Ok(Self { config }) + } + + pub fn input_pins() -> Vec { + vec![InputPin { + name: "in".to_string(), + accepts_types: vec![PacketType::Any], + cardinality: PinCardinality::One, + }] + } +} + +/// Extract the text content from a packet (for auto/template modes). +fn extract_text(packet: &Packet) -> Option { + match packet { + Packet::Transcription(t) => Some(t.text.clone()), + Packet::Text(t) => Some(t.to_string()), + _ => None, + } +} + +/// Build `UpdateParams` JSON using the auto-mapping strategy. +fn auto_map(packet: &Packet) -> Option { + match packet { + Packet::Transcription(t) => Some(serde_json::json!({ "properties": { "text": t.text } })), + Packet::Text(t) => Some(serde_json::json!({ "properties": { "text": t.as_ref() } })), + Packet::Custom(c) => Some(c.data.clone()), + _ => { + tracing::debug!(packet_type = %packet_type_label(packet), "param_bridge auto: unsupported packet type, skipping"); + None + }, + } +} + +/// Replace `{{ text }}` placeholders in a JSON value tree. +fn apply_template(template: &JsonValue, text: &str) -> JsonValue { + match template { + JsonValue::String(s) => JsonValue::String(s.replace("{{ text }}", text)), + JsonValue::Array(arr) => { + JsonValue::Array(arr.iter().map(|v| apply_template(v, text)).collect()) + }, + JsonValue::Object(map) => JsonValue::Object( + map.iter().map(|(k, v)| (k.clone(), apply_template(v, text))).collect(), + ), + other => other.clone(), + } +} + +/// Extract the raw JSON payload from a packet (for raw mode). 
+fn raw_payload(packet: &Packet) -> Option { + match packet { + Packet::Custom(c) => Some(c.data.clone()), + Packet::Transcription(t) => serde_json::to_value(t.as_ref()).ok(), + Packet::Text(t) => Some(serde_json::json!({ "text": t.as_ref() })), + _ => { + tracing::debug!(packet_type = %packet_type_label(packet), "param_bridge raw: unsupported packet type, skipping"); + None + }, + } +} + +const fn packet_type_label(packet: &Packet) -> &'static str { + match packet { + Packet::Audio(_) => "Audio", + Packet::Video(_) => "Video", + Packet::Text(_) => "Text", + Packet::Transcription(_) => "Transcription", + Packet::Custom(_) => "Custom", + Packet::Binary { .. } => "Binary", + } +} + +#[async_trait] +impl ProcessorNode for ParamBridgeNode { + fn input_pins(&self) -> Vec { + Self::input_pins() + } + + fn output_pins(&self) -> Vec { + vec![] + } + + async fn run(self: Box, mut context: NodeContext) -> Result<(), StreamKitError> { + let node_id = context.output_sender.node_name().to_string(); + let target = &self.config.target_node; + + state_helpers::emit_initializing(&context.state_tx, &node_id); + + if context.engine_control_tx.is_none() { + tracing::error!( + node = %node_id, + "param_bridge requires engine_control_tx (only available in dynamic pipelines)" + ); + state_helpers::emit_failed( + &context.state_tx, + &node_id, + "engine_control_tx not available", + ); + return Err(StreamKitError::Runtime( + "param_bridge requires engine_control_tx (only available in dynamic pipelines)" + .to_string(), + )); + } + + let mut input_rx = context.take_input("in")?; + state_helpers::emit_running(&context.state_tx, &node_id); + + tracing::info!( + node = %node_id, + target_node = %target, + mode = ?self.config.mode, + "param_bridge started" + ); + + while let Some(packet) = context.recv_with_cancellation(&mut input_rx).await { + let params = match &self.config.mode { + MappingMode::Auto => auto_map(&packet), + MappingMode::Template => { + let text = 
extract_text(&packet).unwrap_or_default(); + self.config.template.as_ref().map(|tmpl| apply_template(tmpl, &text)) + }, + MappingMode::Raw => raw_payload(&packet), + }; + + let Some(params) = params else { + continue; + }; + + tracing::debug!( + node = %node_id, + target_node = %target, + "param_bridge sending UpdateParams" + ); + + if let Err(e) = context.tune_sibling(target, params).await { + tracing::warn!( + node = %node_id, + target_node = %target, + error = %e, + "param_bridge failed to send UpdateParams" + ); + } + } + + state_helpers::emit_stopped(&context.state_tx, &node_id, "input_closed"); + tracing::info!(node = %node_id, "param_bridge stopped"); + Ok(()) + } +} + +pub fn register(registry: &mut streamkit_core::NodeRegistry) { + use schemars::schema_for; + + let schema = match serde_json::to_value(schema_for!(ParamBridgeConfig)) { + Ok(v) => v, + Err(e) => { + tracing::error!(error = %e, "Failed to serialize ParamBridgeConfig schema"); + return; + }, + }; + + registry.register_dynamic_with_description( + "core::param_bridge", + |params| Ok(Box::new(ParamBridgeNode::new(params)?)), + schema, + vec!["core".to_string(), "control".to_string()], + false, + "Bridges data-plane packets to control-plane UpdateParams messages. \ + Accepts any packet type and sends a mapped UpdateParams to a configured \ + target node, enabling cross-node control within the pipeline graph. 
\ + Supports auto, template, and raw mapping modes.", + ); +} diff --git a/crates/nodes/src/test_utils.rs b/crates/nodes/src/test_utils.rs index 1fad8882..b47827d0 100644 --- a/crates/nodes/src/test_utils.rs +++ b/crates/nodes/src/test_utils.rs @@ -42,6 +42,7 @@ pub fn create_test_context( video_pool: None, pipeline_mode: streamkit_core::node::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; (context, mock_sender, state_rx) @@ -85,6 +86,7 @@ pub fn create_test_context_with_pin_mgmt( video_pool: None, pipeline_mode: streamkit_core::node::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; (context, mock_sender, state_rx, pin_mgmt_tx) diff --git a/crates/nodes/src/transport/http.rs b/crates/nodes/src/transport/http.rs index 9cedac26..0093a021 100644 --- a/crates/nodes/src/transport/http.rs +++ b/crates/nodes/src/transport/http.rs @@ -393,6 +393,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node with small chunk size for testing diff --git a/crates/nodes/src/video/compositor/config.rs b/crates/nodes/src/video/compositor/config.rs index 6d4e3648..5e7948fa 100644 --- a/crates/nodes/src/video/compositor/config.rs +++ b/crates/nodes/src/video/compositor/config.rs @@ -150,6 +150,13 @@ pub struct TextOverlayConfig { /// When omitted, the default system font (DejaVu Sans) is used. #[serde(default)] pub font_name: Option, + /// Enable word wrapping within the overlay's bounding rectangle. + /// + /// When `true`, text is wrapped at the width specified by + /// `transform.rect.width`. When `false` (the default), text only + /// breaks on explicit newlines — matching the historical behaviour. 
+ #[serde(default)] + pub word_wrap: bool, } pub(crate) const fn default_opacity() -> f32 { diff --git a/crates/nodes/src/video/compositor/overlay.rs b/crates/nodes/src/video/compositor/overlay.rs index 74e9407c..2fe879ec 100644 --- a/crates/nodes/src/video/compositor/overlay.rs +++ b/crates/nodes/src/video/compositor/overlay.rs @@ -545,9 +545,9 @@ pub fn rasterize_text_overlay( }; let font_size = config.font_size.max(1) as f32; - // No word wrapping — text only breaks on explicit newlines. - // Passing 0 tells wrap_text_lines to split on '\n' only. - let wrap_width = 0; + // Word-wrap when enabled: use the overlay's bounding rect width. + // Otherwise only break on explicit newlines (wrap_width = 0). + let wrap_width = if config.word_wrap { config.transform.rect.width } else { 0 }; // Measure actual text dimensions so the bitmap is large enough to hold // the full rendered string without clipping. When a wrap width is set diff --git a/crates/nodes/src/video/compositor/tests.rs b/crates/nodes/src/video/compositor/tests.rs index c40bf05b..70aa9969 100644 --- a/crates/nodes/src/video/compositor/tests.rs +++ b/crates/nodes/src/video/compositor/tests.rs @@ -286,6 +286,7 @@ fn test_rasterize_text_overlay_produces_pixels() { color: [255, 255, 0, 255], font_size: 24, font_name: None, + word_wrap: false, }; let overlay = rasterize_text_overlay(&cfg, 7680, 10_000); // Bitmap is sized to the measured text extent, not the config rect. 
@@ -2232,6 +2233,7 @@ fn test_text_overlay_cache_reuses_arc_on_unchanged_config() { color: [255, 255, 255, 255], font_size: 24, font_name: None, + word_wrap: false, }; let limits = GlobalCompositorConfig::default(); let mut config = @@ -2350,6 +2352,7 @@ fn test_text_overlay_cache_handles_length_changes() { color: [255, 255, 255, 255], font_size: 24, font_name: None, + word_wrap: false, }; let limits = GlobalCompositorConfig::default(); let mut stats = NodeStatsTracker::new("test".to_string(), None); @@ -2509,6 +2512,7 @@ async fn test_compositor_output_format_runtime_change() { video_pool: None, pipeline_mode: streamkit_core::node::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Start with no output_format (RGBA8). diff --git a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml new file mode 100644 index 00000000..29c36108 --- /dev/null +++ b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml @@ -0,0 +1,167 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +# Live subtitle demo: webcam PiP over colorbars with real-time Whisper +# transcription rendered as a Slint subtitle overlay. +# +# Data flow (subtitles): +# mic → opus_decoder → resampler → whisper → [best_effort] → +# param_bridge → UpdateParams → slint subtitle → compositor layer +# +# Requires: +# - plugin::native::whisper loaded (with a model, e.g. base.en) +# - plugin::native::slint loaded +# just build-plugin-native-whisper && just build-plugin-native-slint && just copy-plugins-native + +name: Webcam PiP + Live Subtitles (MoQ) +description: >- + Composites the user's webcam as picture-in-picture over colorbars with + real-time Whisper transcription displayed as subtitles via a Slint overlay. + Demonstrates the param_bridge node for cross-node control messaging. 
+mode: dynamic +client: + gateway_path: /moq/video + publish: + broadcast: input + tracks: + - kind: audio + source: microphone + - kind: video + source: camera + watch: + broadcast: output + audio: true + video: true + +nodes: + # --- Background --- + + colorbars_bg: + kind: video::colorbars + params: + width: 1280 + height: 720 + fps: 30 + pixel_format: rgba8 + + # --- MoQ transport (publish + subscribe) --- + + moq_peer: + kind: transport::moq::peer + params: + gateway_path: /moq/video + input_broadcasts: + - input + output_broadcast: output + allow_reconnect: true + needs: + in: opus_encoder + in_1: vp9_encoder + + # --- Audio → STT path --- + + opus_decoder: + kind: audio::opus::decoder + needs: + in: moq_peer.audio/data + + resampler: + kind: audio::resampler + params: + sample_rate: 16000 + channels: 1 + needs: opus_decoder + + whisper: + kind: plugin::native::whisper + params: + model: base.en + needs: resampler + + # --- Subtitle rendering (Slint) --- + + subtitles: + kind: plugin::native::slint + params: + width: 1280 + height: 120 + fps: 30 + slint_file: samples/slint/system/subtitle.slint + properties: + text: "" + visible: true + + subtitle_bridge: + kind: core::param_bridge + params: + target_node: subtitles + mode: template + template: + properties: + text: "{{ text }}" + visible: true + needs: + in: whisper + connection_mode: best_effort + + # --- Video decode + compositing --- + + vp9_decoder: + kind: video::vp9::decoder + needs: + in: moq_peer.video/hd + + compositor: + kind: video::compositor + params: + width: 1280 + height: 720 + num_inputs: 3 + layers: + in_0: + opacity: 1.0 + z_index: 0 + in_1: + rect: + x: 880 + y: 20 + width: 380 + height: 285 + opacity: 0.9 + z_index: 1 + crop_zoom: 2.2 + crop_x: 0.5 + crop_y: 0.5 + in_2: + rect: + x: 0 + y: 600 + width: 1280 + height: 120 + opacity: 1.0 + z_index: 2 + needs: + - colorbars_bg + - vp9_decoder + - subtitles + + # --- Encode + output --- + + pixel_convert: + kind: video::pixel_convert + 
params: + output_format: nv12 + needs: compositor + + vp9_encoder: + kind: video::vp9::encoder + params: + keyframe_interval: 30 + needs: pixel_convert + + # --- Audio loopback --- + + opus_encoder: + kind: audio::opus::encoder + needs: opus_decoder diff --git a/samples/slint/system/subtitle.slint b/samples/slint/system/subtitle.slint new file mode 100644 index 00000000..e2ce0349 --- /dev/null +++ b/samples/slint/system/subtitle.slint @@ -0,0 +1,54 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +// Subtitle overlay for real-time transcription display. +// +// Renders a semi-transparent panel at the bottom of the frame with +// word-wrapped text and a fade-in animation on updates. +// +// Properties updated at runtime via param_bridge → UpdateParams: +// text, visible + +export component Subtitle inherits Window { + in property text: ""; + in property visible: true; + + width: 1280px; + height: 120px; + background: transparent; + + // Background panel with semi-transparent dark fill. + Rectangle { + width: 100%; + height: 100%; + background: transparent; + opacity: root.visible ? 1.0 : 0.0; + animate opacity { duration: 300ms; easing: ease-in-out; } + + // Dark backdrop. + Rectangle { + x: 40px; + y: 10px; + width: root.width - 80px; + height: root.height - 20px; + background: #000000cc; + border-radius: 8px; + + // Subtitle text with word wrap. 
+ Text { + x: 20px; + y: 10px; + width: parent.width - 40px; + height: parent.height - 20px; + text: root.text; + color: #ffffffee; + font-size: 24px; + font-weight: 600; + wrap: word-wrap; + vertical-alignment: center; + horizontal-alignment: center; + } + } + } +} From 587d05c8db0f43df9bcf260905d611db23346a1b Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Thu, 9 Apr 2026 17:17:26 +0000 Subject: [PATCH 02/14] fix(nodes): skip unsupported packets in template mode, add unit tests Fix template mode sending spurious UpdateParams with empty text when receiving unsupported packet types (Audio, Video, Binary). Now skips them consistently with auto and raw modes. Add comprehensive unit tests for all param_bridge helper functions: extract_text, auto_map, apply_template, raw_payload, and config validation (24 tests). Signed-off-by: Devin AI Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/core/param_bridge.rs | 246 +++++++++++++++++++++++++- 1 file changed, 245 insertions(+), 1 deletion(-) diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs index 7ebfbe93..ab8d2702 100644 --- a/crates/nodes/src/core/param_bridge.rs +++ b/crates/nodes/src/core/param_bridge.rs @@ -209,7 +209,13 @@ impl ProcessorNode for ParamBridgeNode { let params = match &self.config.mode { MappingMode::Auto => auto_map(&packet), MappingMode::Template => { - let text = extract_text(&packet).unwrap_or_default(); + let text = match extract_text(&packet) { + Some(t) => t, + None => { + tracing::debug!(packet_type = %packet_type_label(&packet), "param_bridge template: unsupported packet type, skipping"); + continue; + }, + }; self.config.template.as_ref().map(|tmpl| apply_template(tmpl, &text)) }, MappingMode::Raw => raw_payload(&packet), @@ -241,6 +247,244 @@ impl ProcessorNode for ParamBridgeNode { } } +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use serde_json::json; + use streamkit_core::types::{ + 
CustomEncoding, CustomPacketData, TranscriptionData, TranscriptionSegment, + }; + + // ── extract_text ──────────────────────────────────────────────── + + #[test] + fn extract_text_from_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: "hello world".into(), + segments: vec![], + language: None, + metadata: None, + })); + assert_eq!(extract_text(&pkt), Some("hello world".into())); + } + + #[test] + fn extract_text_from_text_packet() { + let pkt = Packet::Text("some text".into()); + assert_eq!(extract_text(&pkt), Some("some text".into())); + } + + #[test] + fn extract_text_from_empty_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: String::new(), + segments: vec![], + language: None, + metadata: None, + })); + assert_eq!(extract_text(&pkt), Some(String::new())); + } + + #[test] + fn extract_text_returns_none_for_custom() { + let pkt = Packet::Custom(Arc::new(CustomPacketData { + type_id: "test".into(), + encoding: CustomEncoding::Json, + data: json!({"key": "value"}), + metadata: None, + })); + assert_eq!(extract_text(&pkt), None); + } + + // ── auto_map ──────────────────────────────────────────────────── + + #[test] + fn auto_map_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: "hi".into(), + segments: vec![], + language: None, + metadata: None, + })); + let result = auto_map(&pkt).unwrap(); + assert_eq!(result, json!({ "properties": { "text": "hi" } })); + } + + #[test] + fn auto_map_text() { + let pkt = Packet::Text("hello".into()); + let result = auto_map(&pkt).unwrap(); + assert_eq!(result, json!({ "properties": { "text": "hello" } })); + } + + #[test] + fn auto_map_custom_forwards_data() { + let data = json!({"props": {"color": "red"}}); + let pkt = Packet::Custom(Arc::new(CustomPacketData { + type_id: "test".into(), + encoding: CustomEncoding::Json, + data: data.clone(), + metadata: None, + })); + assert_eq!(auto_map(&pkt).unwrap(), data); + 
} + + #[test] + fn auto_map_returns_none_for_unsupported() { + let pkt = Packet::Text("".into()); + // Text is supported, but let's test an actual unsupported type. + // Binary is unsupported in auto mode. + let binary_pkt = + Packet::Binary { data: bytes::Bytes::new(), content_type: None, metadata: None }; + assert!(auto_map(&binary_pkt).is_none()); + } + + // ── apply_template ────────────────────────────────────────────── + + #[test] + fn apply_template_string_replacement() { + let tmpl = json!("prefix: {{ text }}"); + let result = apply_template(&tmpl, "hello"); + assert_eq!(result, json!("prefix: hello")); + } + + #[test] + fn apply_template_nested_object() { + let tmpl = json!({ + "properties": { + "text": "{{ text }}", + "visible": true + } + }); + let result = apply_template(&tmpl, "subtitle line"); + assert_eq!( + result, + json!({ + "properties": { + "text": "subtitle line", + "visible": true + } + }) + ); + } + + #[test] + fn apply_template_array() { + let tmpl = json!(["{{ text }}", "static"]); + let result = apply_template(&tmpl, "dynamic"); + assert_eq!(result, json!(["dynamic", "static"])); + } + + #[test] + fn apply_template_no_placeholder() { + let tmpl = json!({"key": "no placeholder here"}); + let result = apply_template(&tmpl, "ignored"); + assert_eq!(result, json!({"key": "no placeholder here"})); + } + + #[test] + fn apply_template_empty_text() { + let tmpl = json!("{{ text }}"); + let result = apply_template(&tmpl, ""); + assert_eq!(result, json!("")); + } + + #[test] + fn apply_template_preserves_non_string_values() { + let tmpl = json!({"count": 42, "flag": true, "text": "{{ text }}"}); + let result = apply_template(&tmpl, "hello"); + assert_eq!(result, json!({"count": 42, "flag": true, "text": "hello"})); + } + + // ── raw_payload ───────────────────────────────────────────────── + + #[test] + fn raw_payload_custom() { + let data = json!({"properties": {"text": "direct"}}); + let pkt = Packet::Custom(Arc::new(CustomPacketData { + type_id: 
"test".into(), + encoding: CustomEncoding::Json, + data: data.clone(), + metadata: None, + })); + assert_eq!(raw_payload(&pkt).unwrap(), data); + } + + #[test] + fn raw_payload_text() { + let pkt = Packet::Text("raw text".into()); + assert_eq!(raw_payload(&pkt).unwrap(), json!({"text": "raw text"})); + } + + #[test] + fn raw_payload_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: "hello".into(), + segments: vec![TranscriptionSegment { + text: "hello".into(), + start_time_ms: 0, + end_time_ms: 1000, + confidence: Some(0.95), + }], + language: Some("en".into()), + metadata: None, + })); + let result = raw_payload(&pkt).unwrap(); + assert_eq!(result["text"], "hello"); + assert_eq!(result["language"], "en"); + } + + #[test] + fn raw_payload_returns_none_for_unsupported() { + let pkt = Packet::Binary { data: bytes::Bytes::new(), content_type: None, metadata: None }; + assert!(raw_payload(&pkt).is_none()); + } + + // ── ParamBridgeNode::new (config validation) ──────────────────── + + #[test] + fn config_requires_params() { + assert!(ParamBridgeNode::new(None).is_err()); + } + + #[test] + fn config_requires_target_node() { + let params = json!({"mode": "auto"}); + assert!(ParamBridgeNode::new(Some(¶ms)).is_err()); + } + + #[test] + fn config_template_mode_requires_template() { + let params = json!({"target_node": "foo", "mode": "template"}); + assert!(ParamBridgeNode::new(Some(¶ms)).is_err()); + } + + #[test] + fn config_template_mode_with_template_ok() { + let params = json!({ + "target_node": "sub", + "mode": "template", + "template": {"properties": {"text": "{{ text }}"}} + }); + assert!(ParamBridgeNode::new(Some(¶ms)).is_ok()); + } + + #[test] + fn config_auto_mode_defaults() { + let params = json!({"target_node": "target"}); + let node = ParamBridgeNode::new(Some(¶ms)).unwrap(); + assert!(matches!(node.config.mode, MappingMode::Auto)); + } + + #[test] + fn config_rejects_unknown_fields() { + let params = json!({"target_node": 
"foo", "unknown_field": true}); + assert!(ParamBridgeNode::new(Some(¶ms)).is_err()); + } +} + pub fn register(registry: &mut streamkit_core::NodeRegistry) { use schemars::schema_for; From 966a7f7ff1e63ea85cf91c3f1b9ca931e2b6a464 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Thu, 9 Apr 2026 17:24:45 +0000 Subject: [PATCH 03/14] style(nodes): fix clippy lints in param_bridge - Use let-else instead of if-let for template mode extract_text - Move test module to end of file (items_after_test_module) - Allow unwrap_used in test module (matches repo convention) - Remove unused variable in test Signed-off-by: Devin AI Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/core/param_bridge.rs | 65 +++++++++++++-------------- 1 file changed, 30 insertions(+), 35 deletions(-) diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs index ab8d2702..5a7647a3 100644 --- a/crates/nodes/src/core/param_bridge.rs +++ b/crates/nodes/src/core/param_bridge.rs @@ -209,12 +209,9 @@ impl ProcessorNode for ParamBridgeNode { let params = match &self.config.mode { MappingMode::Auto => auto_map(&packet), MappingMode::Template => { - let text = match extract_text(&packet) { - Some(t) => t, - None => { - tracing::debug!(packet_type = %packet_type_label(&packet), "param_bridge template: unsupported packet type, skipping"); - continue; - }, + let Some(text) = extract_text(&packet) else { + tracing::debug!(packet_type = %packet_type_label(&packet), "param_bridge template: unsupported packet type, skipping"); + continue; }; self.config.template.as_ref().map(|tmpl| apply_template(tmpl, &text)) }, @@ -247,7 +244,32 @@ impl ProcessorNode for ParamBridgeNode { } } +pub fn register(registry: &mut streamkit_core::NodeRegistry) { + use schemars::schema_for; + + let schema = match serde_json::to_value(schema_for!(ParamBridgeConfig)) { + Ok(v) => v, + Err(e) => { + tracing::error!(error = %e, "Failed to serialize ParamBridgeConfig 
schema"); + return; + }, + }; + + registry.register_dynamic_with_description( + "core::param_bridge", + |params| Ok(Box::new(ParamBridgeNode::new(params)?)), + schema, + vec!["core".to_string(), "control".to_string()], + false, + "Bridges data-plane packets to control-plane UpdateParams messages. \ + Accepts any packet type and sends a mapped UpdateParams to a configured \ + target node, enabling cross-node control within the pipeline graph. \ + Supports auto, template, and raw mapping modes.", + ); +} + #[cfg(test)] +#[allow(clippy::unwrap_used)] mod tests { use std::sync::Arc; @@ -333,12 +355,9 @@ mod tests { #[test] fn auto_map_returns_none_for_unsupported() { - let pkt = Packet::Text("".into()); - // Text is supported, but let's test an actual unsupported type. // Binary is unsupported in auto mode. - let binary_pkt = - Packet::Binary { data: bytes::Bytes::new(), content_type: None, metadata: None }; - assert!(auto_map(&binary_pkt).is_none()); + let pkt = Packet::Binary { data: bytes::Bytes::new(), content_type: None, metadata: None }; + assert!(auto_map(&pkt).is_none()); } // ── apply_template ────────────────────────────────────────────── @@ -484,27 +503,3 @@ mod tests { assert!(ParamBridgeNode::new(Some(&params)).is_err()); } } - -pub fn register(registry: &mut streamkit_core::NodeRegistry) { - use schemars::schema_for; - - let schema = match serde_json::to_value(schema_for!(ParamBridgeConfig)) { - Ok(v) => v, - Err(e) => { - tracing::error!(error = %e, "Failed to serialize ParamBridgeConfig schema"); - return; - }, - }; - - registry.register_dynamic_with_description( - "core::param_bridge", - |params| Ok(Box::new(ParamBridgeNode::new(params)?)), - schema, - vec!["core".to_string(), "control".to_string()], - false, - "Bridges data-plane packets to control-plane UpdateParams messages. \ - Accepts any packet type and sends a mapped UpdateParams to a configured \ - target node, enabling cross-node control within the pipeline graph.
\ - Supports auto, template, and raw mapping modes.", - ); -} From 392bf8c44899419978c893e85e5c17fc465ce279 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Thu, 9 Apr 2026 17:49:39 +0000 Subject: [PATCH 04/14] fix(nodes): address review findings for param_bridge - tune_sibling() now returns Result<(), StreamKitError> instead of String - Add optional debounce_ms config to coalesce rapid UpdateParams - Make placeholder matching whitespace-insensitive ({{text}} and {{ text }}) - Document auto_map asymmetry (Slint-oriented default) in MappingMode doc - Add extension path comment for future placeholders (language, confidence) - Align error strings between early check and tune_sibling - Register with StaticPins to fix schema endpoint ERROR log - Fix sample pipeline: target_sample_rate (not sample_rate/channels), model_path with tiny model, add debounce_ms to subtitle_bridge - Add tests for debounce_ms config and whitespace-insensitive placeholders Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/core/src/node.rs | 17 +- crates/nodes/src/core/param_bridge.rs | 162 +++++++++++++----- .../dynamic/video_moq_webcam_subtitles.yml | 9 +- 3 files changed, 137 insertions(+), 51 deletions(-) diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index d633ce80..e88d9339 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -367,23 +367,24 @@ impl NodeContext { /// /// # Errors /// - /// Returns an error string if the engine control channel is unavailable - /// (oneshot pipeline) or closed (engine shut down). + /// Returns a [`StreamKitError::Runtime`] if the engine control channel is + /// unavailable (oneshot pipeline) or closed (engine shut down). 
pub async fn tune_sibling( &self, target_node_id: &str, params: serde_json::Value, - ) -> Result<(), String> { - let tx = self - .engine_control_tx - .as_ref() - .ok_or_else(|| "engine_control_tx not available (oneshot pipeline?)".to_string())?; + ) -> Result<(), StreamKitError> { + let tx = self.engine_control_tx.as_ref().ok_or_else(|| { + StreamKitError::Runtime( + "engine_control_tx not available (oneshot pipeline?)".to_string(), + ) + })?; tx.send(crate::control::EngineControlMessage::TuneNode { node_id: target_node_id.to_string(), message: crate::control::NodeControlMessage::UpdateParams(params), }) .await - .map_err(|_| "Engine control channel closed".to_string()) + .map_err(|_| StreamKitError::Runtime("engine control channel closed".to_string())) } /// Receives a packet from the given receiver, respecting the cancellation token if present. diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs index 5a7647a3..a9460c00 100644 --- a/crates/nodes/src/core/param_bridge.rs +++ b/crates/nodes/src/core/param_bridge.rs @@ -35,13 +35,18 @@ use streamkit_core::{ #[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum MappingMode { - /// Smart per-packet-type mapping: - /// - `Transcription` → `{ "properties": { "text": "" } }` - /// - `Text` → `{ "properties": { "text": "" } }` - /// - `Custom` → forward `custom.data` as-is + /// Smart per-packet-type mapping. + /// + /// `Transcription` and `Text` packets are wrapped in + /// `{ "properties": { "text": "..." } }` — a shape that targets Slint + /// plugin nodes out of the box. `Custom` packets forward their `data` + /// field as-is (assumed to already be the correct `UpdateParams` shape). + /// + /// If you need a different output shape (e.g. targeting a compositor's + /// `text_overlays`), use `template` mode instead. #[default] Auto, - /// User-provided JSON template with `{{ field }}` placeholders. 
+ /// User-provided JSON template with `{{ text }}` placeholders. Template, /// Forward the extracted payload as-is (no transformation). Raw, @@ -59,10 +64,23 @@ pub struct ParamBridgeConfig { pub mode: MappingMode, /// JSON template used when `mode` is `template`. - /// Placeholders like `{{ text }}` are replaced with values extracted - /// from the incoming packet (currently supports `{{ text }}`). + /// + /// Placeholders like `{{ text }}` (or `{{text}}`) are replaced with values + /// extracted from the incoming packet. + /// + /// Currently only `{{ text }}` is supported. Future extensions could add + /// `{{ language }}`, `{{ confidence }}`, or arbitrary field paths. #[serde(default)] pub template: Option<JsonValue>, + + /// Optional debounce window in milliseconds. + /// + /// When set, rapid `UpdateParams` messages are coalesced: only the most + /// recent value is sent after the window expires. This is useful for + /// targets like subtitles where intermediate transcription segments are + /// superseded by newer ones. + #[serde(default)] + pub debounce_ms: Option<u64>, } pub struct ParamBridgeNode { @@ -125,10 +143,18 @@ fn auto_map(packet: &Packet) -> Option<JsonValue> { } } -/// Replace `{{ text }}` placeholders in a JSON value tree. +/// Replace `{{ text }}` (and `{{text}}`) placeholders in a JSON value tree. +/// +/// Currently only the `text` placeholder is supported. To add more fields +/// (e.g. `{{ language }}`, `{{ confidence }}`), extend the replacement list +/// here and extract the additional values in [`extract_text`] or a new +/// dedicated extraction helper.
fn apply_template(template: &JsonValue, text: &str) -> JsonValue { match template { - JsonValue::String(s) => JsonValue::String(s.replace("{{ text }}", text)), + JsonValue::String(s) => { + let replaced = s.replace("{{ text }}", text); + JsonValue::String(replaced.replace("{{text}}", text)) + }, JsonValue::Array(arr) => { JsonValue::Array(arr.iter().map(|v| apply_template(v, text)).collect()) }, @@ -187,65 +213,108 @@ impl ProcessorNode for ParamBridgeNode { state_helpers::emit_failed( &context.state_tx, &node_id, - "engine_control_tx not available", + "engine_control_tx not available (oneshot pipeline?)", ); return Err(StreamKitError::Runtime( - "param_bridge requires engine_control_tx (only available in dynamic pipelines)" - .to_string(), + "engine_control_tx not available (oneshot pipeline?)".to_string(), )); } let mut input_rx = context.take_input("in")?; state_helpers::emit_running(&context.state_tx, &node_id); + let debounce = self.config.debounce_ms.map(tokio::time::Duration::from_millis); + tracing::info!( node = %node_id, target_node = %target, mode = ?self.config.mode, + debounce_ms = ?self.config.debounce_ms, "param_bridge started" ); - while let Some(packet) = context.recv_with_cancellation(&mut input_rx).await { - let params = match &self.config.mode { - MappingMode::Auto => auto_map(&packet), - MappingMode::Template => { - let Some(text) = extract_text(&packet) else { - tracing::debug!(packet_type = %packet_type_label(&packet), "param_bridge template: unsupported packet type, skipping"); - continue; + // When debouncing is enabled we store the most recent params and only + // send after the window elapses without a new packet arriving. + let mut pending_params: Option = None; + let sleep = tokio::time::sleep(tokio::time::Duration::MAX); + tokio::pin!(sleep); + + loop { + tokio::select! 
{ + biased; + + packet = context.recv_with_cancellation(&mut input_rx) => { + let Some(packet) = packet else { + break; }; - self.config.template.as_ref().map(|tmpl| apply_template(tmpl, &text)) - }, - MappingMode::Raw => raw_payload(&packet), - }; - let Some(params) = params else { - continue; - }; + let params = match &self.config.mode { + MappingMode::Auto => auto_map(&packet), + MappingMode::Template => { + let Some(text) = extract_text(&packet) else { + tracing::debug!(packet_type = %packet_type_label(&packet), "param_bridge template: unsupported packet type, skipping"); + continue; + }; + self.config.template.as_ref().map(|tmpl| apply_template(tmpl, &text)) + }, + MappingMode::Raw => raw_payload(&packet), + }; - tracing::debug!( - node = %node_id, - target_node = %target, - "param_bridge sending UpdateParams" - ); + let Some(params) = params else { + continue; + }; + + if let Some(d) = debounce { + pending_params = Some(params); + sleep.as_mut().reset(tokio::time::Instant::now() + d); + } else { + Self::send_params(&context, &node_id, target, params).await; + } + } - if let Err(e) = context.tune_sibling(target, params).await { - tracing::warn!( - node = %node_id, - target_node = %target, - error = %e, - "param_bridge failed to send UpdateParams" - ); + () = &mut sleep, if pending_params.is_some() => { + if let Some(params) = pending_params.take() { + Self::send_params(&context, &node_id, target, params).await; + } + // Reset sleep to far future so it doesn't fire again. + sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(86400)); + } } } + // Flush any pending debounced params before shutting down. 
+ if let Some(params) = pending_params.take() { + Self::send_params(&context, &node_id, target, params).await; + } + state_helpers::emit_stopped(&context.state_tx, &node_id, "input_closed"); tracing::info!(node = %node_id, "param_bridge stopped"); Ok(()) } } +impl ParamBridgeNode { + async fn send_params(context: &NodeContext, node_id: &str, target: &str, params: JsonValue) { + tracing::debug!( + node = %node_id, + target_node = %target, + "param_bridge sending UpdateParams" + ); + + if let Err(e) = context.tune_sibling(target, params).await { + tracing::warn!( + node = %node_id, + target_node = %target, + error = %e, + "param_bridge failed to send UpdateParams" + ); + } + } +} + pub fn register(registry: &mut streamkit_core::NodeRegistry) { use schemars::schema_for; + use streamkit_core::registry::StaticPins; let schema = match serde_json::to_value(schema_for!(ParamBridgeConfig)) { Ok(v) => v, @@ -255,10 +324,11 @@ pub fn register(registry: &mut streamkit_core::NodeRegistry) { }, }; - registry.register_dynamic_with_description( + registry.register_static_with_description( "core::param_bridge", |params| Ok(Box::new(ParamBridgeNode::new(params)?)), schema, + StaticPins { inputs: ParamBridgeNode::input_pins(), outputs: vec![] }, vec!["core".to_string(), "control".to_string()], false, "Bridges data-plane packets to control-plane UpdateParams messages. 
\ @@ -369,6 +439,13 @@ mod tests { assert_eq!(result, json!("prefix: hello")); } + #[test] + fn apply_template_no_whitespace_placeholder() { + let tmpl = json!("prefix: {{text}}"); + let result = apply_template(&tmpl, "hello"); + assert_eq!(result, json!("prefix: hello")); + } + #[test] fn apply_template_nested_object() { let tmpl = json!({ @@ -502,4 +579,11 @@ mod tests { let params = json!({"target_node": "foo", "unknown_field": true}); assert!(ParamBridgeNode::new(Some(&params)).is_err()); } + + #[test] + fn config_debounce_ms() { + let params = json!({"target_node": "t", "debounce_ms": 100}); + let node = ParamBridgeNode::new(Some(&params)).unwrap(); + assert_eq!(node.config.debounce_ms, Some(100)); + } } diff --git a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml index 29c36108..8493d1c7 100644 --- a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml +++ b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml @@ -10,7 +10,7 @@ # param_bridge → UpdateParams → slint subtitle → compositor layer # # Requires: -# - plugin::native::whisper loaded (with a model, e.g. base.en) +# - plugin::native::whisper loaded (with a model, e.g.
tiny.en) # - plugin::native::slint loaded # just build-plugin-native-whisper && just build-plugin-native-slint && just copy-plugins-native @@ -69,14 +69,14 @@ nodes: resampler: kind: audio::resampler params: - sample_rate: 16000 - channels: 1 + target_sample_rate: 16000 needs: opus_decoder whisper: kind: plugin::native::whisper params: - model: base.en + model_path: models/ggml-tiny.en-q5_1.bin + language: en needs: resampler # --- Subtitle rendering (Slint) --- @@ -101,6 +101,7 @@ nodes: properties: text: "{{ text }}" visible: true + debounce_ms: 100 needs: in: whisper connection_mode: best_effort From 7a39ace46a51af32a794e915bd658274775fd6b8 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Thu, 9 Apr 2026 17:56:24 +0000 Subject: [PATCH 05/14] fix(nodes): normalize template placeholders before substitution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes sequential replacement corruption when substituted text contains the literal string '{{text}}'. Normalize '{{ text }}' → '{{text}}' first, then replace once. Adds regression test for this case. 
Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/core/param_bridge.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs index a9460c00..20ec35cd 100644 --- a/crates/nodes/src/core/param_bridge.rs +++ b/crates/nodes/src/core/param_bridge.rs @@ -152,8 +152,8 @@ fn auto_map(packet: &Packet) -> Option<JsonValue> { fn apply_template(template: &JsonValue, text: &str) -> JsonValue { match template { JsonValue::String(s) => { - let replaced = s.replace("{{ text }}", text); - JsonValue::String(replaced.replace("{{text}}", text)) + let normalized = s.replace("{{ text }}", "{{text}}"); + JsonValue::String(normalized.replace("{{text}}", text)) }, JsonValue::Array(arr) => { JsonValue::Array(arr.iter().map(|v| apply_template(v, text)).collect()) @@ -494,6 +494,15 @@ mod tests { assert_eq!(result, json!({"count": 42, "flag": true, "text": "hello"})); } + #[test] + fn apply_template_text_containing_placeholder_literal() { + // Regression: if substituted text contains "{{text}}", the second + // replace pass must NOT re-replace it. + let tmpl = json!("{{ text }}"); + let result = apply_template(&tmpl, "contains {{text}} marker"); + assert_eq!(result, json!("contains {{text}} marker")); + } + // ── raw_payload ───────────────────────────────────────────────── #[test] From 1560532aa4af64d748630593bdfda9244fcb1bc6 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Thu, 9 Apr 2026 18:39:42 +0000 Subject: [PATCH 06/14] fix(slint): rename reserved 'visible' property in subtitle.slint 'visible' is a built-in property on all Slint elements (including Window). Declaring 'in property <bool> visible' causes a Slint compilation error ('Cannot override property visible') that was silently swallowed at the plugin FFI boundary, surfacing only as the generic 'Plugin failed to create instance' message.
Rename to 'show' (consistent with lower_third.slint) and update the sample pipeline template to match. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- samples/pipelines/dynamic/video_moq_webcam_subtitles.yml | 4 ++-- samples/slint/system/subtitle.slint | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml index 8493d1c7..36e39633 100644 --- a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml +++ b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml @@ -90,7 +90,7 @@ nodes: slint_file: samples/slint/system/subtitle.slint properties: text: "" - visible: true + show: true subtitle_bridge: kind: core::param_bridge @@ -100,7 +100,7 @@ nodes: template: properties: text: "{{ text }}" - visible: true + show: true debounce_ms: 100 needs: in: whisper diff --git a/samples/slint/system/subtitle.slint b/samples/slint/system/subtitle.slint index e2ce0349..e77bc37e 100644 --- a/samples/slint/system/subtitle.slint +++ b/samples/slint/system/subtitle.slint @@ -8,11 +8,11 @@ // word-wrapped text and a fade-in animation on updates. // // Properties updated at runtime via param_bridge → UpdateParams: -// text, visible +// text, show export component Subtitle inherits Window { in property <string> text: ""; - in property <bool> visible: true; + in property <bool> show: true; width: 1280px; height: 120px; @@ -23,7 +23,7 @@ export component Subtitle inherits Window { width: 100%; height: 100%; background: transparent; - opacity: root.visible ? 1.0 : 0.0; + opacity: root.show ? 1.0 : 0.0; animate opacity { duration: 300ms; easing: ease-in-out; } // Dark backdrop.
From 8044c9ddbd974cb3ab1fe2bb561dcaaa90bc617d Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Thu, 9 Apr 2026 18:46:43 +0000 Subject: [PATCH 07/14] feat(nodes): emit telemetry from param_bridge for stream view visibility Add TelemetryEmitter to param_bridge that emits 'stt.result' events with text_preview when forwarding UpdateParams containing text. This surfaces transcribed text in the stream view's telemetry timeline. Also add a core::telemetry_tap node to the subtitle sample pipeline between whisper and param_bridge so raw STT results (with segments) appear in telemetry too. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/core/param_bridge.rs | 37 +++++++++++++++++-- .../dynamic/video_moq_webcam_subtitles.yml | 11 +++++- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs index 20ec35cd..239ba7c4 100644 --- a/crates/nodes/src/core/param_bridge.rs +++ b/crates/nodes/src/core/param_bridge.rs @@ -26,6 +26,7 @@ use async_trait::async_trait; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; +use streamkit_core::telemetry::TelemetryEmitter; use streamkit_core::types::{Packet, PacketType}; use streamkit_core::{ state_helpers, InputPin, NodeContext, OutputPin, PinCardinality, ProcessorNode, StreamKitError, @@ -220,6 +221,12 @@ impl ProcessorNode for ParamBridgeNode { )); } + let telemetry = TelemetryEmitter::new( + node_id.clone(), + context.session_id.clone(), + context.telemetry_tx.clone(), + ); + let mut input_rx = context.take_input("in")?; state_helpers::emit_running(&context.state_tx, &node_id); @@ -268,13 +275,13 @@ impl ProcessorNode for ParamBridgeNode { pending_params = Some(params); sleep.as_mut().reset(tokio::time::Instant::now() + d); } else { - Self::send_params(&context, &node_id, target, params).await; + Self::send_params(&context, &telemetry, &node_id, target, params).await; } } 
() = &mut sleep, if pending_params.is_some() => { if let Some(params) = pending_params.take() { - Self::send_params(&context, &node_id, target, params).await; + Self::send_params(&context, &telemetry, &node_id, target, params).await; } // Reset sleep to far future so it doesn't fire again. sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(86400)); @@ -284,7 +291,7 @@ impl ProcessorNode for ParamBridgeNode { // Flush any pending debounced params before shutting down. if let Some(params) = pending_params.take() { - Self::send_params(&context, &node_id, target, params).await; + Self::send_params(&context, &telemetry, &node_id, target, params).await; } state_helpers::emit_stopped(&context.state_tx, &node_id, "input_closed"); @@ -294,13 +301,35 @@ impl ProcessorNode for ParamBridgeNode { } impl ParamBridgeNode { - async fn send_params(context: &NodeContext, node_id: &str, target: &str, params: JsonValue) { + async fn send_params( + context: &NodeContext, + telemetry: &TelemetryEmitter, + node_id: &str, + target: &str, + params: JsonValue, + ) { tracing::debug!( node = %node_id, target_node = %target, "param_bridge sending UpdateParams" ); + // Emit telemetry so the stream view can display forwarded text. 
+ let text_preview = params + .get("properties") + .and_then(|p| p.get("text")) + .and_then(|t| t.as_str()) + .or_else(|| params.get("text").and_then(|t| t.as_str())); + if let Some(text) = text_preview { + telemetry.emit( + "stt.result", + serde_json::json!({ + "text_preview": text, + "target_node": target, + }), + ); + } + if let Err(e) = context.tune_sibling(target, params).await { tracing::warn!( node = %node_id, diff --git a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml index 36e39633..9dc07560 100644 --- a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml +++ b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml @@ -6,7 +6,7 @@ # transcription rendered as a Slint subtitle overlay. # # Data flow (subtitles): -# mic → opus_decoder → resampler → whisper → [best_effort] → +# mic → opus_decoder → resampler → whisper → stt_tap → [best_effort] → # param_bridge → UpdateParams → slint subtitle → compositor layer # # Requires: @@ -79,6 +79,13 @@ nodes: language: en needs: resampler + # Tap whisper output so transcriptions appear in the stream view telemetry. + stt_tap: + kind: core::telemetry_tap + params: + packet_types: ["Transcription"] + needs: whisper + # --- Subtitle rendering (Slint) --- subtitles: @@ -103,7 +110,7 @@ nodes: show: true debounce_ms: 100 needs: - in: whisper + in: stt_tap connection_mode: best_effort # --- Video decode + compositing --- From 2b91476ee86de6a5aa4576d77a3b0c7e8b5012a1 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Thu, 9 Apr 2026 18:52:28 +0000 Subject: [PATCH 08/14] feat(pipeline): add VAD filtering and telemetry_out to subtitle pipeline Add Silero VAD configuration to the Whisper node (vad_threshold: 0.4, min_silence_duration_ms: 600) so silence is filtered before inference, improving transcription responsiveness. 
Replace telemetry_tap with core::telemetry_out (matching other dynamic pipelines like voice-agent-openai and speech-translate) to surface STT results in the stream view telemetry timeline via best_effort side branch. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- .../dynamic/video_moq_webcam_subtitles.yml | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml index 9dc07560..3d4fee86 100644 --- a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml +++ b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml @@ -6,12 +6,15 @@ # transcription rendered as a Slint subtitle overlay. # # Data flow (subtitles): -# mic → opus_decoder → resampler → whisper → stt_tap → [best_effort] → +# mic → opus_decoder → resampler → whisper → [best_effort] → # param_bridge → UpdateParams → slint subtitle → compositor layer # +# whisper → [best_effort] → telemetry_out (stream view) +# # Requires: # - plugin::native::whisper loaded (with a model, e.g. tiny.en) # - plugin::native::slint loaded +# - Silero VAD model: models/silero_vad.onnx # just build-plugin-native-whisper && just build-plugin-native-slint && just copy-plugins-native name: Webcam PiP + Live Subtitles (MoQ) @@ -77,14 +80,23 @@ nodes: params: model_path: models/ggml-tiny.en-q5_1.bin language: en + vad_model_path: models/silero_vad.onnx + vad_threshold: 0.4 + min_silence_duration_ms: 600 + max_segment_duration_secs: 30.0 + emit_vad_events: true + n_threads: 0 needs: resampler - # Tap whisper output so transcriptions appear in the stream view telemetry. - stt_tap: - kind: core::telemetry_tap + # Surface STT results in the stream view telemetry timeline. 
+ stt_telemetry: + kind: core::telemetry_out params: packet_types: ["Transcription"] - needs: whisper + max_events_per_sec: 20 + needs: + node: whisper + mode: best_effort # --- Subtitle rendering (Slint) --- @@ -110,7 +122,7 @@ nodes: show: true debounce_ms: 100 needs: - in: stt_tap + in: whisper connection_mode: best_effort # --- Video decode + compositing --- From 81128d3957e3a2629ac213970ae8d309addb8ddc Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Thu, 9 Apr 2026 19:01:03 +0000 Subject: [PATCH 09/14] fix(nodes): handle control_rx Shutdown in param_bridge select loop Without this, the engine's shutdown_node() always hits the 5-second timeout and force-aborts the node because param_bridge never reads control_rx. This also prevented the pending debounce flush from executing on shutdown. Extracts control_rx from NodeContext before the loop to avoid borrow conflicts with recv_with_cancellation (which borrows context immutably). Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/core/param_bridge.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs index 239ba7c4..ca21222f 100644 --- a/crates/nodes/src/core/param_bridge.rs +++ b/crates/nodes/src/core/param_bridge.rs @@ -26,6 +26,7 @@ use async_trait::async_trait; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; +use streamkit_core::control::NodeControlMessage; use streamkit_core::telemetry::TelemetryEmitter; use streamkit_core::types::{Packet, PacketType}; use streamkit_core::{ @@ -227,6 +228,13 @@ impl ProcessorNode for ParamBridgeNode { context.telemetry_tx.clone(), ); + // Take control_rx out of context so we can select on it alongside + // recv_with_cancellation (which borrows context immutably). 
+ let mut control_rx = { + let (_, rx) = tokio::sync::mpsc::channel(1); + std::mem::replace(&mut context.control_rx, rx) + }; + let mut input_rx = context.take_input("in")?; state_helpers::emit_running(&context.state_tx, &node_id); @@ -250,6 +258,16 @@ impl ProcessorNode for ParamBridgeNode { tokio::select! { biased; + Some(ctrl) = control_rx.recv() => { + match ctrl { + NodeControlMessage::Shutdown => { + tracing::info!(node = %node_id, "param_bridge received shutdown"); + break; + }, + NodeControlMessage::UpdateParams(_) | NodeControlMessage::Start => {}, + } + } + packet = context.recv_with_cancellation(&mut input_rx) => { let Some(packet) = packet else { break; From e7550d8f03a7a2fa3f980c3fe5ed6ec6d57935f7 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Fri, 10 Apr 2026 11:30:50 +0000 Subject: [PATCH 10/14] fix(param_bridge): dedup identical params, decouple telemetry, add subtitle transition - Dedup: skip UpdateParams identical to last-sent value to avoid redundant Slint re-renders during VAD boundary refinement. - Telemetry: extract text preview before mapping so it works regardless of the target node's JSON shape (decouples from properties.text). - Debounce reset: use 1-year duration instead of 24h to avoid spurious wakeup on long-running sessions (Duration::MAX overflows Instant). - Docs: add note about raw_payload weight with Transcription packets; explain one-time control_rx swap overhead. - Subtitle transition: fade-in + slide-up when text arrives, fade-out + slide-down when cleared (driven by active = show && text != ""). 
Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/core/param_bridge.rs | 64 +++++++++++++++++++-------- samples/slint/system/subtitle.slint | 17 ++++--- 2 files changed, 57 insertions(+), 24 deletions(-) diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs index ca21222f..8734ff07 100644 --- a/crates/nodes/src/core/param_bridge.rs +++ b/crates/nodes/src/core/param_bridge.rs @@ -168,6 +168,11 @@ fn apply_template(template: &JsonValue, text: &str) -> JsonValue { } /// Extract the raw JSON payload from a packet (for raw mode). +/// +/// **Note:** `Transcription` packets serialize the full `TranscriptionData` +/// struct (including per-segment timing and confidence). For transcriptions +/// with many segments this can produce a non-trivial JSON tree — prefer +/// `auto` or `template` mode for the subtitle use case. fn raw_payload(packet: &Packet) -> Option<JsonValue> { match packet { Packet::Custom(c) => Some(c.data.clone()), @@ -229,7 +234,10 @@ impl ProcessorNode for ParamBridgeNode { ); // Take control_rx out of context so we can select on it alongside - // recv_with_cancellation (which borrows context immutably). + // recv_with_cancellation (which borrows context immutably). The + // dummy channel is a one-time allocation that is never read — + // other nodes avoid this because they don't use + // recv_with_cancellation. let mut control_rx = { let (_, rx) = tokio::sync::mpsc::channel(1); std::mem::replace(&mut context.control_rx, rx) @@ -248,9 +256,15 @@ impl ProcessorNode for ParamBridgeNode { "param_bridge started" ); - // When debouncing is enabled we store the most recent params and only - // send after the window elapses without a new packet arriving. - let mut pending_params: Option<JsonValue> = None; + // When debouncing is enabled we store the most recent params (and the + // pre-mapping text preview for telemetry) and only send after the + // window elapses without a new packet arriving. 
+ let mut pending_params: Option<(JsonValue, Option<String>)> = None; + + // Dedup: skip UpdateParams that are identical to the last-sent value. + // This avoids redundant Slint re-renders when Whisper emits duplicate + // segments during VAD boundary refinement. + let mut last_sent: Option<JsonValue> = None; let sleep = tokio::time::sleep(tokio::time::Duration::MAX); tokio::pin!(sleep); @@ -273,14 +287,18 @@ impl ProcessorNode for ParamBridgeNode { break; }; + // Extract text preview for telemetry — done before mapping + // so it's independent of the target-specific JSON shape. + let text_preview = extract_text(&packet); + let params = match &self.config.mode { MappingMode::Auto => auto_map(&packet), MappingMode::Template => { - let Some(text) = extract_text(&packet) else { + let Some(ref text) = text_preview else { tracing::debug!(packet_type = %packet_type_label(&packet), "param_bridge template: unsupported packet type, skipping"); continue; }; - self.config.template.as_ref().map(|tmpl| apply_template(tmpl, &text)) + self.config.template.as_ref().map(|tmpl| apply_template(tmpl, text)) }, MappingMode::Raw => raw_payload(&packet), }; @@ -290,26 +308,38 @@ impl ProcessorNode for ParamBridgeNode { }; if let Some(d) = debounce { - pending_params = Some(params); + pending_params = Some((params, text_preview)); sleep.as_mut().reset(tokio::time::Instant::now() + d); } else { - Self::send_params(&context, &telemetry, &node_id, target, params).await; + // Dedup: skip if identical to last sent params. + if last_sent.as_ref() == Some(&params) { + continue; + } + last_sent = Some(params.clone()); + Self::send_params(&context, &telemetry, &node_id, target, params, text_preview.as_deref()).await; } } () = &mut sleep, if pending_params.is_some() => { - if let Some(params) = pending_params.take() { - Self::send_params(&context, &telemetry, &node_id, target, params).await; + if let Some((params, text_preview)) = pending_params.take() { + // Dedup: skip if identical to last sent params. 
+ if last_sent.as_ref() != Some(&params) { + last_sent = Some(params.clone()); + Self::send_params(&context, &telemetry, &node_id, target, params, text_preview.as_deref()).await; + } } // Reset sleep to far future so it doesn't fire again. - sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(86400)); + // Cannot use Duration::MAX — Instant + Duration::MAX overflows. + sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(365 * 24 * 3600)); } } } // Flush any pending debounced params before shutting down. - if let Some(params) = pending_params.take() { - Self::send_params(&context, &telemetry, &node_id, target, params).await; + if let Some((params, text_preview)) = pending_params.take() { + if last_sent.as_ref() != Some(&params) { + Self::send_params(&context, &telemetry, &node_id, target, params, text_preview.as_deref()).await; + } } state_helpers::emit_stopped(&context.state_tx, &node_id, "input_closed"); @@ -325,6 +355,7 @@ impl ParamBridgeNode { node_id: &str, target: &str, params: JsonValue, + text_preview: Option<&str>, ) { tracing::debug!( node = %node_id, @@ -333,11 +364,8 @@ impl ParamBridgeNode { ); // Emit telemetry so the stream view can display forwarded text. - let text_preview = params - .get("properties") - .and_then(|p| p.get("text")) - .and_then(|t| t.as_str()) - .or_else(|| params.get("text").and_then(|t| t.as_str())); + // text_preview is extracted from the packet before mapping, so it + // works regardless of the target node's expected JSON shape. if let Some(text) = text_preview { telemetry.emit( "stt.result", diff --git a/samples/slint/system/subtitle.slint b/samples/slint/system/subtitle.slint index e77bc37e..8829cf9e 100644 --- a/samples/slint/system/subtitle.slint +++ b/samples/slint/system/subtitle.slint @@ -5,7 +5,8 @@ // Subtitle overlay for real-time transcription display. 
// // Renders a semi-transparent panel at the bottom of the frame with -// word-wrapped text and a fade-in animation on updates. +// word-wrapped text. Fades in with a subtle slide-up when text arrives; +// fades out when text is cleared or `show` is set to false. // // Properties updated at runtime via param_bridge → UpdateParams: // text, show @@ -14,26 +15,30 @@ export component Subtitle inherits Window { in property <string> text: ""; in property <bool> show: true; + // Active when show is true and there is text to display. + property <bool> active: show && text != ""; + width: 1280px; height: 120px; background: transparent; - // Background panel with semi-transparent dark fill. + // Outer container — fades the entire subtitle panel in/out. Rectangle { width: 100%; height: 100%; background: transparent; - opacity: root.show ? 1.0 : 0.0; - animate opacity { duration: 300ms; easing: ease-in-out; } + opacity: root.active ? 1.0 : 0.0; + animate opacity { duration: 400ms; easing: ease-in-out; } - // Dark backdrop. + // Dark backdrop with slide-up transition. Rectangle { x: 40px; - y: 10px; + y: root.active ? 10px : 30px; width: root.width - 80px; height: root.height - 20px; background: #000000cc; border-radius: 8px; + animate y { duration: 400ms; easing: ease-out; } // Subtitle text with word wrap. Text { From bcd8315ce7958f9a0790b2f1e04c8228536851f9 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Fri, 10 Apr 2026 11:31:01 +0000 Subject: [PATCH 11/14] style: cargo fmt Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/core/param_bridge.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs index 8734ff07..02970442 100644 --- a/crates/nodes/src/core/param_bridge.rs +++ b/crates/nodes/src/core/param_bridge.rs @@ -338,7 +338,15 @@ impl ProcessorNode for ParamBridgeNode { // Flush any pending debounced params before shutting down. 
if let Some((params, text_preview)) = pending_params.take() { if last_sent.as_ref() != Some(&params) { - Self::send_params(&context, &telemetry, &node_id, target, params, text_preview.as_deref()).await; + Self::send_params( + &context, + &telemetry, + &node_id, + target, + params, + text_preview.as_deref(), + ) + .await; } } From 6d2350e91495a828f1933c28a08ce81817962d3d Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Fri, 10 Apr 2026 12:19:49 +0000 Subject: [PATCH 12/14] fix(sample): remove show:true from subtitle template Let show remain an independent kill switch via controls/API. The Slint active property (show && text != "") already handles auto-hide when there is no text to display. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- samples/pipelines/dynamic/video_moq_webcam_subtitles.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml index 3d4fee86..60173b67 100644 --- a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml +++ b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml @@ -119,7 +119,6 @@ nodes: template: properties: text: "{{ text }}" - show: true debounce_ms: 100 needs: in: whisper From 4bd72ed22f3acb05b3ccc7e0bf0ecc4934dc179a Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Fri, 10 Apr 2026 12:28:29 +0000 Subject: [PATCH 13/14] fix(sample): text transition effect + connection_mode syntax - Subtitle: move transition to text element (fade + slide-up), not the background overlay. Backdrop appears/disappears instantly. - Fix connection_mode: was silently ignored at node level; use Map variant syntax (in: {node, mode}) so best_effort is actually applied. 
Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- .../dynamic/video_moq_webcam_subtitles.yml | 5 ++-- samples/slint/system/subtitle.slint | 24 ++++++++++--------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml index 60173b67..c304a25e 100644 --- a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml +++ b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml @@ -121,8 +121,9 @@ nodes: text: "{{ text }}" debounce_ms: 100 needs: - in: whisper - connection_mode: best_effort + in: + node: whisper + mode: best_effort # --- Video decode + compositing --- diff --git a/samples/slint/system/subtitle.slint b/samples/slint/system/subtitle.slint index 8829cf9e..2cf06fff 100644 --- a/samples/slint/system/subtitle.slint +++ b/samples/slint/system/subtitle.slint @@ -4,9 +4,11 @@ // Subtitle overlay for real-time transcription display. // -// Renders a semi-transparent panel at the bottom of the frame with -// word-wrapped text. Fades in with a subtle slide-up when text arrives; -// fades out when text is cleared or `show` is set to false. +// Renders a semi-transparent dark panel at the bottom of the frame. +// The text fades in with a slight upward slide when transcription +// arrives, and fades out when text is cleared. The backdrop itself +// appears/disappears instantly — the transition emphasis is on the +// text content. // // Properties updated at runtime via param_bridge → UpdateParams: // text, show @@ -22,28 +24,25 @@ export component Subtitle inherits Window { height: 120px; background: transparent; - // Outer container — fades the entire subtitle panel in/out. Rectangle { width: 100%; height: 100%; background: transparent; - opacity: root.active ? 1.0 : 0.0; - animate opacity { duration: 400ms; easing: ease-in-out; } - // Dark backdrop with slide-up transition. 
+ // Dark backdrop — appears/disappears with the panel (no animation). Rectangle { x: 40px; - y: root.active ? 10px : 30px; + y: 10px; width: root.width - 80px; height: root.height - 20px; background: #000000cc; border-radius: 8px; - animate y { duration: 400ms; easing: ease-out; } + opacity: root.active ? 1.0 : 0.0; - // Subtitle text with word wrap. + // Subtitle text — fades in with a slight slide-up. Text { x: 20px; - y: 10px; + y: root.active ? 10px : 20px; width: parent.width - 40px; height: parent.height - 20px; text: root.text; @@ -53,6 +52,9 @@ export component Subtitle inherits Window { wrap: word-wrap; vertical-alignment: center; horizontal-alignment: center; + opacity: root.active ? 1.0 : 0.0; + animate opacity { duration: 400ms; easing: ease-in-out; } + animate y { duration: 400ms; easing: ease-out; } } } } From db3a883af5645530534a00c731cc7f29a59d8b35 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Fri, 10 Apr 2026 19:24:20 +0000 Subject: [PATCH 14/14] feat(sample): switch subtitle demo from Whisper to Parakeet TDT Parakeet TDT is ~10x faster than Whisper on CPU with competitive accuracy. Updates the subtitle pipeline to use plugin::native::parakeet with the INT8 model and built-in VAD. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- .../dynamic/video_moq_webcam_subtitles.yml | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml index c304a25e..87050b88 100644 --- a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml +++ b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml @@ -2,26 +2,28 @@ # # SPDX-License-Identifier: MPL-2.0 -# Live subtitle demo: webcam PiP over colorbars with real-time Whisper +# Live subtitle demo: webcam PiP over colorbars with real-time Parakeet # transcription rendered as a Slint subtitle overlay. 
# # Data flow (subtitles): -# mic → opus_decoder → resampler → whisper → [best_effort] → +# mic → opus_decoder → resampler → parakeet → [best_effort] → # param_bridge → UpdateParams → slint subtitle → compositor layer # -# whisper → [best_effort] → telemetry_out (stream view) +# parakeet → [best_effort] → telemetry_out (stream view) # # Requires: -# - plugin::native::whisper loaded (with a model, e.g. tiny.en) +# - plugin::native::parakeet loaded (with Parakeet TDT model) # - plugin::native::slint loaded # - Silero VAD model: models/silero_vad.onnx -# just build-plugin-native-whisper && just build-plugin-native-slint && just copy-plugins-native +# just build-plugin-native-parakeet && just build-plugin-native-slint && just copy-plugins-native +# just download-parakeet-models name: Webcam PiP + Live Subtitles (MoQ) description: >- Composites the user's webcam as picture-in-picture over colorbars with - real-time Whisper transcription displayed as subtitles via a Slint overlay. - Demonstrates the param_bridge node for cross-node control messaging. + real-time Parakeet TDT transcription displayed as subtitles via a Slint + overlay. Demonstrates the param_bridge node for cross-node control + messaging. ~10x faster than Whisper on CPU. mode: dynamic client: gateway_path: /moq/video @@ -75,17 +77,15 @@ nodes: target_sample_rate: 16000 needs: opus_decoder - whisper: - kind: plugin::native::whisper + parakeet: + kind: plugin::native::parakeet params: - model_path: models/ggml-tiny.en-q5_1.bin - language: en + model_dir: models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v2-int8 + num_threads: 4 + use_vad: true vad_model_path: models/silero_vad.onnx - vad_threshold: 0.4 - min_silence_duration_ms: 600 - max_segment_duration_secs: 30.0 - emit_vad_events: true - n_threads: 0 + vad_threshold: 0.5 + min_silence_duration_ms: 700 needs: resampler # Surface STT results in the stream view telemetry timeline. 
@@ -95,7 +95,7 @@ nodes: packet_types: ["Transcription"] max_events_per_sec: 20 needs: - node: whisper + node: parakeet mode: best_effort # --- Subtitle rendering (Slint) --- @@ -122,7 +122,7 @@ nodes: debounce_ms: 100 needs: in: - node: whisper + node: parakeet mode: best_effort # --- Video decode + compositing ---