From ed182a1c1bb18ba83e26856e690d1d5846ea9e97 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Fri, 27 Mar 2026 15:43:22 -0600 Subject: [PATCH 01/30] Add Cargo.toml for Rust Client --- libgstc/rust/gstc/Cargo.toml | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 libgstc/rust/gstc/Cargo.toml diff --git a/libgstc/rust/gstc/Cargo.toml b/libgstc/rust/gstc/Cargo.toml new file mode 100644 index 00000000..896ce0b6 --- /dev/null +++ b/libgstc/rust/gstc/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "gstc_rust" +version = "0.1.0" +edition = "2021" +description = "Rust client for RidgeRun GStreamer Daemon (GstD)" +license = "BSD-3-Clause" + +[dependencies] From 5e41bd04390a6dd28f7d06ddc0af8efd013861af Mon Sep 17 00:00:00 2001 From: lmerayo Date: Fri, 27 Mar 2026 15:46:48 -0600 Subject: [PATCH 02/30] Add Rust client source code --- libgstc/rust/gstc/README.md | 16 + libgstc/rust/gstc/src/client.rs | 457 +++++++++++++++++++++++++++++ libgstc/rust/gstc/src/json.rs | 211 +++++++++++++ libgstc/rust/gstc/src/lib.rs | 39 +++ libgstc/rust/gstc/src/status.rs | 85 ++++++ libgstc/rust/gstc/src/transport.rs | 153 ++++++++++ 6 files changed, 961 insertions(+) create mode 100644 libgstc/rust/gstc/README.md create mode 100644 libgstc/rust/gstc/src/client.rs create mode 100644 libgstc/rust/gstc/src/json.rs create mode 100644 libgstc/rust/gstc/src/lib.rs create mode 100644 libgstc/rust/gstc/src/status.rs create mode 100644 libgstc/rust/gstc/src/transport.rs diff --git a/libgstc/rust/gstc/README.md b/libgstc/rust/gstc/README.md new file mode 100644 index 00000000..fa38778d --- /dev/null +++ b/libgstc/rust/gstc/README.md @@ -0,0 +1,16 @@ +# gstc_rust (Rust) + +Rust client library for RidgeRun GstD. + +## Build and run + +### Cargo + +A `Cargo.toml` is maintained for crate workflows, for example, (`cargo run`, `cargo test`, `cargo fmt`, `cargo clippy`). The crate can be added as a dependency in another Cargo project: + +```bash +[dependencies] +gstc_rust = { path = "" } +``` + + diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs new file mode 100644 index 00000000..d951e40f --- /dev/null +++ b/libgstc/rust/gstc/src/client.rs @@ -0,0 +1,457 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +use crate::json::{json_child_char_array, json_child_string, json_get_int, json_is_null_field}; +use crate::transport::{ConnectionSettings, Transport}; +use crate::Status; +use std::sync::Mutex; +use std::thread; +use std::thread::JoinHandle; + +pub struct Client { + settings: ConnectionSettings, + transport: Mutex, +} + +#[derive(Debug, Clone)] +pub struct BusMessage { + pub status: Status, + pub raw_response: String, +} + +impl Client { + pub fn new( + address: impl Into, + port: u16, + wait_time_ms: i32, + keep_connection_open: bool, + ) -> Result { + let settings = ConnectionSettings { + address: address.into(), + port, + wait_time_ms, + keep_connection_open, + }; + + let transport = Transport::new(settings.clone())?; + + Ok(Self { + settings, + transport: Mutex::new(transport), + }) + } + + pub fn ping(&self) -> Result<(), Status> { + self.cmd_send("read /") + } + + pub fn debug(&self, threshold: &str, colors: bool, reset: bool) -> Result<(), Status> { + self.cmd_update("/debug/enable", "true")?; + self.cmd_update("/debug/threshold", threshold)?; + self.cmd_update("/debug/color", bool_str(colors))?; + self.cmd_update("/debug/reset", bool_str(reset)) + } + + pub fn pipeline_create(&self, pipeline_name: &str, pipeline_desc: &str) -> Result<(), Status> { + self.cmd_create( + "/pipelines", + &format!("{} {}", pipeline_name, pipeline_desc), + ) + } + + pub fn pipeline_create_ref( + &self, + pipeline_name: &str, + pipeline_desc: &str, + ) -> Result<(), Status> { + self.cmd_send(&format!( + "pipeline_crete_ref {} {}", + pipeline_name, pipeline_desc + )) + } + + pub fn pipeline_delete(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_delete("/pipelines", pipeline_name) + } + + pub fn pipeline_delete_ref(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_send(&format!("pipeline_delete_ref {}", pipeline_name)) + } + + pub fn pipeline_play(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_update(&format!("/pipelines/{}/state", pipeline_name), "playing") + } + + pub fn pipeline_play_ref(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_send(&format!("pipeline_play_ref {}", pipeline_name)) + } + + pub fn pipeline_pause(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_update(&format!("/pipelines/{}/state", pipeline_name), "paused") + } + + pub fn pipeline_stop(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_update(&format!("/pipelines/{}/state", pipeline_name), "null") + } + + pub fn pipeline_stop_ref(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_send(&format!("pipeline_stop_ref {}", pipeline_name)) + } + + pub fn pipeline_get_graph(&self, pipeline_name: &str) -> Result { + self.cmd_read( + &format!("/pipelines/{}/graph", pipeline_name), + self.settings.wait_time_ms, + ) + } + + pub fn pipeline_get_state(&self, pipeline_name: &str) -> Result { + let response = self.cmd_read( + &format!("/pipelines/{}/state", pipeline_name), + self.settings.wait_time_ms, + )?; + + json_child_string(&response, "response", "value") + } + + pub fn pipeline_verbose(&self, pipeline_name: &str, value: bool) -> Result<(), Status> { + self.cmd_update( + &format!("/pipelines/{}/verbose", pipeline_name), + bool_str(value), + ) + } + + pub fn element_get( + &self, + pipeline_name: &str, + element: &str, + property: &str, + ) -> Result { + let response = self.cmd_read( + &format!( + "/pipelines/{}/elements/{}/properties/{}", + pipeline_name, element, property + ), + self.settings.wait_time_ms, + )?; + + json_child_string(&response, "response", "value") + } + + pub fn element_set( + &self, + pipeline_name: &str, + element: &str, + property: &str, + value: &str, + ) -> Result<(), Status> { + let _ = self.cmd_update( + &format!( + "/pipelines/{}/elements/{}/properties/{}", + pipeline_name, element, property + ), + value, + ); + + Ok(()) + } + + pub fn element_properties_list( + &self, + pipeline_name: &str, + element: &str, + ) -> Result, Status> { + let response = self.cmd_read( + &format!( + "/pipelines/{}/elements/{}/properties", + pipeline_name, element + ), + self.settings.wait_time_ms, + )?; + + json_child_char_array(&response, "response", "nodes", "name") + } + + pub fn pipeline_flush_start(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_create( + &format!("/pipelines/{}/event", pipeline_name), + "flush_start", + ) + } + + pub fn pipeline_flush_stop(&self, pipeline_name: &str, reset: bool) -> Result<(), Status> { + self.cmd_create( + &format!("/pipelines/{}/event", pipeline_name), + &format!("flush_stop {}", bool_str(reset)), + ) + } + + pub fn pipeline_inject_eos(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_create(&format!("/pipelines/{}/event", pipeline_name), "eos") + } + + #[allow(clippy::too_many_arguments)] + pub fn pipeline_seek( + &self, + pipeline_name: &str, + rate: f64, + format: i32, + flags: i32, + start_type: i32, + start: i64, + stop_type: i32, + stop: i64, + ) -> Result<(), Status> { + self.cmd_create( + &format!("/pipelines/{}/event", pipeline_name), + &format!( + "seek {:.6} {} {} {} {} {} {}", + rate, format, flags, start_type, start, stop_type, stop + ), + ) + } + + pub fn pipeline_list_elements(&self, pipeline_name: &str) -> Result, Status> { + let response = self.cmd_read( + &format!("/pipelines/{}/elements/", pipeline_name), + self.settings.wait_time_ms, + )?; + + json_child_char_array(&response, "response", "nodes", "name") + } + + pub fn pipeline_list(&self) -> Result, Status> { + let response = self.cmd_read("/pipelines", self.settings.wait_time_ms)?; + json_child_char_array(&response, "response", "nodes", "name") + } + + pub fn pipeline_bus_wait( + &self, + pipeline_name: &str, + message_name: &str, + timeout_ns: i64, + ) -> Result { + self.cmd_update( + &format!("/pipelines/{}/bus/types", pipeline_name), + message_name, + )?; + self.cmd_update( + &format!("/pipelines/{}/bus/timeout", pipeline_name), + &format!("{}", timeout_ns), + )?; + + let raw = self.cmd_read(&format!("/pipelines/{}/bus/message", pipeline_name), -1)?; + + let status = match json_is_null_field(&raw, "response") { + Ok(is_null) if is_null => Status::BUS_TIMEOUT, + Ok(_) => Status::OK, + Err(err) => err, + }; + + Ok(BusMessage { + status, + raw_response: raw, + }) + } + + pub fn pipeline_bus_wait_async( + &self, + pipeline_name: String, + message_name: String, + timeout_ns: i64, + callback: F, + ) -> Result, Status> + where + F: FnOnce(BusMessage) + Send + 'static, + { + self.cmd_update( + &format!("/pipelines/{}/bus/types", pipeline_name), + &message_name, + )?; + self.cmd_update( + &format!("/pipelines/{}/bus/timeout", pipeline_name), + &format!("{}", timeout_ns), + )?; + + let settings = ConnectionSettings { + keep_connection_open: false, + ..self.settings.clone() + }; + + let handle = thread::Builder::new() + .name("gstc-bus-wait".to_string()) + .spawn(move || { + let client = match Client::new( + settings.address, + settings.port, + settings.wait_time_ms, + settings.keep_connection_open, + ) { + Ok(client) => client, + Err(err) => return err, + }; + + match client.pipeline_bus_wait(&pipeline_name, &message_name, timeout_ns) { + Ok(message) => { + let status = message.status; + callback(message); + status + } + Err(err) => err, + } + }) + .map_err(|_| Status::THREAD_ERROR)?; + + Ok(handle) + } + + pub fn pipeline_emit_action( + &self, + pipeline_name: &str, + element: &str, + action: &str, + ) -> Result<(), Status> { + self.cmd_create( + &format!( + "/pipelines/{}/elements/{}/actions/{}", + pipeline_name, element, action + ), + action, + ) + } + + pub fn pipeline_list_signals( + &self, + pipeline_name: &str, + element: &str, + ) -> Result, Status> { + let response = self.cmd_read( + &format!("/pipelines/{}/elements/{}/signals", pipeline_name, element), + self.settings.wait_time_ms, + )?; + + json_child_char_array(&response, "response", "nodes", "name") + } + + pub fn pipeline_signal_connect( + &self, + pipeline_name: &str, + element: &str, + signal: &str, + timeout: i32, + ) -> Result { + self.cmd_update( + &format!( + "/pipelines/{}/elements/{}/signals/{}/timeout", + pipeline_name, element, signal + ), + &format!("{}", timeout), + )?; + + self.cmd_read( + &format!( + "/pipelines/{}/elements/{}/signals/{}/callback", + pipeline_name, element, signal + ), + self.settings.wait_time_ms, + ) + } + + pub fn pipeline_signal_disconnect( + &self, + pipeline_name: &str, + element: &str, + signal: &str, + ) -> Result<(), Status> { + let _ = self.cmd_read( + &format!( + "/pipelines/{}/elements/{}/signals/{}/disconnect", + pipeline_name, element, signal + ), + self.settings.wait_time_ms, + )?; + Ok(()) + } + + fn cmd_send(&self, request: &str) -> Result<(), Status> { + let response = self.send_request(request, self.settings.wait_time_ms)?; + let code = json_get_int(&response, "code")?; + let status = Status(code); + + if status.is_ok() { + Ok(()) + } else { + Err(status) + } + } + + fn cmd_send_get_response(&self, request: &str, timeout_ms: i32) -> Result { + let response = self.send_request(request, timeout_ms)?; + let code = json_get_int(&response, "code")?; + let status = Status(code); + + if status.is_ok() { + Ok(response) + } else { + Err(status) + } + } + + fn cmd_create(&self, where_: &str, what: &str) -> Result<(), Status> { + self.cmd_send(&format!("create {} {}", where_, what)) + } + + fn cmd_read(&self, what: &str, timeout_ms: i32) -> Result { + self.cmd_send_get_response(&format!("read {}", what), timeout_ms) + } + + fn cmd_update(&self, what: &str, how: &str) -> Result<(), Status> { + self.cmd_send(&format!("update {} {}", what, how)) + } + + fn cmd_delete(&self, where_: &str, what: &str) -> Result<(), Status> { + self.cmd_send(&format!("delete {} {}", where_, what)) + } + + fn send_request(&self, request: &str, timeout_ms: i32) -> Result { + let mut guard = self.transport.lock().map_err(|_| Status::SOCKET_ERROR)?; + guard.send_command(request, timeout_ms) + } +} + +fn bool_str(value: bool) -> &'static str { + if value { + "true" + } else { + "false" + } +} diff --git a/libgstc/rust/gstc/src/json.rs b/libgstc/rust/gstc/src/json.rs new file mode 100644 index 00000000..2db037a2 --- /dev/null +++ b/libgstc/rust/gstc/src/json.rs @@ -0,0 +1,211 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +use crate::Status; + +pub(crate) fn json_get_int(json: &str, name: &str) -> Result { + let start = find_key_value_start(json, name).ok_or(Status::NOT_FOUND)?; + parse_json_int(json, start) +} + +pub(crate) fn json_is_null_field(json: &str, name: &str) -> Result { + let start = find_key_value_start(json, name).ok_or(Status::NOT_FOUND)?; + let start = skip_ws(json, start); + Ok(json[start..].starts_with("null")) +} + +pub(crate) fn json_child_string( + json: &str, + parent_name: &str, + data_name: &str, +) -> Result { + let parent = extract_object_for_key(json, parent_name)?; + extract_string_for_key(parent, data_name) +} + +pub(crate) fn json_child_char_array( + json: &str, + parent_name: &str, + array_name: &str, + element_name: &str, +) -> Result, Status> { + let parent = extract_object_for_key(json, parent_name)?; + let array = extract_array_for_key(parent, array_name)?; + + let mut out = Vec::new(); + let mut cursor = 1usize; + while cursor < array.len() { + let cursor_ws = skip_ws(array, cursor); + if cursor_ws >= array.len() || array.as_bytes()[cursor_ws] == b']' { + break; + } + + if array.as_bytes()[cursor_ws] != b'{' { + return Err(Status::TYPE_ERROR); + } + + let (obj, next_idx) = extract_balanced(array, cursor_ws, b'{', b'}')?; + out.push(extract_string_for_key(obj, element_name)?); + cursor = skip_past_comma(array, next_idx); + } + + Ok(out) +} + +fn skip_ws(s: &str, mut idx: usize) -> usize { + while idx < s.len() && s.as_bytes()[idx].is_ascii_whitespace() { + idx += 1; + } + idx +} + +fn skip_past_comma(s: &str, idx: usize) -> usize { + let idx = skip_ws(s, idx); + if idx < s.len() && s.as_bytes()[idx] == b',' { + idx + 1 + } else { + idx + } +} + +fn find_key_value_start(json: &str, key: &str) -> Option { + let pattern = format!("\"{}\"", key); + if let Some(found) = json.find(&pattern) { + let key_idx = found + pattern.len(); + let colon_idx = json[key_idx..].find(':')?; + return Some(key_idx + colon_idx + 1); + } + None +} + +fn parse_json_int(json: &str, start: usize) -> Result { + let start = skip_ws(json, start); + if start >= json.len() { + return Err(Status::MALFORMED); + } + + let bytes = json.as_bytes(); + let mut end = start; + if bytes[end] == b'-' { + end += 1; + } + while end < json.len() && bytes[end].is_ascii_digit() { + end += 1; + } + + if end == start || (end == start + 1 && bytes[start] == b'-') { + return Err(Status::TYPE_ERROR); + } + + json[start..end] + .parse::() + .map_err(|_| Status::TYPE_ERROR) +} + +fn extract_balanced( + json: &str, + start: usize, + open: u8, + close: u8, +) -> Result<(&str, usize), Status> { + let bytes = json.as_bytes(); + if start >= bytes.len() || bytes[start] != open { + return Err(Status::TYPE_ERROR); + } + + let mut depth = 0i32; + let mut i = start; + let mut in_string = false; + let mut escaped = false; + while i < bytes.len() { + let b = bytes[i]; + if in_string { + if escaped { + escaped = false; + } else if b == b'\\' { + escaped = true; + } else if b == b'"' { + in_string = false; + } + } else if b == b'"' { + in_string = true; + } else if b == open { + depth += 1; + } else if b == close { + depth -= 1; + if depth == 0 { + return Ok((&json[start..=i], i + 1)); + } + } + i += 1; + } + + Err(Status::MALFORMED) +} + +fn extract_object_for_key<'a>(json: &'a str, key: &str) -> Result<&'a str, Status> { + let start = find_key_value_start(json, key).ok_or(Status::NOT_FOUND)?; + let start = skip_ws(json, start); + let (obj, _) = extract_balanced(json, start, b'{', b'}')?; + Ok(obj) +} + +fn extract_array_for_key<'a>(json: &'a str, key: &str) -> Result<&'a str, Status> { + let start = find_key_value_start(json, key).ok_or(Status::TYPE_ERROR)?; + let start = skip_ws(json, start); + let (arr, _) = extract_balanced(json, start, b'[', b']')?; + Ok(arr) +} + +fn extract_string_for_key(json: &str, key: &str) -> Result { + let start = find_key_value_start(json, key).ok_or(Status::NOT_FOUND)?; + let start = skip_ws(json, start); + if start >= json.len() || json.as_bytes()[start] != b'"' { + return Err(Status::TYPE_ERROR); + } + + let mut i = start + 1; + let mut escaped = false; + while i < json.len() { + let b = json.as_bytes()[i]; + if escaped { + escaped = false; + } else if b == b'\\' { + escaped = true; + } else if b == b'"' { + return Ok(json[start + 1..i].to_string()); + } + i += 1; + } + + Err(Status::MALFORMED) +} diff --git a/libgstc/rust/gstc/src/lib.rs b/libgstc/rust/gstc/src/lib.rs new file mode 100644 index 00000000..15b36efe --- /dev/null +++ b/libgstc/rust/gstc/src/lib.rs @@ -0,0 +1,39 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +mod client; +mod json; +mod status; +mod transport; + +pub use client::{BusMessage, Client}; +pub use status::Status; diff --git a/libgstc/rust/gstc/src/status.rs b/libgstc/rust/gstc/src/status.rs new file mode 100644 index 00000000..f91e1317 --- /dev/null +++ b/libgstc/rust/gstc/src/status.rs @@ -0,0 +1,85 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +use std::fmt; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct Status(pub i32); + +impl Status { + pub const OK: Status = Status(0); + pub const NULL_ARGUMENT: Status = Status(-1); + pub const UNREACHABLE: Status = Status(-2); + pub const TIMEOUT: Status = Status(-3); + pub const OOM: Status = Status(-4); + pub const TYPE_ERROR: Status = Status(-5); + pub const MALFORMED: Status = Status(-6); + pub const NOT_FOUND: Status = Status(-7); + pub const SEND_ERROR: Status = Status(-8); + pub const RECV_ERROR: Status = Status(-9); + pub const SOCKET_ERROR: Status = Status(-10); + pub const THREAD_ERROR: Status = Status(-11); + pub const BUS_TIMEOUT: Status = Status(-12); + pub const SOCKET_TIMEOUT: Status = Status(-13); + pub const LONG_RESPONSE: Status = Status(-14); + + pub fn is_ok(self) -> bool { + self == Status::OK + } +} + +impl fmt::Display for Status { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let name = match *self { + Status::OK => "GSTC_OK", + Status::NULL_ARGUMENT => "GSTC_NULL_ARGUMENT", + Status::UNREACHABLE => "GSTC_UNREACHABLE", + Status::TIMEOUT => "GSTC_TIMEOUT", + Status::OOM => "GSTC_OOM", + Status::TYPE_ERROR => "GSTC_TYPE_ERROR", + Status::MALFORMED => "GSTC_MALFORMED", + Status::NOT_FOUND => "GSTC_NOT_FOUND", + Status::SEND_ERROR => "GSTC_SEND_ERROR", + Status::RECV_ERROR => "GSTC_RECV_ERROR", + Status::SOCKET_ERROR => "GSTC_SOCKET_ERROR", + Status::THREAD_ERROR => "GSTC_THREAD_ERROR", + Status::BUS_TIMEOUT => "GSTC_BUS_TIMEOUT", + Status::SOCKET_TIMEOUT => "GSTC_SOCKET_TIMEOUT", + Status::LONG_RESPONSE => "GSTC_LONG_RESPONSE", + _ => "GSTC_UNKNOWN", + }; + + write!(f, "{} ({})", name, self.0) + } +} + +impl std::error::Error for Status {} diff --git a/libgstc/rust/gstc/src/transport.rs b/libgstc/rust/gstc/src/transport.rs new file mode 100644 index 00000000..0685bf6c --- /dev/null +++ b/libgstc/rust/gstc/src/transport.rs @@ -0,0 +1,153 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +use crate::Status; +use std::io::{Read, Write}; +use std::net::{Shutdown, TcpStream}; +use std::time::Duration; + +const MAX_RESPONSE_LENGTH: usize = 10 * 1024 * 1024; + +#[derive(Clone, Debug)] +pub(crate) struct ConnectionSettings { + pub(crate) address: String, + pub(crate) port: u16, + pub(crate) wait_time_ms: i32, + pub(crate) keep_connection_open: bool, +} + +pub(crate) struct Transport { + settings: ConnectionSettings, + stream: Option, +} + +impl Transport { + pub(crate) fn new(settings: ConnectionSettings) -> Result { + let mut transport = Self { + settings, + stream: None, + }; + + if transport.settings.keep_connection_open { + let stream = transport.open_socket()?; + transport.stream = Some(stream); + } + + Ok(transport) + } + + fn open_socket(&self) -> Result { + TcpStream::connect((self.settings.address.as_str(), self.settings.port)) + .map_err(|_| Status::UNREACHABLE) + } + + pub(crate) fn send_command( + &mut self, + request: &str, + timeout_ms: i32, + ) -> Result { + if request.is_empty() { + return Err(Status::NULL_ARGUMENT); + } + + if self.settings.keep_connection_open { + if self.stream.is_none() { + let stream = self.open_socket()?; + self.stream = Some(stream); + } + + if let Some(stream) = self.stream.as_mut() { + Self::write_then_read(stream, request, timeout_ms) + } else { + Err(Status::SOCKET_ERROR) + } + } else { + let mut stream = self.open_socket()?; + let ret = Self::write_then_read(&mut stream, request, timeout_ms); + let _ = stream.shutdown(Shutdown::Both); + ret + } + } + + fn write_then_read( + stream: &mut TcpStream, + request: &str, + timeout_ms: i32, + ) -> Result { + if timeout_ms < 0 { + stream + .set_read_timeout(None) + .map_err(|_| Status::SOCKET_ERROR)?; + } else { + stream + .set_read_timeout(Some(Duration::from_millis(timeout_ms as u64))) + .map_err(|_| Status::SOCKET_ERROR)?; + } + + stream + .write_all(request.as_bytes()) + .map_err(|_| Status::SEND_ERROR)?; + + let mut response = Vec::::new(); + let mut buffer = [0_u8; 1024]; + + loop { + let n = match stream.read(&mut buffer) { + Ok(0) => return Err(Status::RECV_ERROR), + Ok(n) => n, + Err(err) if err.kind() == std::io::ErrorKind::TimedOut => { + return Err(Status::SOCKET_TIMEOUT) + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + return Err(Status::SOCKET_TIMEOUT) + } + Err(_) => return Err(Status::RECV_ERROR), + }; + + if let Some(zero_idx) = buffer[..n].iter().position(|b| *b == 0) { + response.extend_from_slice(&buffer[..zero_idx]); + break; + } + + response.extend_from_slice(&buffer[..n]); + if response.len() >= MAX_RESPONSE_LENGTH { + return Err(Status::LONG_RESPONSE); + } + } + + if response.len() >= MAX_RESPONSE_LENGTH { + return Err(Status::LONG_RESPONSE); + } + + String::from_utf8(response).map_err(|_| Status::MALFORMED) + } +} From 1caf51e24c6d649782935274d0f06a0ff2cd22f3 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Fri, 27 Mar 2026 16:01:47 -0600 Subject: [PATCH 03/30] Add examples for Rust client --- examples/libgstc/meson.build | 2 + .../rust/dynamic_property_change_rust.rs | 63 +++++++++++++++ .../libgstc/rust/gapless_playback_rust.rs | 78 +++++++++++++++++++ examples/libgstc/rust/mp4_recording_rust.rs | 67 ++++++++++++++++ .../libgstc/rust/pipeline_lifecycle_rust.rs | 59 ++++++++++++++ examples/libgstc/rust/simple_pipeline_rust.rs | 47 +++++++++++ .../rust/videotest_autovideosink_rust.rs | 45 +++++++++++ libgstc/meson.build | 1 + libgstc/rust/gstc/Cargo.toml | 24 ++++++ libgstc/rust/gstc/README.md | 11 +++ libgstc/rust/meson.build | 7 ++ 11 files changed, 404 insertions(+) create mode 100644 examples/libgstc/rust/dynamic_property_change_rust.rs create mode 100644 examples/libgstc/rust/gapless_playback_rust.rs create mode 100644 examples/libgstc/rust/mp4_recording_rust.rs create mode 100644 examples/libgstc/rust/pipeline_lifecycle_rust.rs create mode 100644 examples/libgstc/rust/simple_pipeline_rust.rs create mode 100644 examples/libgstc/rust/videotest_autovideosink_rust.rs create mode 100644 libgstc/rust/meson.build diff --git a/examples/libgstc/meson.build b/examples/libgstc/meson.build index 4e79628d..3a6d7782 100644 --- a/examples/libgstc/meson.build +++ b/examples/libgstc/meson.build @@ -12,3 +12,5 @@ foreach app : app_examples dependencies : [gst_client_deps,lib_gstc_dep], install: false) endforeach + +subdir('rust') diff --git a/examples/libgstc/rust/dynamic_property_change_rust.rs b/examples/libgstc/rust/dynamic_property_change_rust.rs new file mode 100644 index 00000000..6b165ec2 --- /dev/null +++ b/examples/libgstc/rust/dynamic_property_change_rust.rs @@ -0,0 +1,63 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc_rust::{Client, Status}; +use std::io; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; + +fn main() -> Result<(), Status> { + let client = Client::new("127.0.0.1", 5000, -1, true)?; + + client.pipeline_create("pipe", "videotestsrc name=vts ! autovideosink")?; + println!("Pipeline created successfully!"); + + client.pipeline_play("pipe")?; + println!("Pipeline set to playing!"); + + println!("Press enter to stop the pipeline..."); + let (tx, rx) = mpsc::channel::<()>(); + thread::spawn(move || { + let mut line = String::new(); + let _ = io::stdin().read_line(&mut line); + let _ = tx.send(()); + }); + + let mut format = 0; + loop { + client.element_set("pipe", "vts", "pattern", &format.to_string())?; + format = (format + 1) % 10; + + if rx.try_recv().is_ok() { + break; + } + + thread::sleep(Duration::from_secs(1)); + } + + client.pipeline_stop("pipe")?; + println!("Pipeline set to null!"); + + client.pipeline_delete("pipe")?; + println!("Pipeline deleted!"); + + Ok(()) +} diff --git a/examples/libgstc/rust/gapless_playback_rust.rs b/examples/libgstc/rust/gapless_playback_rust.rs new file mode 100644 index 00000000..3402d91c --- /dev/null +++ b/examples/libgstc/rust/gapless_playback_rust.rs @@ -0,0 +1,78 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc_rust::{Client, Status}; +use std::env; +use std::io; +use std::path::PathBuf; +use std::sync::mpsc; +use std::thread; + +fn main() -> Result<(), Status> { + let video = match env::args().nth(1) { + Some(path) => path, + None => { + eprintln!("Please provide a video to play"); + return Err(Status::NULL_ARGUMENT); + } + }; + + let abs_path = PathBuf::from(&video) + .canonicalize() + .unwrap_or_else(|_| PathBuf::from(video)); + let uri = format!("file://{}", abs_path.display()); + + let client = Client::new("127.0.0.1", 5000, -1, true)?; + + client.pipeline_create("pipe", &format!("playbin uri={}", uri))?; + println!("Pipeline created successfully!"); + + client.pipeline_play("pipe")?; + println!("Pipeline set to playing!"); + + println!("Press enter to stop the pipeline..."); + let (tx, rx) = mpsc::channel::<()>(); + thread::spawn(move || { + let mut line = String::new(); + let _ = io::stdin().read_line(&mut line); + let _ = tx.send(()); + }); + + while rx.try_recv().is_err() { + let message = client.pipeline_bus_wait("pipe", "eos", -1)?; + if message.status != Status::OK { + eprintln!("Unable to read from bus: {}", message.status.0); + break; + } + + println!("EOS message received!"); + + client.pipeline_seek("pipe", 1.0, 3, 1, 1, 0, 1, -1)?; + println!("Pipeline reset!"); + } + + client.pipeline_stop("pipe")?; + println!("Pipeline set to null!"); + + client.pipeline_delete("pipe")?; + println!("Pipeline deleted!"); + + Ok(()) +} diff --git a/examples/libgstc/rust/mp4_recording_rust.rs b/examples/libgstc/rust/mp4_recording_rust.rs new file mode 100644 index 00000000..2275994b --- /dev/null +++ b/examples/libgstc/rust/mp4_recording_rust.rs @@ -0,0 +1,67 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc_rust::{Client, Status}; +use std::io; + +fn main() -> Result<(), Status> { + let client = Client::new("127.0.0.1", 5000, -1, true)?; + + client.pipeline_create( + "pipe", + "qtmux name=mux ! filesink location=mp4_recording.mp4 \ + videotestsrc is-live=true ! avenc_mpeg4 ! mux. \ + audiotestsrc is-live=true ! lamemp3enc ! mux.", + )?; + println!("Pipeline created successfully!"); + + client.pipeline_play("pipe")?; + println!("Pipeline set to playing!"); + + println!("Press enter to stop pipeline..."); + let mut line = String::new(); + let _ = io::stdin().read_line(&mut line); + + client.pipeline_inject_eos("pipe")?; + println!("EOS sent!"); + + print!("Waiting for EOS... "); + let bus_message = client.pipeline_bus_wait("pipe", "eos", 10_000_000_000)?; + if bus_message.status == Status::OK { + println!("received!"); + } else if bus_message.status == Status::BUS_TIMEOUT { + println!("timeout!"); + eprintln!("EOS not received, file may be unreadable"); + } else { + println!("error!"); + eprintln!( + "An error occurred waiting for EOS: {}", + bus_message.status.0 + ); + } + + client.pipeline_stop("pipe")?; + println!("Pipeline set to null!"); + + client.pipeline_delete("pipe")?; + println!("Pipeline deleted!"); + + Ok(()) +} diff --git a/examples/libgstc/rust/pipeline_lifecycle_rust.rs b/examples/libgstc/rust/pipeline_lifecycle_rust.rs new file mode 100644 index 00000000..51228bf3 --- /dev/null +++ b/examples/libgstc/rust/pipeline_lifecycle_rust.rs @@ -0,0 +1,59 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc_rust::{Client, Status}; +use std::thread; +use std::time::{Duration, Instant}; + +fn wait_for_state( + client: &Client, + pipeline_name: &str, + expected: &str, + timeout: Duration, +) -> Result { + let start = Instant::now(); + loop { + let state = client.pipeline_get_state(pipeline_name)?; + if state == expected { + return Ok(state); + } + + if start.elapsed() >= timeout { + return Err(Status::TIMEOUT); + } + + thread::sleep(Duration::from_millis(100)); + } +} + +fn main() -> Result<(), Status> { + let client = Client::new("127.0.0.1", 5000, 5000, false)?; + + client.pipeline_create("pipe", "videotestsrc ! fakesink")?; + client.pipeline_play("pipe")?; + + let state = wait_for_state(&client, "pipe", "PLAYING", Duration::from_secs(5))?; + println!("pipeline state: {}", state); + + client.pipeline_stop("pipe")?; + client.pipeline_delete("pipe")?; + + Ok(()) +} diff --git a/examples/libgstc/rust/simple_pipeline_rust.rs b/examples/libgstc/rust/simple_pipeline_rust.rs new file mode 100644 index 00000000..6689cf7a --- /dev/null +++ b/examples/libgstc/rust/simple_pipeline_rust.rs @@ -0,0 +1,47 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc_rust::{Client, Status}; +use std::io; + +fn main() -> Result<(), Status> { + let client = Client::new("127.0.0.1", 5000, -1, true)?; + + client.ping()?; + println!("GStreamer daemon is alive!"); + + client.pipeline_create("pipe", "videotestsrc ! autovideosink")?; + println!("Pipeline created successfully!"); + + client.pipeline_play("pipe")?; + println!("Pipeline set to playing!"); + + println!("Press enter to stop pipeline..."); + let mut line = String::new(); + let _ = io::stdin().read_line(&mut line); + + client.pipeline_stop("pipe")?; + println!("Pipeline set to null!"); + + client.pipeline_delete("pipe")?; + println!("Pipeline deleted!"); + + Ok(()) +} diff --git a/examples/libgstc/rust/videotest_autovideosink_rust.rs b/examples/libgstc/rust/videotest_autovideosink_rust.rs new file mode 100644 index 00000000..d2f8453f --- /dev/null +++ b/examples/libgstc/rust/videotest_autovideosink_rust.rs @@ -0,0 +1,45 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc_rust::{Client, Status}; + +fn main() -> Result<(), Status> { + let client = Client::new("127.0.0.1", 5000, 5000, false)?; + + let pipeline_name = "video"; + let pipeline_desc = "videotestsrc num-buffers=300 ! videoconvert ! autovideosink"; + + client.pipeline_create(pipeline_name, pipeline_desc)?; + client.pipeline_play(pipeline_name)?; + + let bus_message = client.pipeline_bus_wait(pipeline_name, "eos", -1)?; + if bus_message.status != Status::OK { + let _ = client.pipeline_stop(pipeline_name); + let _ = client.pipeline_delete(pipeline_name); + return Err(bus_message.status); + } + + println!("received EOS: {}", bus_message.raw_response); + + client.pipeline_stop(pipeline_name)?; + client.pipeline_delete(pipeline_name)?; + + Ok(()) +} diff --git a/libgstc/meson.build b/libgstc/meson.build index 582cbb8f..228c09de 100644 --- a/libgstc/meson.build +++ b/libgstc/meson.build @@ -1,4 +1,5 @@ subdir('c') +subdir('rust') if not get_option('enable-python').disabled() subdir('python') endif diff --git a/libgstc/rust/gstc/Cargo.toml b/libgstc/rust/gstc/Cargo.toml index 896ce0b6..cf4b8e60 100644 --- a/libgstc/rust/gstc/Cargo.toml +++ b/libgstc/rust/gstc/Cargo.toml @@ -6,3 +6,27 @@ description = "Rust client for RidgeRun GStreamer Daemon (GstD)" license = "BSD-3-Clause" [dependencies] + +[[example]] +name = "pipeline_lifecycle" +path = "../../../examples/libgstc/rust/pipeline_lifecycle_rust.rs" + +[[example]] +name = "videotest_autovideosink" +path = "../../../examples/libgstc/rust/videotest_autovideosink_rust.rs" + +[[example]] +name = "simple_pipeline" +path = "../../../examples/libgstc/rust/simple_pipeline_rust.rs" + +[[example]] +name = "dynamic_property_change" +path = "../../../examples/libgstc/rust/dynamic_property_change_rust.rs" + +[[example]] +name = "gapless_playback" +path = "../../../examples/libgstc/rust/gapless_playback_rust.rs" + +[[example]] +name = "mp4_recording" +path = "../../../examples/libgstc/rust/mp4_recording_rust.rs" \ No newline at end of file diff --git a/libgstc/rust/gstc/README.md b/libgstc/rust/gstc/README.md index fa38778d..d979a6a4 100644 --- a/libgstc/rust/gstc/README.md +++ b/libgstc/rust/gstc/README.md @@ -4,6 +4,17 @@ Rust client library for RidgeRun GstD. ## Build and run +### Meson (project-integrated build) + +From repo root: + +```bash +meson setup build +meson compile -C build +``` + +Rust examples are built as Meson executables under `build/examples/libgstc/rust/`. + ### Cargo A `Cargo.toml` is maintained for crate workflows, for example, (`cargo run`, `cargo test`, `cargo fmt`, `cargo clippy`). The crate can be added as a dependency in another Cargo project: diff --git a/libgstc/rust/meson.build b/libgstc/rust/meson.build new file mode 100644 index 00000000..c3544717 --- /dev/null +++ b/libgstc/rust/meson.build @@ -0,0 +1,7 @@ +rust_enabled = add_languages('rust', required : false) + +if rust_enabled + subdir('gstc') +else + message('Rust compiler not found; skipping libgstc Rust API') +endif From e6ae8dd400c1c649448fff93950241aeb3658e4f Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 08:47:50 -0600 Subject: [PATCH 04/30] Update license header --- examples/libgstc/rust/dynamic_property_change_rust.rs | 2 +- examples/libgstc/rust/gapless_playback_rust.rs | 2 +- examples/libgstc/rust/mp4_recording_rust.rs | 2 +- examples/libgstc/rust/pipeline_lifecycle_rust.rs | 2 +- examples/libgstc/rust/simple_pipeline_rust.rs | 2 +- examples/libgstc/rust/videotest_autovideosink_rust.rs | 2 +- libgstc/rust/gstc/src/client.rs | 2 +- libgstc/rust/gstc/src/json.rs | 2 +- libgstc/rust/gstc/src/lib.rs | 2 +- libgstc/rust/gstc/src/status.rs | 2 +- libgstc/rust/gstc/src/transport.rs | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) diff --git a/examples/libgstc/rust/dynamic_property_change_rust.rs b/examples/libgstc/rust/dynamic_property_change_rust.rs index 6b165ec2..23abe2a5 100644 --- a/examples/libgstc/rust/dynamic_property_change_rust.rs +++ b/examples/libgstc/rust/dynamic_property_change_rust.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public diff --git a/examples/libgstc/rust/gapless_playback_rust.rs b/examples/libgstc/rust/gapless_playback_rust.rs index 3402d91c..3473d0e4 100644 --- a/examples/libgstc/rust/gapless_playback_rust.rs +++ b/examples/libgstc/rust/gapless_playback_rust.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public diff --git a/examples/libgstc/rust/mp4_recording_rust.rs b/examples/libgstc/rust/mp4_recording_rust.rs index 2275994b..0c51cbf1 100644 --- a/examples/libgstc/rust/mp4_recording_rust.rs +++ b/examples/libgstc/rust/mp4_recording_rust.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public diff --git a/examples/libgstc/rust/pipeline_lifecycle_rust.rs b/examples/libgstc/rust/pipeline_lifecycle_rust.rs index 51228bf3..bf26bb5e 100644 --- a/examples/libgstc/rust/pipeline_lifecycle_rust.rs +++ b/examples/libgstc/rust/pipeline_lifecycle_rust.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public diff --git a/examples/libgstc/rust/simple_pipeline_rust.rs b/examples/libgstc/rust/simple_pipeline_rust.rs index 6689cf7a..d7599295 100644 --- a/examples/libgstc/rust/simple_pipeline_rust.rs +++ b/examples/libgstc/rust/simple_pipeline_rust.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public diff --git a/examples/libgstc/rust/videotest_autovideosink_rust.rs b/examples/libgstc/rust/videotest_autovideosink_rust.rs index d2f8453f..71ac50b6 100644 --- a/examples/libgstc/rust/videotest_autovideosink_rust.rs +++ b/examples/libgstc/rust/videotest_autovideosink_rust.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs index d951e40f..1c65482a 100644 --- a/libgstc/rust/gstc/src/client.rs +++ b/libgstc/rust/gstc/src/client.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are diff --git a/libgstc/rust/gstc/src/json.rs b/libgstc/rust/gstc/src/json.rs index 2db037a2..660ff4fe 100644 --- a/libgstc/rust/gstc/src/json.rs +++ b/libgstc/rust/gstc/src/json.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are diff --git a/libgstc/rust/gstc/src/lib.rs b/libgstc/rust/gstc/src/lib.rs index 15b36efe..3bf2d84f 100644 --- a/libgstc/rust/gstc/src/lib.rs +++ b/libgstc/rust/gstc/src/lib.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are diff --git a/libgstc/rust/gstc/src/status.rs b/libgstc/rust/gstc/src/status.rs index f91e1317..3c2b89b4 100644 --- a/libgstc/rust/gstc/src/status.rs +++ b/libgstc/rust/gstc/src/status.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are diff --git a/libgstc/rust/gstc/src/transport.rs b/libgstc/rust/gstc/src/transport.rs index 0685bf6c..49a307f2 100644 --- a/libgstc/rust/gstc/src/transport.rs +++ b/libgstc/rust/gstc/src/transport.rs @@ -1,6 +1,6 @@ /* * This file is part of GStreamer Daemon - * Copyright 2015-2022 Ridgerun, LLC (http://www.ridgerun.com) + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are From aa3275c1612726bd5b9d9bbb381ea98b9637ef77 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 09:14:30 -0600 Subject: [PATCH 05/30] use Atomic bool as stop flag for thread --- .../libgstc/rust/dynamic_property_change_rust.rs | 10 ++++++---- examples/libgstc/rust/gapless_playback_rust.rs | 12 +++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/examples/libgstc/rust/dynamic_property_change_rust.rs b/examples/libgstc/rust/dynamic_property_change_rust.rs index 23abe2a5..15f7f7a1 100644 --- a/examples/libgstc/rust/dynamic_property_change_rust.rs +++ b/examples/libgstc/rust/dynamic_property_change_rust.rs @@ -20,7 +20,8 @@ use gstc_rust::{Client, Status}; use std::io; -use std::sync::mpsc; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::Duration; @@ -34,11 +35,12 @@ fn main() -> Result<(), Status> { println!("Pipeline set to playing!"); println!("Press enter to stop the pipeline..."); - let (tx, rx) = mpsc::channel::<()>(); + let stop_flag = Arc::new(AtomicBool::new(false)); + let thread_stop_flag = Arc::clone(&stop_flag); thread::spawn(move || { let mut line = String::new(); let _ = io::stdin().read_line(&mut line); - let _ = tx.send(()); + thread_stop_flag.store(true, Ordering::Relaxed); }); let mut format = 0; @@ -46,7 +48,7 @@ fn main() -> Result<(), Status> { client.element_set("pipe", "vts", "pattern", &format.to_string())?; format = (format + 1) % 10; - if rx.try_recv().is_ok() { + if stop_flag.load(Ordering::Relaxed) { break; } diff --git a/examples/libgstc/rust/gapless_playback_rust.rs b/examples/libgstc/rust/gapless_playback_rust.rs index 3473d0e4..c89db333 100644 --- a/examples/libgstc/rust/gapless_playback_rust.rs +++ b/examples/libgstc/rust/gapless_playback_rust.rs @@ -22,7 +22,8 @@ use gstc_rust::{Client, Status}; use std::env; use std::io; use std::path::PathBuf; -use std::sync::mpsc; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; fn main() -> Result<(), Status> { @@ -36,7 +37,7 @@ fn main() -> Result<(), Status> { let abs_path = PathBuf::from(&video) .canonicalize() - .unwrap_or_else(|_| PathBuf::from(video)); + .unwrap_or_else(|_| PathBuf::from(&video)); let uri = format!("file://{}", abs_path.display()); let client = Client::new("127.0.0.1", 5000, -1, true)?; @@ -48,14 +49,15 @@ fn main() -> Result<(), Status> { println!("Pipeline set to playing!"); println!("Press enter to stop the pipeline..."); - let (tx, rx) = mpsc::channel::<()>(); + let stop_flag = Arc::new(AtomicBool::new(false)); + let thread_stop_flag = Arc::clone(&stop_flag); thread::spawn(move || { let mut line = String::new(); let _ = io::stdin().read_line(&mut line); - let _ = tx.send(()); + thread_stop_flag.store(true, Ordering::Relaxed); }); - while rx.try_recv().is_err() { + while !stop_flag.load(Ordering::Relaxed) { let message = client.pipeline_bus_wait("pipe", "eos", -1)?; if message.status != Status::OK { eprintln!("Unable to read from bus: {}", message.status.0); From 0c57b977d296a1fc1c32e705728a359ce48fb7bd Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 10:48:59 -0600 Subject: [PATCH 06/30] Use pattern matching instead of if-else --- examples/libgstc/rust/mp4_recording_rust.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/examples/libgstc/rust/mp4_recording_rust.rs b/examples/libgstc/rust/mp4_recording_rust.rs index 0c51cbf1..fd072179 100644 --- a/examples/libgstc/rust/mp4_recording_rust.rs +++ b/examples/libgstc/rust/mp4_recording_rust.rs @@ -44,17 +44,19 @@ fn main() -> Result<(), Status> { print!("Waiting for EOS... "); let bus_message = client.pipeline_bus_wait("pipe", "eos", 10_000_000_000)?; - if bus_message.status == Status::OK { - println!("received!"); - } else if bus_message.status == Status::BUS_TIMEOUT { - println!("timeout!"); - eprintln!("EOS not received, file may be unreadable"); - } else { - println!("error!"); - eprintln!( + match bus_message.status { + Status::OK => println!("received!"), + Status::BUS_TIMEOUT => { + println!("timeout!"); + eprintln!("EOS not received, file may be unreadable"); + }, + _ => { + println!("error!"); + eprintln!( "An error occurred waiting for EOS: {}", bus_message.status.0 - ); + ); + }, } client.pipeline_stop("pipe")?; From c7ad4046d497f631e8720fa1320731479e65e027 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 11:10:12 -0600 Subject: [PATCH 07/30] Rename examples and add missing meson.build files --- .gitignore | 1 + examples/libgstc/rust/meson.build | 25 +++++++++++++++++++ ...ovideosink_rust.rs => wait_on_bus_rust.rs} | 0 libgstc/rust/gstc/Cargo.toml | 4 +-- libgstc/rust/gstc/meson.build | 18 +++++++++++++ 5 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 examples/libgstc/rust/meson.build rename examples/libgstc/rust/{videotest_autovideosink_rust.rs => wait_on_bus_rust.rs} (100%) create mode 100644 libgstc/rust/gstc/meson.build diff --git a/.gitignore b/.gitignore index b62eca13..9f52f762 100644 --- a/.gitignore +++ b/.gitignore @@ -38,6 +38,7 @@ autoregen.sh # debian files *.deb *.build +!**/meson.build *.buildinfo *.changes debian/.debhelper/ diff --git a/examples/libgstc/rust/meson.build b/examples/libgstc/rust/meson.build new file mode 100644 index 00000000..5c9162dd --- /dev/null +++ b/examples/libgstc/rust/meson.build @@ -0,0 +1,25 @@ +if is_variable('lib_gstc_rust_dep') + rustc = find_program('rustc') + rust_target_libdir = run_command(rustc, '--print', 'target-libdir', check : true).stdout().strip() + + rust_examples = [ + ['simple_pipeline', 'simple_pipeline_rust.rs'], + ['dynamic_property_change', 'dynamic_property_change_rust.rs'], + ['gapless_playback', 'gapless_playback_rust.rs'], + ['mp4_recording', 'mp4_recording_rust.rs'], + ['pipeline_lifecycle', 'pipeline_lifecycle_rust.rs'], + ['wait_on_bus', 'wait_on_bus_rust.rs'], + ] + + foreach ex : rust_examples + executable(ex[0], ex[1], + rust_args : ['--edition=2021'], + dependencies : [lib_gstc_rust_dep], + build_rpath : rust_target_libdir, + install_rpath : rust_target_libdir, + install : false, + ) + endforeach +else + message('Rust libgstc dependency not found; skipping Rust examples') +endif diff --git a/examples/libgstc/rust/videotest_autovideosink_rust.rs b/examples/libgstc/rust/wait_on_bus_rust.rs similarity index 100% rename from examples/libgstc/rust/videotest_autovideosink_rust.rs rename to examples/libgstc/rust/wait_on_bus_rust.rs diff --git a/libgstc/rust/gstc/Cargo.toml b/libgstc/rust/gstc/Cargo.toml index cf4b8e60..1f12dc72 100644 --- a/libgstc/rust/gstc/Cargo.toml +++ b/libgstc/rust/gstc/Cargo.toml @@ -12,8 +12,8 @@ name = "pipeline_lifecycle" path = "../../../examples/libgstc/rust/pipeline_lifecycle_rust.rs" [[example]] -name = "videotest_autovideosink" -path = "../../../examples/libgstc/rust/videotest_autovideosink_rust.rs" +name = "wait_on_bus" +path = "../../../examples/libgstc/rust/wait_on_bus_rust.rs" [[example]] name = "simple_pipeline" diff --git a/libgstc/rust/gstc/meson.build b/libgstc/rust/gstc/meson.build new file mode 100644 index 00000000..b22ad8f2 --- /dev/null +++ b/libgstc/rust/gstc/meson.build @@ -0,0 +1,18 @@ +gstc_rust_sources = [ + 'src/lib.rs', +] + +gstc_rust_lib = static_library('gstc_rust', + gstc_rust_sources, + rust_crate_type : 'rlib', + rust_args : ['--edition=2021'], + install : true, + install_dir : lib_install_dir, +) + +lib_gstc_rust_dep = declare_dependency( + link_with : gstc_rust_lib, +) + +# Generate pkgconfig file +pkgconfig.generate(gstc_rust_lib, description : 'GStreamer Rust Client library to control Gstd') From 09fb394b920ccd0647f5653089f75f439b74b594 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 12:58:40 -0600 Subject: [PATCH 08/30] Keep ConnectionSettings in transport struct The ConnectionSettings are accesible through some helper methods in client.rs. --- libgstc/rust/gstc/src/client.rs | 40 +++++++++++++++++------------- libgstc/rust/gstc/src/transport.rs | 8 ++++++ 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs index 1c65482a..59e14890 100644 --- a/libgstc/rust/gstc/src/client.rs +++ b/libgstc/rust/gstc/src/client.rs @@ -38,7 +38,6 @@ use std::thread; use std::thread::JoinHandle; pub struct Client { - settings: ConnectionSettings, transport: Mutex, } @@ -62,10 +61,9 @@ impl Client { keep_connection_open, }; - let transport = Transport::new(settings.clone())?; + let transport = Transport::new(settings)?; Ok(Self { - settings, transport: Mutex::new(transport), }) } @@ -130,14 +128,14 @@ impl Client { pub fn pipeline_get_graph(&self, pipeline_name: &str) -> Result { self.cmd_read( &format!("/pipelines/{}/graph", pipeline_name), - self.settings.wait_time_ms, + self.default_wait_time_ms()?, ) } pub fn pipeline_get_state(&self, pipeline_name: &str) -> Result { let response = self.cmd_read( &format!("/pipelines/{}/state", pipeline_name), - self.settings.wait_time_ms, + self.default_wait_time_ms()?, )?; json_child_string(&response, "response", "value") @@ -161,7 +159,7 @@ impl Client { "/pipelines/{}/elements/{}/properties/{}", pipeline_name, element, property ), - self.settings.wait_time_ms, + self.default_wait_time_ms()?, )?; json_child_string(&response, "response", "value") @@ -195,7 +193,7 @@ impl Client { "/pipelines/{}/elements/{}/properties", pipeline_name, element ), - self.settings.wait_time_ms, + self.default_wait_time_ms()?, )?; json_child_char_array(&response, "response", "nodes", "name") @@ -243,14 +241,14 @@ impl Client { pub fn pipeline_list_elements(&self, pipeline_name: &str) -> Result, Status> { let response = self.cmd_read( &format!("/pipelines/{}/elements/", pipeline_name), - self.settings.wait_time_ms, + self.default_wait_time_ms()?, )?; json_child_char_array(&response, "response", "nodes", "name") } pub fn pipeline_list(&self) -> Result, Status> { - let response = self.cmd_read("/pipelines", self.settings.wait_time_ms)?; + let response = self.cmd_read("/pipelines", self.default_wait_time_ms()?)?; json_child_char_array(&response, "response", "nodes", "name") } @@ -302,10 +300,8 @@ impl Client { &format!("{}", timeout_ns), )?; - let settings = ConnectionSettings { - keep_connection_open: false, - ..self.settings.clone() - }; + let mut settings = self.transport_settings()?; + settings.keep_connection_open = false; let handle = thread::Builder::new() .name("gstc-bus-wait".to_string()) @@ -356,7 +352,7 @@ impl Client { ) -> Result, Status> { let response = self.cmd_read( &format!("/pipelines/{}/elements/{}/signals", pipeline_name, element), - self.settings.wait_time_ms, + self.default_wait_time_ms()?, )?; json_child_char_array(&response, "response", "nodes", "name") @@ -382,7 +378,7 @@ impl Client { "/pipelines/{}/elements/{}/signals/{}/callback", pipeline_name, element, signal ), - self.settings.wait_time_ms, + self.default_wait_time_ms()?, ) } @@ -397,13 +393,13 @@ impl Client { "/pipelines/{}/elements/{}/signals/{}/disconnect", pipeline_name, element, signal ), - self.settings.wait_time_ms, + self.default_wait_time_ms()?, )?; Ok(()) } fn cmd_send(&self, request: &str) -> Result<(), Status> { - let response = self.send_request(request, self.settings.wait_time_ms)?; + let response = self.send_request(request, self.default_wait_time_ms()?)?; let code = json_get_int(&response, "code")?; let status = Status(code); @@ -446,6 +442,16 @@ impl Client { let mut guard = self.transport.lock().map_err(|_| Status::SOCKET_ERROR)?; guard.send_command(request, timeout_ms) } + + fn default_wait_time_ms(&self) -> Result { + let guard = self.transport.lock().map_err(|_| Status::SOCKET_ERROR)?; + Ok(guard.wait_time_ms()) + } + + fn transport_settings(&self) -> Result { + let guard = self.transport.lock().map_err(|_| Status::SOCKET_ERROR)?; + Ok(guard.clone_settings()) + } } fn bool_str(value: bool) -> &'static str { diff --git a/libgstc/rust/gstc/src/transport.rs b/libgstc/rust/gstc/src/transport.rs index 49a307f2..1d7713fe 100644 --- a/libgstc/rust/gstc/src/transport.rs +++ b/libgstc/rust/gstc/src/transport.rs @@ -65,6 +65,14 @@ impl Transport { Ok(transport) } + pub(crate) fn wait_time_ms(&self) -> i32 { + self.settings.wait_time_ms + } + + pub(crate) fn clone_settings(&self) -> ConnectionSettings { + self.settings.clone() + } + fn open_socket(&self) -> Result { TcpStream::connect((self.settings.address.as_str(), self.settings.port)) .map_err(|_| Status::UNREACHABLE) From f5ec760534f34dbd348854f6e917038fe0022551 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 13:33:53 -0600 Subject: [PATCH 09/30] Fix typo --- libgstc/rust/gstc/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs index 59e14890..6c128618 100644 --- a/libgstc/rust/gstc/src/client.rs +++ b/libgstc/rust/gstc/src/client.rs @@ -92,7 +92,7 @@ impl Client { pipeline_desc: &str, ) -> Result<(), Status> { self.cmd_send(&format!( - "pipeline_crete_ref {} {}", + "pipeline_create_ref {} {}", pipeline_name, pipeline_desc )) } From 2fac0a55f33ade38e58e463abc35e34436e54582 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 14:42:27 -0600 Subject: [PATCH 10/30] Replace Status with an enum --- .../libgstc/rust/gapless_playback_rust.rs | 2 +- examples/libgstc/rust/mp4_recording_rust.rs | 6 +- libgstc/rust/gstc/src/client.rs | 4 +- libgstc/rust/gstc/src/status.rs | 81 ++++++++++++++----- 4 files changed, 68 insertions(+), 25 deletions(-) diff --git a/examples/libgstc/rust/gapless_playback_rust.rs b/examples/libgstc/rust/gapless_playback_rust.rs index c89db333..ac1339df 100644 --- a/examples/libgstc/rust/gapless_playback_rust.rs +++ b/examples/libgstc/rust/gapless_playback_rust.rs @@ -60,7 +60,7 @@ fn main() -> Result<(), Status> { while !stop_flag.load(Ordering::Relaxed) { let message = client.pipeline_bus_wait("pipe", "eos", -1)?; if message.status != Status::OK { - eprintln!("Unable to read from bus: {}", message.status.0); + eprintln!("Unable to read from bus: {}", message.status.code()); break; } diff --git a/examples/libgstc/rust/mp4_recording_rust.rs b/examples/libgstc/rust/mp4_recording_rust.rs index fd072179..12189c23 100644 --- a/examples/libgstc/rust/mp4_recording_rust.rs +++ b/examples/libgstc/rust/mp4_recording_rust.rs @@ -52,9 +52,9 @@ fn main() -> Result<(), Status> { }, _ => { println!("error!"); - eprintln!( - "An error occurred waiting for EOS: {}", - bus_message.status.0 + eprintln!( + "An error occurred waiting for EOS: {}", + bus_message.status.code() ); }, } diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs index 6c128618..fe4f03bf 100644 --- a/libgstc/rust/gstc/src/client.rs +++ b/libgstc/rust/gstc/src/client.rs @@ -401,7 +401,7 @@ impl Client { fn cmd_send(&self, request: &str) -> Result<(), Status> { let response = self.send_request(request, self.default_wait_time_ms()?)?; let code = json_get_int(&response, "code")?; - let status = Status(code); + let status = Status::from_code(code); if status.is_ok() { Ok(()) @@ -413,7 +413,7 @@ impl Client { fn cmd_send_get_response(&self, request: &str, timeout_ms: i32) -> Result { let response = self.send_request(request, timeout_ms)?; let code = json_get_int(&response, "code")?; - let status = Status(code); + let status = Status::from_code(code); if status.is_ok() { Ok(response) diff --git a/libgstc/rust/gstc/src/status.rs b/libgstc/rust/gstc/src/status.rs index 3c2b89b4..fb84132f 100644 --- a/libgstc/rust/gstc/src/status.rs +++ b/libgstc/rust/gstc/src/status.rs @@ -33,27 +33,70 @@ use std::fmt; #[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub struct Status(pub i32); +pub enum Status { + OK, + NULL_ARGUMENT, + UNREACHABLE, + TIMEOUT, + OOM, + TYPE_ERROR, + MALFORMED, + NOT_FOUND, + SEND_ERROR, + RECV_ERROR, + SOCKET_ERROR, + THREAD_ERROR, + BUS_TIMEOUT, + SOCKET_TIMEOUT, + LONG_RESPONSE, + UNKNOWN(i32), +} impl Status { - pub const OK: Status = Status(0); - pub const NULL_ARGUMENT: Status = Status(-1); - pub const UNREACHABLE: Status = Status(-2); - pub const TIMEOUT: Status = Status(-3); - pub const OOM: Status = Status(-4); - pub const TYPE_ERROR: Status = Status(-5); - pub const MALFORMED: Status = Status(-6); - pub const NOT_FOUND: Status = Status(-7); - pub const SEND_ERROR: Status = Status(-8); - pub const RECV_ERROR: Status = Status(-9); - pub const SOCKET_ERROR: Status = Status(-10); - pub const THREAD_ERROR: Status = Status(-11); - pub const BUS_TIMEOUT: Status = Status(-12); - pub const SOCKET_TIMEOUT: Status = Status(-13); - pub const LONG_RESPONSE: Status = Status(-14); + pub fn from_code(code: i32) -> Status { + match code { + 0 => Status::OK, + -1 => Status::NULL_ARGUMENT, + -2 => Status::UNREACHABLE, + -3 => Status::TIMEOUT, + -4 => Status::OOM, + -5 => Status::TYPE_ERROR, + -6 => Status::MALFORMED, + -7 => Status::NOT_FOUND, + -8 => Status::SEND_ERROR, + -9 => Status::RECV_ERROR, + -10 => Status::SOCKET_ERROR, + -11 => Status::THREAD_ERROR, + -12 => Status::BUS_TIMEOUT, + -13 => Status::SOCKET_TIMEOUT, + -14 => Status::LONG_RESPONSE, + other => Status::UNKNOWN(other), + } + } + + pub fn code(self) -> i32 { + match self { + Status::OK => 0, + Status::NULL_ARGUMENT => -1, + Status::UNREACHABLE => -2, + Status::TIMEOUT => -3, + Status::OOM => -4, + Status::TYPE_ERROR => -5, + Status::MALFORMED => -6, + Status::NOT_FOUND => -7, + Status::SEND_ERROR => -8, + Status::RECV_ERROR => -9, + Status::SOCKET_ERROR => -10, + Status::THREAD_ERROR => -11, + Status::BUS_TIMEOUT => -12, + Status::SOCKET_TIMEOUT => -13, + Status::LONG_RESPONSE => -14, + Status::UNKNOWN(code) => code, + } + } pub fn is_ok(self) -> bool { - self == Status::OK + matches!(self, Status::OK) } } @@ -75,10 +118,10 @@ impl fmt::Display for Status { Status::BUS_TIMEOUT => "GSTC_BUS_TIMEOUT", Status::SOCKET_TIMEOUT => "GSTC_SOCKET_TIMEOUT", Status::LONG_RESPONSE => "GSTC_LONG_RESPONSE", - _ => "GSTC_UNKNOWN", + Status::UNKNOWN(_) => "GSTC_UNKNOWN", }; - write!(f, "{} ({})", name, self.0) + write!(f, "{} ({})", name, self.code()) } } From 0ae81b78d1b80e48ae60a98bdf26680cac9000e7 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 14:49:55 -0600 Subject: [PATCH 11/30] Remove unnecessary socket reconnection attempt --- libgstc/rust/gstc/src/transport.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/libgstc/rust/gstc/src/transport.rs b/libgstc/rust/gstc/src/transport.rs index 1d7713fe..38a322ee 100644 --- a/libgstc/rust/gstc/src/transport.rs +++ b/libgstc/rust/gstc/src/transport.rs @@ -88,11 +88,6 @@ impl Transport { } if self.settings.keep_connection_open { - if self.stream.is_none() { - let stream = self.open_socket()?; - self.stream = Some(stream); - } - if let Some(stream) = self.stream.as_mut() { Self::write_then_read(stream, request, timeout_ms) } else { From 9c781869d8bf3946f925a599cb590c269b25c51b Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 15:01:44 -0600 Subject: [PATCH 12/30] Refactor timeout calculation --- libgstc/rust/gstc/src/transport.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/libgstc/rust/gstc/src/transport.rs b/libgstc/rust/gstc/src/transport.rs index 38a322ee..ec12dfd3 100644 --- a/libgstc/rust/gstc/src/transport.rs +++ b/libgstc/rust/gstc/src/transport.rs @@ -106,15 +106,15 @@ impl Transport { request: &str, timeout_ms: i32, ) -> Result { - if timeout_ms < 0 { - stream - .set_read_timeout(None) - .map_err(|_| Status::SOCKET_ERROR)?; + let timeout = if timeout_ms < 0 { + None } else { - stream - .set_read_timeout(Some(Duration::from_millis(timeout_ms as u64))) - .map_err(|_| Status::SOCKET_ERROR)?; - } + Some(Duration::from_millis(timeout_ms as u64)) + }; + + stream + .set_read_timeout(timeout) + .map_err(|_| Status::SOCKET_ERROR)?; stream .write_all(request.as_bytes()) From 9586059322d86b9047c731e6805fe5dd663826d0 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 15:35:14 -0600 Subject: [PATCH 13/30] Reformat Rust library building in Meson --- libgstc/meson.build | 4 +++- libgstc/rust/gstc/meson.build | 18 ------------------ libgstc/rust/meson.build | 23 +++++++++++++++++++++-- meson_options.txt | 1 + 4 files changed, 25 insertions(+), 21 deletions(-) delete mode 100644 libgstc/rust/gstc/meson.build diff --git a/libgstc/meson.build b/libgstc/meson.build index 228c09de..06f5a163 100644 --- a/libgstc/meson.build +++ b/libgstc/meson.build @@ -1,5 +1,7 @@ subdir('c') -subdir('rust') +if not get_option('enable-rust').disabled() + subdir('rust') +endif if not get_option('enable-python').disabled() subdir('python') endif diff --git a/libgstc/rust/gstc/meson.build b/libgstc/rust/gstc/meson.build deleted file mode 100644 index b22ad8f2..00000000 --- a/libgstc/rust/gstc/meson.build +++ /dev/null @@ -1,18 +0,0 @@ -gstc_rust_sources = [ - 'src/lib.rs', -] - -gstc_rust_lib = static_library('gstc_rust', - gstc_rust_sources, - rust_crate_type : 'rlib', - rust_args : ['--edition=2021'], - install : true, - install_dir : lib_install_dir, -) - -lib_gstc_rust_dep = declare_dependency( - link_with : gstc_rust_lib, -) - -# Generate pkgconfig file -pkgconfig.generate(gstc_rust_lib, description : 'GStreamer Rust Client library to control Gstd') diff --git a/libgstc/rust/meson.build b/libgstc/rust/meson.build index c3544717..71839ba4 100644 --- a/libgstc/rust/meson.build +++ b/libgstc/rust/meson.build @@ -1,7 +1,26 @@ -rust_enabled = add_languages('rust', required : false) +rust_enabled = add_languages('rust', required : get_option('enable-rust')) if rust_enabled - subdir('gstc') + gstc_rust_sources = [ + 'gstc/src/lib.rs', + ] + + gstc_rust_lib = static_library('gstc_rust', + gstc_rust_sources, + rust_crate_type : 'rlib', + rust_args : ['--edition=2021'], + install : true, + install_dir : lib_install_dir, + ) + + lib_gstc_rust_dep = declare_dependency( + link_with : gstc_rust_lib, + ) + + pkgconfig.generate( + gstc_rust_lib, + description : 'GStreamer Rust Client library to control Gstd' + ) else message('Rust compiler not found; skipping libgstc Rust API') endif diff --git a/meson_options.txt b/meson_options.txt index bfa1f6af..1c017f0a 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -5,6 +5,7 @@ option('enable-gtk-doc', type : 'boolean', value : false, description : 'Use gtk option('enable-systemd', type : 'feature', value : 'auto', description : 'Enable systemd gstd.service install') option('enable-initd', type : 'feature', value : 'disabled', description : 'Enable init script install') option('enable-python', type : 'feature', value : 'auto', description : 'Install the pygstc library') +option('enable-rust', type : 'feature', value : 'auto', description : 'Build the Rust gstc library') # String options option('with-gstd-runstatedir', type : 'string', value : '/var/run/gstd', description : 'Specify the location of the gstd\'s PID file') From 157f0667b967374b29bf4065eea271c7847c624c Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 15:37:04 -0600 Subject: [PATCH 14/30] Use CamelCase in Rust Status enum values --- .../libgstc/rust/gapless_playback_rust.rs | 4 +- examples/libgstc/rust/mp4_recording_rust.rs | 4 +- .../libgstc/rust/pipeline_lifecycle_rust.rs | 2 +- examples/libgstc/rust/wait_on_bus_rust.rs | 2 +- libgstc/rust/gstc/src/client.rs | 12 +- libgstc/rust/gstc/src/json.rs | 26 ++-- libgstc/rust/gstc/src/status.rs | 130 +++++++++--------- libgstc/rust/gstc/src/transport.rs | 24 ++-- 8 files changed, 102 insertions(+), 102 deletions(-) diff --git a/examples/libgstc/rust/gapless_playback_rust.rs b/examples/libgstc/rust/gapless_playback_rust.rs index ac1339df..807ec58b 100644 --- a/examples/libgstc/rust/gapless_playback_rust.rs +++ b/examples/libgstc/rust/gapless_playback_rust.rs @@ -31,7 +31,7 @@ fn main() -> Result<(), Status> { Some(path) => path, None => { eprintln!("Please provide a video to play"); - return Err(Status::NULL_ARGUMENT); + return Err(Status::NullArgument); } }; @@ -59,7 +59,7 @@ fn main() -> Result<(), Status> { while !stop_flag.load(Ordering::Relaxed) { let message = client.pipeline_bus_wait("pipe", "eos", -1)?; - if message.status != Status::OK { + if message.status != Status::Ok { eprintln!("Unable to read from bus: {}", message.status.code()); break; } diff --git a/examples/libgstc/rust/mp4_recording_rust.rs b/examples/libgstc/rust/mp4_recording_rust.rs index 12189c23..5968bce5 100644 --- a/examples/libgstc/rust/mp4_recording_rust.rs +++ b/examples/libgstc/rust/mp4_recording_rust.rs @@ -45,8 +45,8 @@ fn main() -> Result<(), Status> { print!("Waiting for EOS... "); let bus_message = client.pipeline_bus_wait("pipe", "eos", 10_000_000_000)?; match bus_message.status { - Status::OK => println!("received!"), - Status::BUS_TIMEOUT => { + Status::Ok => println!("received!"), + Status::BusTimeout => { println!("timeout!"); eprintln!("EOS not received, file may be unreadable"); }, diff --git a/examples/libgstc/rust/pipeline_lifecycle_rust.rs b/examples/libgstc/rust/pipeline_lifecycle_rust.rs index bf26bb5e..f5f31d89 100644 --- a/examples/libgstc/rust/pipeline_lifecycle_rust.rs +++ b/examples/libgstc/rust/pipeline_lifecycle_rust.rs @@ -36,7 +36,7 @@ fn wait_for_state( } if start.elapsed() >= timeout { - return Err(Status::TIMEOUT); + return Err(Status::Timeout); } thread::sleep(Duration::from_millis(100)); diff --git a/examples/libgstc/rust/wait_on_bus_rust.rs b/examples/libgstc/rust/wait_on_bus_rust.rs index 71ac50b6..f004225f 100644 --- a/examples/libgstc/rust/wait_on_bus_rust.rs +++ b/examples/libgstc/rust/wait_on_bus_rust.rs @@ -30,7 +30,7 @@ fn main() -> Result<(), Status> { client.pipeline_play(pipeline_name)?; let bus_message = client.pipeline_bus_wait(pipeline_name, "eos", -1)?; - if bus_message.status != Status::OK { + if bus_message.status != Status::Ok { let _ = client.pipeline_stop(pipeline_name); let _ = client.pipeline_delete(pipeline_name); return Err(bus_message.status); diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs index fe4f03bf..0fb8fb68 100644 --- a/libgstc/rust/gstc/src/client.rs +++ b/libgstc/rust/gstc/src/client.rs @@ -270,8 +270,8 @@ impl Client { let raw = self.cmd_read(&format!("/pipelines/{}/bus/message", pipeline_name), -1)?; let status = match json_is_null_field(&raw, "response") { - Ok(is_null) if is_null => Status::BUS_TIMEOUT, - Ok(_) => Status::OK, + Ok(is_null) if is_null => Status::BusTimeout, + Ok(_) => Status::Ok, Err(err) => err, }; @@ -325,7 +325,7 @@ impl Client { Err(err) => err, } }) - .map_err(|_| Status::THREAD_ERROR)?; + .map_err(|_| Status::ThreadError)?; Ok(handle) } @@ -439,17 +439,17 @@ impl Client { } fn send_request(&self, request: &str, timeout_ms: i32) -> Result { - let mut guard = self.transport.lock().map_err(|_| Status::SOCKET_ERROR)?; + let mut guard = self.transport.lock().map_err(|_| Status::SocketError)?; guard.send_command(request, timeout_ms) } fn default_wait_time_ms(&self) -> Result { - let guard = self.transport.lock().map_err(|_| Status::SOCKET_ERROR)?; + let guard = self.transport.lock().map_err(|_| Status::SocketError)?; Ok(guard.wait_time_ms()) } fn transport_settings(&self) -> Result { - let guard = self.transport.lock().map_err(|_| Status::SOCKET_ERROR)?; + let guard = self.transport.lock().map_err(|_| Status::SocketError)?; Ok(guard.clone_settings()) } } diff --git a/libgstc/rust/gstc/src/json.rs b/libgstc/rust/gstc/src/json.rs index 660ff4fe..05cd71c4 100644 --- a/libgstc/rust/gstc/src/json.rs +++ b/libgstc/rust/gstc/src/json.rs @@ -33,12 +33,12 @@ use crate::Status; pub(crate) fn json_get_int(json: &str, name: &str) -> Result { - let start = find_key_value_start(json, name).ok_or(Status::NOT_FOUND)?; + let start = find_key_value_start(json, name).ok_or(Status::NotFound)?; parse_json_int(json, start) } pub(crate) fn json_is_null_field(json: &str, name: &str) -> Result { - let start = find_key_value_start(json, name).ok_or(Status::NOT_FOUND)?; + let start = find_key_value_start(json, name).ok_or(Status::NotFound)?; let start = skip_ws(json, start); Ok(json[start..].starts_with("null")) } @@ -70,7 +70,7 @@ pub(crate) fn json_child_char_array( } if array.as_bytes()[cursor_ws] != b'{' { - return Err(Status::TYPE_ERROR); + return Err(Status::TypeError); } let (obj, next_idx) = extract_balanced(array, cursor_ws, b'{', b'}')?; @@ -110,7 +110,7 @@ fn find_key_value_start(json: &str, key: &str) -> Option { fn parse_json_int(json: &str, start: usize) -> Result { let start = skip_ws(json, start); if start >= json.len() { - return Err(Status::MALFORMED); + return Err(Status::Malformed); } let bytes = json.as_bytes(); @@ -123,12 +123,12 @@ fn parse_json_int(json: &str, start: usize) -> Result { } if end == start || (end == start + 1 && bytes[start] == b'-') { - return Err(Status::TYPE_ERROR); + return Err(Status::TypeError); } json[start..end] .parse::() - .map_err(|_| Status::TYPE_ERROR) + .map_err(|_| Status::TypeError) } fn extract_balanced( @@ -139,7 +139,7 @@ fn extract_balanced( ) -> Result<(&str, usize), Status> { let bytes = json.as_bytes(); if start >= bytes.len() || bytes[start] != open { - return Err(Status::TYPE_ERROR); + return Err(Status::TypeError); } let mut depth = 0i32; @@ -169,28 +169,28 @@ fn extract_balanced( i += 1; } - Err(Status::MALFORMED) + Err(Status::Malformed) } fn extract_object_for_key<'a>(json: &'a str, key: &str) -> Result<&'a str, Status> { - let start = find_key_value_start(json, key).ok_or(Status::NOT_FOUND)?; + let start = find_key_value_start(json, key).ok_or(Status::NotFound)?; let start = skip_ws(json, start); let (obj, _) = extract_balanced(json, start, b'{', b'}')?; Ok(obj) } fn extract_array_for_key<'a>(json: &'a str, key: &str) -> Result<&'a str, Status> { - let start = find_key_value_start(json, key).ok_or(Status::TYPE_ERROR)?; + let start = find_key_value_start(json, key).ok_or(Status::TypeError)?; let start = skip_ws(json, start); let (arr, _) = extract_balanced(json, start, b'[', b']')?; Ok(arr) } fn extract_string_for_key(json: &str, key: &str) -> Result { - let start = find_key_value_start(json, key).ok_or(Status::NOT_FOUND)?; + let start = find_key_value_start(json, key).ok_or(Status::NotFound)?; let start = skip_ws(json, start); if start >= json.len() || json.as_bytes()[start] != b'"' { - return Err(Status::TYPE_ERROR); + return Err(Status::TypeError); } let mut i = start + 1; @@ -207,5 +207,5 @@ fn extract_string_for_key(json: &str, key: &str) -> Result { i += 1; } - Err(Status::MALFORMED) + Err(Status::Malformed) } diff --git a/libgstc/rust/gstc/src/status.rs b/libgstc/rust/gstc/src/status.rs index fb84132f..59299d21 100644 --- a/libgstc/rust/gstc/src/status.rs +++ b/libgstc/rust/gstc/src/status.rs @@ -34,91 +34,91 @@ use std::fmt; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Status { - OK, - NULL_ARGUMENT, - UNREACHABLE, - TIMEOUT, - OOM, - TYPE_ERROR, - MALFORMED, - NOT_FOUND, - SEND_ERROR, - RECV_ERROR, - SOCKET_ERROR, - THREAD_ERROR, - BUS_TIMEOUT, - SOCKET_TIMEOUT, - LONG_RESPONSE, - UNKNOWN(i32), + Ok, + NullArgument, + Unreachable, + Timeout, + Oom, + TypeError, + Malformed, + NotFound, + SendError, + RecvError, + SocketError, + ThreadError, + BusTimeout, + SocketTimeout, + LongResponse, + Unknown(i32), } impl Status { pub fn from_code(code: i32) -> Status { match code { - 0 => Status::OK, - -1 => Status::NULL_ARGUMENT, - -2 => Status::UNREACHABLE, - -3 => Status::TIMEOUT, - -4 => Status::OOM, - -5 => Status::TYPE_ERROR, - -6 => Status::MALFORMED, - -7 => Status::NOT_FOUND, - -8 => Status::SEND_ERROR, - -9 => Status::RECV_ERROR, - -10 => Status::SOCKET_ERROR, - -11 => Status::THREAD_ERROR, - -12 => Status::BUS_TIMEOUT, - -13 => Status::SOCKET_TIMEOUT, - -14 => Status::LONG_RESPONSE, - other => Status::UNKNOWN(other), + 0 => Status::Ok, + -1 => Status::NullArgument, + -2 => Status::Unreachable, + -3 => Status::Timeout, + -4 => Status::Oom, + -5 => Status::TypeError, + -6 => Status::Malformed, + -7 => Status::NotFound, + -8 => Status::SendError, + -9 => Status::RecvError, + -10 => Status::SocketError, + -11 => Status::ThreadError, + -12 => Status::BusTimeout, + -13 => Status::SocketTimeout, + -14 => Status::LongResponse, + other => Status::Unknown(other), } } pub fn code(self) -> i32 { match self { - Status::OK => 0, - Status::NULL_ARGUMENT => -1, - Status::UNREACHABLE => -2, - Status::TIMEOUT => -3, - Status::OOM => -4, - Status::TYPE_ERROR => -5, - Status::MALFORMED => -6, - Status::NOT_FOUND => -7, - Status::SEND_ERROR => -8, - Status::RECV_ERROR => -9, - Status::SOCKET_ERROR => -10, - Status::THREAD_ERROR => -11, - Status::BUS_TIMEOUT => -12, - Status::SOCKET_TIMEOUT => -13, - Status::LONG_RESPONSE => -14, - Status::UNKNOWN(code) => code, + Status::Ok => 0, + Status::NullArgument => -1, + Status::Unreachable => -2, + Status::Timeout => -3, + Status::Oom => -4, + Status::TypeError => -5, + Status::Malformed => -6, + Status::NotFound => -7, + Status::SendError => -8, + Status::RecvError => -9, + Status::SocketError => -10, + Status::ThreadError => -11, + Status::BusTimeout => -12, + Status::SocketTimeout => -13, + Status::LongResponse => -14, + Status::Unknown(code) => code, } } pub fn is_ok(self) -> bool { - matches!(self, Status::OK) + matches!(self, Status::Ok) } } impl fmt::Display for Status { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let name = match *self { - Status::OK => "GSTC_OK", - Status::NULL_ARGUMENT => "GSTC_NULL_ARGUMENT", - Status::UNREACHABLE => "GSTC_UNREACHABLE", - Status::TIMEOUT => "GSTC_TIMEOUT", - Status::OOM => "GSTC_OOM", - Status::TYPE_ERROR => "GSTC_TYPE_ERROR", - Status::MALFORMED => "GSTC_MALFORMED", - Status::NOT_FOUND => "GSTC_NOT_FOUND", - Status::SEND_ERROR => "GSTC_SEND_ERROR", - Status::RECV_ERROR => "GSTC_RECV_ERROR", - Status::SOCKET_ERROR => "GSTC_SOCKET_ERROR", - Status::THREAD_ERROR => "GSTC_THREAD_ERROR", - Status::BUS_TIMEOUT => "GSTC_BUS_TIMEOUT", - Status::SOCKET_TIMEOUT => "GSTC_SOCKET_TIMEOUT", - Status::LONG_RESPONSE => "GSTC_LONG_RESPONSE", - Status::UNKNOWN(_) => "GSTC_UNKNOWN", + Status::Ok => "GSTC_OK", + Status::NullArgument => "GSTC_NULL_ARGUMENT", + Status::Unreachable => "GSTC_UNREACHABLE", + Status::Timeout => "GSTC_TIMEOUT", + Status::Oom => "GSTC_OOM", + Status::TypeError => "GSTC_TYPE_ERROR", + Status::Malformed => "GSTC_MALFORMED", + Status::NotFound => "GSTC_NOT_FOUND", + Status::SendError => "GSTC_SEND_ERROR", + Status::RecvError => "GSTC_RECV_ERROR", + Status::SocketError => "GSTC_SOCKET_ERROR", + Status::ThreadError => "GSTC_THREAD_ERROR", + Status::BusTimeout => "GSTC_BUS_TIMEOUT", + Status::SocketTimeout => "GSTC_SOCKET_TIMEOUT", + Status::LongResponse => "GSTC_LONG_RESPONSE", + Status::Unknown(_) => "GSTC_UNKNOWN", }; write!(f, "{} ({})", name, self.code()) diff --git a/libgstc/rust/gstc/src/transport.rs b/libgstc/rust/gstc/src/transport.rs index ec12dfd3..ff11d9ec 100644 --- a/libgstc/rust/gstc/src/transport.rs +++ b/libgstc/rust/gstc/src/transport.rs @@ -75,7 +75,7 @@ impl Transport { fn open_socket(&self) -> Result { TcpStream::connect((self.settings.address.as_str(), self.settings.port)) - .map_err(|_| Status::UNREACHABLE) + .map_err(|_| Status::Unreachable) } pub(crate) fn send_command( @@ -84,14 +84,14 @@ impl Transport { timeout_ms: i32, ) -> Result { if request.is_empty() { - return Err(Status::NULL_ARGUMENT); + return Err(Status::NullArgument); } if self.settings.keep_connection_open { if let Some(stream) = self.stream.as_mut() { Self::write_then_read(stream, request, timeout_ms) } else { - Err(Status::SOCKET_ERROR) + Err(Status::SocketError) } } else { let mut stream = self.open_socket()?; @@ -114,26 +114,26 @@ impl Transport { stream .set_read_timeout(timeout) - .map_err(|_| Status::SOCKET_ERROR)?; + .map_err(|_| Status::SocketError)?; stream .write_all(request.as_bytes()) - .map_err(|_| Status::SEND_ERROR)?; + .map_err(|_| Status::SendError)?; let mut response = Vec::::new(); let mut buffer = [0_u8; 1024]; loop { let n = match stream.read(&mut buffer) { - Ok(0) => return Err(Status::RECV_ERROR), + Ok(0) => return Err(Status::RecvError), Ok(n) => n, Err(err) if err.kind() == std::io::ErrorKind::TimedOut => { - return Err(Status::SOCKET_TIMEOUT) + return Err(Status::SocketTimeout) } Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { - return Err(Status::SOCKET_TIMEOUT) + return Err(Status::SocketTimeout) } - Err(_) => return Err(Status::RECV_ERROR), + Err(_) => return Err(Status::RecvError), }; if let Some(zero_idx) = buffer[..n].iter().position(|b| *b == 0) { @@ -143,14 +143,14 @@ impl Transport { response.extend_from_slice(&buffer[..n]); if response.len() >= MAX_RESPONSE_LENGTH { - return Err(Status::LONG_RESPONSE); + return Err(Status::LongResponse); } } if response.len() >= MAX_RESPONSE_LENGTH { - return Err(Status::LONG_RESPONSE); + return Err(Status::LongResponse); } - String::from_utf8(response).map_err(|_| Status::MALFORMED) + String::from_utf8(response).map_err(|_| Status::Malformed) } } From 8c26a023fdace31ce71659240d860959c6f9d301 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Tue, 31 Mar 2026 15:45:02 -0600 Subject: [PATCH 15/30] Add README with how to run Rust examples --- examples/libgstc/rust/README.md | 67 +++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 examples/libgstc/rust/README.md diff --git a/examples/libgstc/rust/README.md b/examples/libgstc/rust/README.md new file mode 100644 index 00000000..093d9dad --- /dev/null +++ b/examples/libgstc/rust/README.md @@ -0,0 +1,67 @@ +# Rust `libgstc` Examples + +These examples show how to control GStreamer Daemon (`gstd`) using the Rust `gstc_rust` client library. + +## Prerequisites + +- `gstd` must be running and listening on `127.0.0.1:5000` +- Rust examples assume the daemon is reachable at that address and port +- Some examples require GStreamer plugins such as `autovideosink`, `playbin`, `qtmux`, `avenc_mpeg4`, and `lamemp3enc` + +## How To Run + +You can run the examples with Cargo from the Rust client crate directory: + +```bash +cd libgstc/rust/gstc +cargo run --example simple_pipeline +``` + +Examples that take an argument can be run like this: + +```bash +cd libgstc/rust/gstc +cargo run --example gapless_playback -- /path/to/video.mp4 +``` + +You can also build them with Meson from the repository root: + +```bash +meson setup build +meson compile -C build +./build/examples/libgstc/rust/simple_pipeline +``` + +## Examples + +### `simple_pipeline` + +Creates a `videotestsrc ! autovideosink` pipeline, starts playback, waits for Enter, then stops and deletes the pipeline. + +### `pipeline_lifecycle` + +Creates a `videotestsrc ! fakesink` pipeline, sets it to `PLAYING`, polls until the daemon reports the expected state, then stops and deletes the pipeline. + +### `wait_on_bus` + +Creates a finite pipeline with `videotestsrc num-buffers=300`, waits for an EOS message on the bus, prints the raw bus message, then cleans up the pipeline. + +### `dynamic_property_change` + +Creates a `videotestsrc` pipeline and changes the `pattern` property once per second while the pipeline is running. Press Enter to stop it. + +### `gapless_playback` + +Plays a media file with `playbin`, waits for EOS, then seeks back to the start to continue playback. Press Enter to stop it. + +Run with: + +```bash +cd libgstc/rust/gstc +cargo run --example gapless_playback -- /path/to/video.mp4 +``` + +### `mp4_recording` + +Creates a live audio/video recording pipeline that writes to `mp4_recording.mp4`. When you press Enter, it injects EOS, waits for the EOS bus message, then stops and deletes the pipeline. + From a5494595075d161253d13560e6944f0460914ef2 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Mon, 11 May 2026 11:14:08 -0600 Subject: [PATCH 16/30] Add constants for seek function --- examples/libgstc/rust/gapless_playback_rust.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/examples/libgstc/rust/gapless_playback_rust.rs b/examples/libgstc/rust/gapless_playback_rust.rs index 807ec58b..d13bb4e8 100644 --- a/examples/libgstc/rust/gapless_playback_rust.rs +++ b/examples/libgstc/rust/gapless_playback_rust.rs @@ -26,6 +26,14 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; +const RATE: f64 = 1.0; +const FORMAT: i32 = 3; +const FLAGS: i32 = 1; +const START_TYPE: i32 = 1; +const START: i64 = 0; +const STOP_TYPE: i32 = 1; +const STOP: i64 = -1; + fn main() -> Result<(), Status> { let video = match env::args().nth(1) { Some(path) => path, @@ -66,7 +74,8 @@ fn main() -> Result<(), Status> { println!("EOS message received!"); - client.pipeline_seek("pipe", 1.0, 3, 1, 1, 0, 1, -1)?; + client.pipeline_seek("pipe", RATE, FORMAT, FLAGS, START_TYPE, + START, STOP_TYPE, STOP)?; println!("Pipeline reset!"); } From 2997bd5d99c6a289df44ef17844935106cc61582 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Fri, 15 May 2026 16:54:28 -0600 Subject: [PATCH 17/30] Reformat Status::BusTimeout as error --- examples/libgstc/rust/mp4_recording_rust.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/examples/libgstc/rust/mp4_recording_rust.rs b/examples/libgstc/rust/mp4_recording_rust.rs index 5968bce5..e28dbddf 100644 --- a/examples/libgstc/rust/mp4_recording_rust.rs +++ b/examples/libgstc/rust/mp4_recording_rust.rs @@ -43,20 +43,18 @@ fn main() -> Result<(), Status> { println!("EOS sent!"); print!("Waiting for EOS... "); - let bus_message = client.pipeline_bus_wait("pipe", "eos", 10_000_000_000)?; - match bus_message.status { - Status::Ok => println!("received!"), - Status::BusTimeout => { + match client.pipeline_bus_wait("pipe", "eos", 10_000_000_000) { + Ok(_bus_message) => { + println!("received!"); + } + Err(Status::BusTimeout) => { println!("timeout!"); eprintln!("EOS not received, file may be unreadable"); - }, - _ => { + } + Err(status) => { println!("error!"); - eprintln!( - "An error occurred waiting for EOS: {}", - bus_message.status.code() - ); - }, + eprintln!("An error occurred waiting for EOS: {}", status.code()); + } } client.pipeline_stop("pipe")?; From 76a1631e2d9ef149dde3ebd539756f082377d134 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Fri, 15 May 2026 16:56:54 -0600 Subject: [PATCH 18/30] Fix return of cmd_update --- libgstc/rust/gstc/src/client.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs index 0fb8fb68..67fc28c9 100644 --- a/libgstc/rust/gstc/src/client.rs +++ b/libgstc/rust/gstc/src/client.rs @@ -172,15 +172,13 @@ impl Client { property: &str, value: &str, ) -> Result<(), Status> { - let _ = self.cmd_update( + self.cmd_update( &format!( "/pipelines/{}/elements/{}/properties/{}", pipeline_name, element, property ), value, - ); - - Ok(()) + ) } pub fn element_properties_list( From 6f46532b374126418764d2670ea537ec782170cf Mon Sep 17 00:00:00 2001 From: lmerayo Date: Fri, 15 May 2026 16:58:51 -0600 Subject: [PATCH 19/30] Treat Status::BusTimeout as error --- libgstc/rust/gstc/src/client.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs index 67fc28c9..e9e9faad 100644 --- a/libgstc/rust/gstc/src/client.rs +++ b/libgstc/rust/gstc/src/client.rs @@ -267,16 +267,14 @@ impl Client { let raw = self.cmd_read(&format!("/pipelines/{}/bus/message", pipeline_name), -1)?; - let status = match json_is_null_field(&raw, "response") { - Ok(is_null) if is_null => Status::BusTimeout, - Ok(_) => Status::Ok, - Err(err) => err, - }; - - Ok(BusMessage { - status, - raw_response: raw, - }) + match json_is_null_field(&raw, "response") { + Ok(true) => Err(Status::BusTimeout), + Ok(false) => Ok(BusMessage { + status: Status::Ok, + raw_response: raw, + }), + Err(err) => Err(err), + } } pub fn pipeline_bus_wait_async( From c4b4a7ec02b3db9b1012ab8cbdf10efdb0312e45 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Fri, 15 May 2026 17:00:20 -0600 Subject: [PATCH 20/30] Remove repeated function calls --- libgstc/rust/gstc/src/client.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs index e9e9faad..5d296e9c 100644 --- a/libgstc/rust/gstc/src/client.rs +++ b/libgstc/rust/gstc/src/client.rs @@ -287,15 +287,6 @@ impl Client { where F: FnOnce(BusMessage) + Send + 'static, { - self.cmd_update( - &format!("/pipelines/{}/bus/types", pipeline_name), - &message_name, - )?; - self.cmd_update( - &format!("/pipelines/{}/bus/timeout", pipeline_name), - &format!("{}", timeout_ns), - )?; - let mut settings = self.transport_settings()?; settings.keep_connection_open = false; From dbb418baab0e9b240ce6f1c98ceee7f53ce8f967 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Fri, 15 May 2026 17:06:03 -0600 Subject: [PATCH 21/30] Rewrite cmd_send and fix return errors Rewrite cmd_send in terms of cmd_send_get_response and fix unnecessary return function. --- libgstc/rust/gstc/src/client.rs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs index 5d296e9c..b31c4b25 100644 --- a/libgstc/rust/gstc/src/client.rs +++ b/libgstc/rust/gstc/src/client.rs @@ -375,26 +375,19 @@ impl Client { element: &str, signal: &str, ) -> Result<(), Status> { - let _ = self.cmd_read( + self.cmd_read( &format!( "/pipelines/{}/elements/{}/signals/{}/disconnect", pipeline_name, element, signal ), self.default_wait_time_ms()?, - )?; - Ok(()) + ) + .map(|_| ()) } fn cmd_send(&self, request: &str) -> Result<(), Status> { - let response = self.send_request(request, self.default_wait_time_ms()?)?; - let code = json_get_int(&response, "code")?; - let status = Status::from_code(code); - - if status.is_ok() { - Ok(()) - } else { - Err(status) - } + self.cmd_send_get_response(request, self.default_wait_time_ms()?) + .map(|_| ()) } fn cmd_send_get_response(&self, request: &str, timeout_ms: i32) -> Result { From 8d7f461157bf239ba25bcb9b172ac531b343daa7 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Mon, 18 May 2026 17:47:17 -0600 Subject: [PATCH 22/30] Remove meson build logic for Rust API Remove meson Rust logic because external depencies in Rust are needed and meson support is not working as intended. --- examples/libgstc/meson.build | 2 -- examples/libgstc/rust/meson.build | 25 ------------------------- libgstc/meson.build | 3 --- libgstc/rust/meson.build | 26 -------------------------- meson_options.txt | 1 - 5 files changed, 57 deletions(-) delete mode 100644 examples/libgstc/rust/meson.build delete mode 100644 libgstc/rust/meson.build diff --git a/examples/libgstc/meson.build b/examples/libgstc/meson.build index 3a6d7782..4e79628d 100644 --- a/examples/libgstc/meson.build +++ b/examples/libgstc/meson.build @@ -12,5 +12,3 @@ foreach app : app_examples dependencies : [gst_client_deps,lib_gstc_dep], install: false) endforeach - -subdir('rust') diff --git a/examples/libgstc/rust/meson.build b/examples/libgstc/rust/meson.build deleted file mode 100644 index 5c9162dd..00000000 --- a/examples/libgstc/rust/meson.build +++ /dev/null @@ -1,25 +0,0 @@ -if is_variable('lib_gstc_rust_dep') - rustc = find_program('rustc') - rust_target_libdir = run_command(rustc, '--print', 'target-libdir', check : true).stdout().strip() - - rust_examples = [ - ['simple_pipeline', 'simple_pipeline_rust.rs'], - ['dynamic_property_change', 'dynamic_property_change_rust.rs'], - ['gapless_playback', 'gapless_playback_rust.rs'], - ['mp4_recording', 'mp4_recording_rust.rs'], - ['pipeline_lifecycle', 'pipeline_lifecycle_rust.rs'], - ['wait_on_bus', 'wait_on_bus_rust.rs'], - ] - - foreach ex : rust_examples - executable(ex[0], ex[1], - rust_args : ['--edition=2021'], - dependencies : [lib_gstc_rust_dep], - build_rpath : rust_target_libdir, - install_rpath : rust_target_libdir, - install : false, - ) - endforeach -else - message('Rust libgstc dependency not found; skipping Rust examples') -endif diff --git a/libgstc/meson.build b/libgstc/meson.build index 06f5a163..582cbb8f 100644 --- a/libgstc/meson.build +++ b/libgstc/meson.build @@ -1,7 +1,4 @@ subdir('c') -if not get_option('enable-rust').disabled() - subdir('rust') -endif if not get_option('enable-python').disabled() subdir('python') endif diff --git a/libgstc/rust/meson.build b/libgstc/rust/meson.build deleted file mode 100644 index 71839ba4..00000000 --- a/libgstc/rust/meson.build +++ /dev/null @@ -1,26 +0,0 @@ -rust_enabled = add_languages('rust', required : get_option('enable-rust')) - -if rust_enabled - gstc_rust_sources = [ - 'gstc/src/lib.rs', - ] - - gstc_rust_lib = static_library('gstc_rust', - gstc_rust_sources, - rust_crate_type : 'rlib', - rust_args : ['--edition=2021'], - install : true, - install_dir : lib_install_dir, - ) - - lib_gstc_rust_dep = declare_dependency( - link_with : gstc_rust_lib, - ) - - pkgconfig.generate( - gstc_rust_lib, - description : 'GStreamer Rust Client library to control Gstd' - ) -else - message('Rust compiler not found; skipping libgstc Rust API') -endif diff --git a/meson_options.txt b/meson_options.txt index 1c017f0a..bfa1f6af 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -5,7 +5,6 @@ option('enable-gtk-doc', type : 'boolean', value : false, description : 'Use gtk option('enable-systemd', type : 'feature', value : 'auto', description : 'Enable systemd gstd.service install') option('enable-initd', type : 'feature', value : 'disabled', description : 'Enable init script install') option('enable-python', type : 'feature', value : 'auto', description : 'Install the pygstc library') -option('enable-rust', type : 'feature', value : 'auto', description : 'Build the Rust gstc library') # String options option('with-gstd-runstatedir', type : 'string', value : '/var/run/gstd', description : 'Specify the location of the gstd\'s PID file') From 4c58a72137f62a974bbbef21c97a0d3838a94e07 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Mon, 18 May 2026 17:53:15 -0600 Subject: [PATCH 23/30] Replace custom json handling logic for serde_json dep --- libgstc/rust/gstc/Cargo.toml | 3 +- libgstc/rust/gstc/src/json.rs | 192 +++++++--------------------------- 2 files changed, 39 insertions(+), 156 deletions(-) diff --git a/libgstc/rust/gstc/Cargo.toml b/libgstc/rust/gstc/Cargo.toml index 1f12dc72..415b12fb 100644 --- a/libgstc/rust/gstc/Cargo.toml +++ b/libgstc/rust/gstc/Cargo.toml @@ -6,6 +6,7 @@ description = "Rust client for RidgeRun GStreamer Daemon (GstD)" license = "BSD-3-Clause" [dependencies] +serde_json = "1" [[example]] name = "pipeline_lifecycle" @@ -29,4 +30,4 @@ path = "../../../examples/libgstc/rust/gapless_playback_rust.rs" [[example]] name = "mp4_recording" -path = "../../../examples/libgstc/rust/mp4_recording_rust.rs" \ No newline at end of file +path = "../../../examples/libgstc/rust/mp4_recording_rust.rs" diff --git a/libgstc/rust/gstc/src/json.rs b/libgstc/rust/gstc/src/json.rs index 05cd71c4..8a6fd16f 100644 --- a/libgstc/rust/gstc/src/json.rs +++ b/libgstc/rust/gstc/src/json.rs @@ -31,16 +31,19 @@ */ use crate::Status; +use serde_json::Value; pub(crate) fn json_get_int(json: &str, name: &str) -> Result { - let start = find_key_value_start(json, name).ok_or(Status::NotFound)?; - parse_json_int(json, start) + let value = parse_json(json)?; + let number = value.get(name).ok_or(Status::NotFound)?; + let number = number.as_i64().ok_or(Status::TypeError)?; + + number.try_into().map_err(|_| Status::TypeError) } pub(crate) fn json_is_null_field(json: &str, name: &str) -> Result { - let start = find_key_value_start(json, name).ok_or(Status::NotFound)?; - let start = skip_ws(json, start); - Ok(json[start..].starts_with("null")) + let value = parse_json(json)?; + Ok(value.get(name).ok_or(Status::NotFound)?.is_null()) } pub(crate) fn json_child_string( @@ -48,8 +51,15 @@ pub(crate) fn json_child_string( parent_name: &str, data_name: &str, ) -> Result { - let parent = extract_object_for_key(json, parent_name)?; - extract_string_for_key(parent, data_name) + let value = parse_json(json)?; + let parent = value.get(parent_name).ok_or(Status::NotFound)?; + + parent + .get(data_name) + .ok_or(Status::NotFound)? + .as_str() + .map(ToOwned::to_owned) + .ok_or(Status::TypeError) } pub(crate) fn json_child_char_array( @@ -58,154 +68,26 @@ pub(crate) fn json_child_char_array( array_name: &str, element_name: &str, ) -> Result, Status> { - let parent = extract_object_for_key(json, parent_name)?; - let array = extract_array_for_key(parent, array_name)?; - - let mut out = Vec::new(); - let mut cursor = 1usize; - while cursor < array.len() { - let cursor_ws = skip_ws(array, cursor); - if cursor_ws >= array.len() || array.as_bytes()[cursor_ws] == b']' { - break; - } - - if array.as_bytes()[cursor_ws] != b'{' { - return Err(Status::TypeError); - } - - let (obj, next_idx) = extract_balanced(array, cursor_ws, b'{', b'}')?; - out.push(extract_string_for_key(obj, element_name)?); - cursor = skip_past_comma(array, next_idx); - } - - Ok(out) -} - -fn skip_ws(s: &str, mut idx: usize) -> usize { - while idx < s.len() && s.as_bytes()[idx].is_ascii_whitespace() { - idx += 1; - } - idx + let value = parse_json(json)?; + let parent = value.get(parent_name).ok_or(Status::NotFound)?; + let array = parent + .get(array_name) + .ok_or(Status::TypeError)? + .as_array() + .ok_or(Status::TypeError)?; + + array + .iter() + .map(|item| { + item.get(element_name) + .ok_or(Status::NotFound)? + .as_str() + .map(ToOwned::to_owned) + .ok_or(Status::TypeError) + }) + .collect() } -fn skip_past_comma(s: &str, idx: usize) -> usize { - let idx = skip_ws(s, idx); - if idx < s.len() && s.as_bytes()[idx] == b',' { - idx + 1 - } else { - idx - } -} - -fn find_key_value_start(json: &str, key: &str) -> Option { - let pattern = format!("\"{}\"", key); - if let Some(found) = json.find(&pattern) { - let key_idx = found + pattern.len(); - let colon_idx = json[key_idx..].find(':')?; - return Some(key_idx + colon_idx + 1); - } - None -} - -fn parse_json_int(json: &str, start: usize) -> Result { - let start = skip_ws(json, start); - if start >= json.len() { - return Err(Status::Malformed); - } - - let bytes = json.as_bytes(); - let mut end = start; - if bytes[end] == b'-' { - end += 1; - } - while end < json.len() && bytes[end].is_ascii_digit() { - end += 1; - } - - if end == start || (end == start + 1 && bytes[start] == b'-') { - return Err(Status::TypeError); - } - - json[start..end] - .parse::() - .map_err(|_| Status::TypeError) -} - -fn extract_balanced( - json: &str, - start: usize, - open: u8, - close: u8, -) -> Result<(&str, usize), Status> { - let bytes = json.as_bytes(); - if start >= bytes.len() || bytes[start] != open { - return Err(Status::TypeError); - } - - let mut depth = 0i32; - let mut i = start; - let mut in_string = false; - let mut escaped = false; - while i < bytes.len() { - let b = bytes[i]; - if in_string { - if escaped { - escaped = false; - } else if b == b'\\' { - escaped = true; - } else if b == b'"' { - in_string = false; - } - } else if b == b'"' { - in_string = true; - } else if b == open { - depth += 1; - } else if b == close { - depth -= 1; - if depth == 0 { - return Ok((&json[start..=i], i + 1)); - } - } - i += 1; - } - - Err(Status::Malformed) -} - -fn extract_object_for_key<'a>(json: &'a str, key: &str) -> Result<&'a str, Status> { - let start = find_key_value_start(json, key).ok_or(Status::NotFound)?; - let start = skip_ws(json, start); - let (obj, _) = extract_balanced(json, start, b'{', b'}')?; - Ok(obj) -} - -fn extract_array_for_key<'a>(json: &'a str, key: &str) -> Result<&'a str, Status> { - let start = find_key_value_start(json, key).ok_or(Status::TypeError)?; - let start = skip_ws(json, start); - let (arr, _) = extract_balanced(json, start, b'[', b']')?; - Ok(arr) -} - -fn extract_string_for_key(json: &str, key: &str) -> Result { - let start = find_key_value_start(json, key).ok_or(Status::NotFound)?; - let start = skip_ws(json, start); - if start >= json.len() || json.as_bytes()[start] != b'"' { - return Err(Status::TypeError); - } - - let mut i = start + 1; - let mut escaped = false; - while i < json.len() { - let b = json.as_bytes()[i]; - if escaped { - escaped = false; - } else if b == b'\\' { - escaped = true; - } else if b == b'"' { - return Ok(json[start + 1..i].to_string()); - } - i += 1; - } - - Err(Status::Malformed) +fn parse_json(json: &str) -> Result { + serde_json::from_str(json).map_err(|_| Status::Malformed) } From 0be7a58f1105a4a3dd1b71a45bc86d38b4e061b7 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Mon, 18 May 2026 18:08:49 -0600 Subject: [PATCH 24/30] Replace gstc_rust with gstc for the crate name --- examples/libgstc/rust/dynamic_property_change_rust.rs | 2 +- examples/libgstc/rust/gapless_playback_rust.rs | 2 +- examples/libgstc/rust/mp4_recording_rust.rs | 2 +- examples/libgstc/rust/pipeline_lifecycle_rust.rs | 2 +- examples/libgstc/rust/simple_pipeline_rust.rs | 2 +- examples/libgstc/rust/wait_on_bus_rust.rs | 2 +- libgstc/rust/gstc/Cargo.toml | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/libgstc/rust/dynamic_property_change_rust.rs b/examples/libgstc/rust/dynamic_property_change_rust.rs index 15f7f7a1..bc8f1b88 100644 --- a/examples/libgstc/rust/dynamic_property_change_rust.rs +++ b/examples/libgstc/rust/dynamic_property_change_rust.rs @@ -18,7 +18,7 @@ * Boston, MA 02110-1301, USA. */ -use gstc_rust::{Client, Status}; +use gstc::{Client, Status}; use std::io; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; diff --git a/examples/libgstc/rust/gapless_playback_rust.rs b/examples/libgstc/rust/gapless_playback_rust.rs index d13bb4e8..49ca8311 100644 --- a/examples/libgstc/rust/gapless_playback_rust.rs +++ b/examples/libgstc/rust/gapless_playback_rust.rs @@ -18,7 +18,7 @@ * Boston, MA 02110-1301, USA. */ -use gstc_rust::{Client, Status}; +use gstc::{Client, Status}; use std::env; use std::io; use std::path::PathBuf; diff --git a/examples/libgstc/rust/mp4_recording_rust.rs b/examples/libgstc/rust/mp4_recording_rust.rs index e28dbddf..3d2e9483 100644 --- a/examples/libgstc/rust/mp4_recording_rust.rs +++ b/examples/libgstc/rust/mp4_recording_rust.rs @@ -18,7 +18,7 @@ * Boston, MA 02110-1301, USA. */ -use gstc_rust::{Client, Status}; +use gstc::{Client, Status}; use std::io; fn main() -> Result<(), Status> { diff --git a/examples/libgstc/rust/pipeline_lifecycle_rust.rs b/examples/libgstc/rust/pipeline_lifecycle_rust.rs index f5f31d89..73355f14 100644 --- a/examples/libgstc/rust/pipeline_lifecycle_rust.rs +++ b/examples/libgstc/rust/pipeline_lifecycle_rust.rs @@ -18,7 +18,7 @@ * Boston, MA 02110-1301, USA. */ -use gstc_rust::{Client, Status}; +use gstc::{Client, Status}; use std::thread; use std::time::{Duration, Instant}; diff --git a/examples/libgstc/rust/simple_pipeline_rust.rs b/examples/libgstc/rust/simple_pipeline_rust.rs index d7599295..ebccca2c 100644 --- a/examples/libgstc/rust/simple_pipeline_rust.rs +++ b/examples/libgstc/rust/simple_pipeline_rust.rs @@ -18,7 +18,7 @@ * Boston, MA 02110-1301, USA. */ -use gstc_rust::{Client, Status}; +use gstc::{Client, Status}; use std::io; fn main() -> Result<(), Status> { diff --git a/examples/libgstc/rust/wait_on_bus_rust.rs b/examples/libgstc/rust/wait_on_bus_rust.rs index f004225f..913e8c9f 100644 --- a/examples/libgstc/rust/wait_on_bus_rust.rs +++ b/examples/libgstc/rust/wait_on_bus_rust.rs @@ -18,7 +18,7 @@ * Boston, MA 02110-1301, USA. */ -use gstc_rust::{Client, Status}; +use gstc::{Client, Status}; fn main() -> Result<(), Status> { let client = Client::new("127.0.0.1", 5000, 5000, false)?; diff --git a/libgstc/rust/gstc/Cargo.toml b/libgstc/rust/gstc/Cargo.toml index 415b12fb..fa10c891 100644 --- a/libgstc/rust/gstc/Cargo.toml +++ b/libgstc/rust/gstc/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "gstc_rust" +name = "gstc" version = "0.1.0" edition = "2021" description = "Rust client for RidgeRun GStreamer Daemon (GstD)" From 0667e3afe9c7791b09d552e66a1ad82b0834d4c8 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Mon, 18 May 2026 18:27:11 -0600 Subject: [PATCH 25/30] Fix recording artifact save location --- examples/libgstc/rust/mp4_recording_rust.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/examples/libgstc/rust/mp4_recording_rust.rs b/examples/libgstc/rust/mp4_recording_rust.rs index 3d2e9483..b3b504de 100644 --- a/examples/libgstc/rust/mp4_recording_rust.rs +++ b/examples/libgstc/rust/mp4_recording_rust.rs @@ -19,18 +19,25 @@ */ use gstc::{Client, Status}; -use std::io; +use std::{env, io}; -fn main() -> Result<(), Status> { +fn main() -> Result<(), Box> { let client = Client::new("127.0.0.1", 5000, -1, true)?; + let output = env::current_dir().map(|dir| dir.join("mp4_recording.mp4"))?; + let output = output.to_string_lossy(); + let output = output.replace('\\', "\\\\").replace('"', "\\\""); client.pipeline_create( "pipe", - "qtmux name=mux ! filesink location=mp4_recording.mp4 \ + &format!( + "qtmux name=mux ! filesink location=\"{}\" \ videotestsrc is-live=true ! avenc_mpeg4 ! mux. \ audiotestsrc is-live=true ! lamemp3enc ! mux.", + output + ), )?; println!("Pipeline created successfully!"); + println!("Recording to: {}", output); client.pipeline_play("pipe")?; println!("Pipeline set to playing!"); @@ -62,6 +69,7 @@ fn main() -> Result<(), Status> { client.pipeline_delete("pipe")?; println!("Pipeline deleted!"); + println!("Recording finalized at: {}", output); Ok(()) } From 42c2098c7fd7a85643b7da4a33d913bfa52d37d3 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Mon, 18 May 2026 18:28:01 -0600 Subject: [PATCH 26/30] Add root Cargo files --- Cargo.lock | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 5 +++ 2 files changed, 110 insertions(+) create mode 100644 Cargo.lock create mode 100644 Cargo.toml diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 00000000..2019d39c --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,105 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "gstc" +version = "0.1.0" +dependencies = [ + "serde_json", +] + +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000..ac9a98c1 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,5 @@ +[workspace] +members = [ + "libgstc/rust/gstc", +] +resolver = "2" From a0cb09c43ce49f248cce8c66ee6c70dc870beae8 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Mon, 18 May 2026 18:28:32 -0600 Subject: [PATCH 27/30] Update documentation --- examples/libgstc/rust/README.md | 26 ++++++++--------- libgstc/rust/gstc/README.md | 52 ++++++++++++++++++++++++++------- 2 files changed, 54 insertions(+), 24 deletions(-) diff --git a/examples/libgstc/rust/README.md b/examples/libgstc/rust/README.md index 093d9dad..f6fabb67 100644 --- a/examples/libgstc/rust/README.md +++ b/examples/libgstc/rust/README.md @@ -1,6 +1,6 @@ # Rust `libgstc` Examples -These examples show how to control GStreamer Daemon (`gstd`) using the Rust `gstc_rust` client library. +These examples show how to control GStreamer Daemon (`gstd`) using the Rust `gstc` client library. ## Prerequisites @@ -8,30 +8,31 @@ These examples show how to control GStreamer Daemon (`gstd`) using the Rust `gst - Rust examples assume the daemon is reachable at that address and port - Some examples require GStreamer plugins such as `autovideosink`, `playbin`, `qtmux`, `avenc_mpeg4`, and `lamemp3enc` +## How to Build +To build the examples, run the following command from the repository root: +```bash +cargo build --examples +``` +The examples will be available at the following path: + ```bash +target/debug/examples/ +``` + ## How To Run -You can run the examples with Cargo from the Rust client crate directory: +You can run the examples directly as an executable or +with Cargo from the Repository root: ```bash -cd libgstc/rust/gstc cargo run --example simple_pipeline ``` Examples that take an argument can be run like this: ```bash -cd libgstc/rust/gstc cargo run --example gapless_playback -- /path/to/video.mp4 ``` -You can also build them with Meson from the repository root: - -```bash -meson setup build -meson compile -C build -./build/examples/libgstc/rust/simple_pipeline -``` - ## Examples ### `simple_pipeline` @@ -57,7 +58,6 @@ Plays a media file with `playbin`, waits for EOS, then seeks back to the start t Run with: ```bash -cd libgstc/rust/gstc cargo run --example gapless_playback -- /path/to/video.mp4 ``` diff --git a/libgstc/rust/gstc/README.md b/libgstc/rust/gstc/README.md index d979a6a4..715765ec 100644 --- a/libgstc/rust/gstc/README.md +++ b/libgstc/rust/gstc/README.md @@ -1,27 +1,57 @@ -# gstc_rust (Rust) +# gstc (Rust) Rust client library for RidgeRun GstD. ## Build and run -### Meson (project-integrated build) +### Cargo -From repo root: +A `Cargo.toml` is maintained for crate workflows, for example, (`cargo run`, `cargo test`, `cargo fmt`, `cargo clippy`). The crate can be added as a dependency in another Cargo project: ```bash -meson setup build -meson compile -C build +[dependencies] +gstc = { path = "" } ``` -Rust examples are built as Meson executables under `build/examples/libgstc/rust/`. +To build the library (.rlib), run: +```bash +cargo build +``` +The library will be located at: + ```bash +target/debug/libgstc.rlib +``` -### Cargo +To build the examples, refer to the following file: + ```bash +examples/libgstc/rust/README.md +``` -A `Cargo.toml` is maintained for crate workflows, for example, (`cargo run`, `cargo test`, `cargo fmt`, `cargo clippy`). The crate can be added as a dependency in another Cargo project: +### Meson (project-integrated build) +This approach was tested with the recently released Workspaces support for +Meson version 1.11.0. The following example was tried: ```bash -[dependencies] -gstc_rust = { path = "" } +rust = import('rust') +cargo_ws = rust.workspace() +cargo_dep = ws.subproject('serde_json').dependency() ``` - +However, Meson ran into an issue building the dependency as a subproject: +```bash +error: environment variable `OUT_DIR` not defined at compile time + --> ../subprojects/serde_core-1.0.228/src/crate_root.rs:165:26 + | +165 | include!(concat!(env!("OUT_DIR"), "/private.rs")); + | ^^^^^^^^^^^^^^^ + | + ::: ../subprojects/serde_core-1.0.228/src/lib.rs:111:1 + | +111 | crate_root!(); + | ------------- in this macro invocation + | + = help: Cargo sets build script variables at run time. Use `std::env::var("OUT_DIR")` instead + = note: this error originates in the macro `env` which comes from the expansion of the macro `crate_root` (in Nightly builds, run with -Z macro-backtrace for more info) + +error: aborting due to 1 previous error +``` From 5ef83e267b28df7fd288a003efb0ceb3918a21e3 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Mon, 18 May 2026 18:29:04 -0600 Subject: [PATCH 28/30] Apply format to rust files --- examples/libgstc/rust/dynamic_property_change_rust.rs | 2 +- examples/libgstc/rust/gapless_playback_rust.rs | 7 ++++--- libgstc/rust/gstc/src/client.rs | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/libgstc/rust/dynamic_property_change_rust.rs b/examples/libgstc/rust/dynamic_property_change_rust.rs index bc8f1b88..e97f1a00 100644 --- a/examples/libgstc/rust/dynamic_property_change_rust.rs +++ b/examples/libgstc/rust/dynamic_property_change_rust.rs @@ -20,8 +20,8 @@ use gstc::{Client, Status}; use std::io; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::thread; use std::time::Duration; diff --git a/examples/libgstc/rust/gapless_playback_rust.rs b/examples/libgstc/rust/gapless_playback_rust.rs index 49ca8311..58d8864e 100644 --- a/examples/libgstc/rust/gapless_playback_rust.rs +++ b/examples/libgstc/rust/gapless_playback_rust.rs @@ -22,8 +22,8 @@ use gstc::{Client, Status}; use std::env; use std::io; use std::path::PathBuf; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::thread; const RATE: f64 = 1.0; @@ -74,8 +74,9 @@ fn main() -> Result<(), Status> { println!("EOS message received!"); - client.pipeline_seek("pipe", RATE, FORMAT, FLAGS, START_TYPE, - START, STOP_TYPE, STOP)?; + client.pipeline_seek( + "pipe", RATE, FORMAT, FLAGS, START_TYPE, START, STOP_TYPE, STOP, + )?; println!("Pipeline reset!"); } diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs index b31c4b25..56ba5cc4 100644 --- a/libgstc/rust/gstc/src/client.rs +++ b/libgstc/rust/gstc/src/client.rs @@ -387,7 +387,7 @@ impl Client { fn cmd_send(&self, request: &str) -> Result<(), Status> { self.cmd_send_get_response(request, self.default_wait_time_ms()?) - .map(|_| ()) + .map(|_| ()) } fn cmd_send_get_response(&self, request: &str, timeout_ms: i32) -> Result { From cdf6c7f623c02b3912b8f2813174d4ac8e995f3e Mon Sep 17 00:00:00 2001 From: lmerayo Date: Wed, 27 May 2026 14:02:41 -0600 Subject: [PATCH 29/30] Add is-live=true to avoid consuming CPU --- examples/libgstc/rust/pipeline_lifecycle_rust.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/libgstc/rust/pipeline_lifecycle_rust.rs b/examples/libgstc/rust/pipeline_lifecycle_rust.rs index 73355f14..033fc5fc 100644 --- a/examples/libgstc/rust/pipeline_lifecycle_rust.rs +++ b/examples/libgstc/rust/pipeline_lifecycle_rust.rs @@ -46,7 +46,7 @@ fn wait_for_state( fn main() -> Result<(), Status> { let client = Client::new("127.0.0.1", 5000, 5000, false)?; - client.pipeline_create("pipe", "videotestsrc ! fakesink")?; + client.pipeline_create("pipe", "videotestsrc is-live=true ! fakesink")?; client.pipeline_play("pipe")?; let state = wait_for_state(&client, "pipe", "PLAYING", Duration::from_secs(5))?; From 206f7ef5ae44a2ecdaa0e0974122b933a8730885 Mon Sep 17 00:00:00 2001 From: lmerayo Date: Wed, 27 May 2026 14:04:05 -0600 Subject: [PATCH 30/30] Remove character escaping --- examples/libgstc/rust/mp4_recording_rust.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/libgstc/rust/mp4_recording_rust.rs b/examples/libgstc/rust/mp4_recording_rust.rs index b3b504de..c172999c 100644 --- a/examples/libgstc/rust/mp4_recording_rust.rs +++ b/examples/libgstc/rust/mp4_recording_rust.rs @@ -25,7 +25,6 @@ fn main() -> Result<(), Box> { let client = Client::new("127.0.0.1", 5000, -1, true)?; let output = env::current_dir().map(|dir| dir.join("mp4_recording.mp4"))?; let output = output.to_string_lossy(); - let output = output.replace('\\', "\\\\").replace('"', "\\\""); client.pipeline_create( "pipe",