feat(engine,nodes): add cross-node control messaging via param_bridge#280
feat(engine,nodes): add cross-node control messaging via param_bridge#280staging-devin-ai-integration[bot] wants to merge 9 commits intomainfrom
Conversation
Introduces a generalizable pattern for cross-node control messaging
within pipeline graphs, enabling any node to send UpdateParams to
sibling nodes by name.
Phase 1 — Engine control channel in NodeContext:
- Add engine_control_tx: Option<mpsc::Sender<EngineControlMessage>>
field to NodeContext, wired in DynamicEngine::initialize_node()
- Add tune_sibling() convenience method for sending TuneNode messages
- Set to None in oneshot/stateless pipelines (not supported)
Phase 2 — core::param_bridge node:
- Terminal node that bridges data-plane packets to control-plane
UpdateParams messages on a configured target node
- Three mapping modes:
* Auto: smart per-packet-type (Transcription/Text → properties.text,
Custom → forward data as-is)
* Template: user-supplied JSON with {{ text }} placeholders
* Raw: forward extracted payload unchanged
- Designed for best_effort side branches to never stall main data flow
Phase 3 — Compositor word-wrap:
- Add word_wrap: bool field to TextOverlayConfig (default false)
- When true, uses transform.rect.width as wrap boundary
- Backward compatible — existing overlays unchanged
Phase 4 — Demo pipeline + Slint subtitle component:
- samples/slint/system/subtitle.slint: semi-transparent panel with
word-wrapped text and fade animation
- samples/pipelines/dynamic/video_moq_webcam_subtitles.yml: webcam PiP
with Whisper STT → param_bridge → Slint subtitle overlay
Data flow: mic → opus_decoder → resampler → whisper → [best_effort] →
param_bridge → UpdateParams → slint → compositor layer
Signed-off-by: Devin AI <devin@streamkit.dev>
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
Fix template mode sending spurious UpdateParams with empty text when receiving unsupported packet types (Audio, Video, Binary). Now skips them consistently with auto and raw modes. Add comprehensive unit tests for all param_bridge helper functions: extract_text, auto_map, apply_template, raw_payload, and config validation (24 tests). Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
crates/core/src/node.rs
Outdated
| tx.send(crate::control::EngineControlMessage::TuneNode { | ||
| node_id: target_node_id.to_string(), | ||
| message: crate::control::NodeControlMessage::UpdateParams(params), | ||
| }) | ||
| .await | ||
| .map_err(|_| "Engine control channel closed".to_string()) |
There was a problem hiding this comment.
🚩 Blocking send in tune_sibling may delay shutdown by up to 2 seconds
The tune_sibling method at crates/core/src/node.rs:381-386 uses tx.send().await on the engine's main control channel (engine_control_tx). During engine shutdown (crates/engine/src/dynamic_actor.rs:1550-1639), the engine stops reading from control_rx while waiting for nodes to exit. If a param_bridge node is mid-send and the channel buffer (128 slots) happens to be full, the node blocks and cannot exit its loop until the engine's 2-second timeout fires and the task is aborted. This produces a misleading warning: "Node did not shut down within 2s, this indicates a bug".
In practice this is unlikely (the 128-capacity channel is rarely full), and the timeout+abort mechanism handles it correctly, but using try_send instead of send().await would make the param_bridge more shutdown-friendly and avoid the false warning. The sample YAML already marks the connection as best_effort, so dropping a single UpdateParams during shutdown is acceptable.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Valid observation. In practice the 128-slot buffer makes this unlikely, and the timeout+abort handles the edge case correctly. Using try_send is a reasonable improvement but would change the semantics slightly (silently dropping messages when the buffer is full during normal operation, not just shutdown). I'd defer this to human review — if the maintainer prefers try_send semantics here I'm happy to switch it.
- Use let-else instead of if-let for template mode extract_text - Move test module to end of file (items_after_test_module) - Allow unwrap_used in test module (matches repo convention) - Remove unused variable in test Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- tune_sibling() now returns Result<(), StreamKitError> instead of String
- Add optional debounce_ms config to coalesce rapid UpdateParams
- Make placeholder matching whitespace-insensitive ({{text}} and {{ text }})
- Document auto_map asymmetry (Slint-oriented default) in MappingMode doc
- Add extension path comment for future placeholders (language, confidence)
- Align error strings between early check and tune_sibling
- Register with StaticPins to fix schema endpoint ERROR log
- Fix sample pipeline: target_sample_rate (not sample_rate/channels),
model_path with tiny model, add debounce_ms to subtitle_bridge
- Add tests for debounce_ms config and whitespace-insensitive placeholders
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Fixes sequential replacement corruption when substituted text contains
the literal string '{{text}}'. Normalize '{{ text }}' → '{{text}}'
first, then replace once.
Adds regression test for this case.
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
'visible' is a built-in property on all Slint elements (including
Window). Declaring 'in property <bool> visible' causes a Slint
compilation error ('Cannot override property visible') that was
silently swallowed at the plugin FFI boundary, surfacing only as the
generic 'Plugin failed to create instance' message.
Rename to 'show' (consistent with lower_third.slint) and update the
sample pipeline template to match.
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Slint subtitle fix (1560532)Root cause identified: This error was invisible because the Fix: Renamed Verified with a standalone Slint compiler test — |
Add TelemetryEmitter to param_bridge that emits 'stt.result' events with text_preview when forwarding UpdateParams containing text. This surfaces transcribed text in the stream view's telemetry timeline. Also add a core::telemetry_tap node to the subtitle sample pipeline between whisper and param_bridge so raw STT results (with segments) appear in telemetry too. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Add Silero VAD configuration to the Whisper node (vad_threshold: 0.4, min_silence_duration_ms: 600) so silence is filtered before inference, improving transcription responsiveness. Replace telemetry_tap with core::telemetry_out (matching other dynamic pipelines like voice-agent-openai and speech-translate) to surface STT results in the stream view telemetry timeline via best_effort side branch. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Without this, the engine's shutdown_node() always hits the 5-second timeout and force-aborts the node because param_bridge never reads control_rx. This also prevented the pending debounce flush from executing on shutdown. Extracts control_rx from NodeContext before the loop to avoid borrow conflicts with recv_with_cancellation (which borrows context immutably). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Summary
Adds a new
core::param_bridgeterminal node that bridges data-plane packets to control-planeUpdateParamsmessages, enabling cross-node parameter control without custom code. This powers use cases like live subtitle overlays where transcription text drives a Slint UI property.Key changes
New node:
core::param_bridge(crates/nodes/src/core/param_bridge.rs)template(JSON template with{{text}}placeholders),raw(forward full packet payload),auto_map(wrap in{ "properties": {...} }for Slint compatibility)debounce_ms) usingtokio::select!with a pinned sleep timer to coalesce rapid updates{{ text }}and{{text}}both work; prevents double-replacement when substituted text contains literal{{text}})register_static_with_description(avoids FFI instantiation)stt.resulttelemetry events withtext_previewwhen forwarding text, so transcribed text appears in the stream view's telemetry timelinecontrol_rxforShutdownto support graceful termination (avoids 5s timeout onRemoveNode)Engine support (
crates/engine/)BestEffortconnection mode — connects when both endpoints exist, silently skips otherwiseconnection_mode: best_effortin pipeline YAMLtune_sibling()error handling (crates/core/src/node.rs)Result<(), StreamKitError>instead ofStringfor consistency with the rest of the codebaseSample pipeline (
samples/pipelines/dynamic/video_moq_webcam_subtitles.yml)vad_threshold: 0.4,min_silence_duration_ms: 600) for faster transcriptioncore::telemetry_outas best_effort side branch surfaces STT results in stream viewSlint subtitle component (
samples/slint/system/subtitle.slint)showproperty (notvisible, which is a reserved Slint built-in — originalvisiblecaused a silent compilation failure at the FFI boundary)Commits
feat(engine,nodes): core param_bridge implementation + BestEffort connectionsfix(nodes): skip unsupported packets in template mode, add unit testsstyle(nodes): fix clippy lintsfix(nodes): address Devin Review findings (debounce, error types, placeholders, auto_map docs, schema endpoint, sample pipeline fixes)fix(nodes): normalize template placeholders before substitution (prevents double-replacement bug)fix(slint): rename reservedvisibleproperty in subtitle.slintfeat(nodes): emit telemetry from param_bridge for stream view visibilityfeat(pipeline): add VAD filtering and telemetry_out to subtitle pipelinefix(nodes): handle control_rx Shutdown in param_bridge select loopReview & Testing Checklist for Human
visible→showrename fixes a Slint compilation error that was silently swallowed at the FFI boundary. Run the subtitle pipeline and confirm the Slint node initializesUpdateParamsstt_telemetry(telemetry_out) and param_bridge telemetry emission should surface transcribed text in the stream view's telemetry timelinevad_threshold: 0.4andmin_silence_duration_ms: 600, silence should be filtered before Whisper inference, improving responsivenessNotes
whisper_backend_init_gpu: no GPU found). The VAD filter reduces wasted inference on silence.min_silence_duration_msandmax_segment_duration_secscan be further tuned.just lintfailure inplugins/native/aac-encoder/src/lib.rsis a pre-existing formatting issue unrelated to this PR.native_source_plugin_entry!macro silently discards pluginnew()errors — thevisibleproperty bug was invisible because of this. A follow-up to surface these errors would improve debuggability.Link to Devin session: https://staging.itsdev.in/sessions/a750af18ee254481a97c4ac581ba129e
Requested by: @streamer45