diff --git a/.gitignore b/.gitignore index b62eca13..9f52f762 100644 --- a/.gitignore +++ b/.gitignore @@ -38,6 +38,7 @@ autoregen.sh # debian files *.deb *.build +!**/meson.build *.buildinfo *.changes debian/.debhelper/ diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 00000000..2019d39c --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,105 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "gstc" +version = "0.1.0" +dependencies = [ + "serde_json", +] + +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000..ac9a98c1 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,5 @@ +[workspace] +members = [ + "libgstc/rust/gstc", +] +resolver = "2" diff --git a/examples/libgstc/rust/README.md b/examples/libgstc/rust/README.md new file mode 100644 index 00000000..f6fabb67 --- /dev/null +++ b/examples/libgstc/rust/README.md @@ -0,0 +1,67 @@ +# Rust `libgstc` Examples + +These examples show how to control GStreamer Daemon (`gstd`) using the Rust `gstc` client library. + +## Prerequisites + +- `gstd` must be running and listening on `127.0.0.1:5000` +- Rust examples assume the daemon is reachable at that address and port +- Some examples require GStreamer plugins such as `autovideosink`, `playbin`, `qtmux`, `avenc_mpeg4`, and `lamemp3enc` + +## How to Build +To build the examples, run the following command from the repository root: +```bash +cargo build --examples +``` +The examples will be available at the following path: + ```bash +target/debug/examples/ +``` + +## How To Run + +You can run the examples directly as an executable or +with Cargo from the Repository root: + +```bash +cargo run --example simple_pipeline +``` + +Examples that take an argument can be run like this: + +```bash +cargo run --example gapless_playback -- /path/to/video.mp4 +``` + +## Examples + +### `simple_pipeline` + +Creates a `videotestsrc ! autovideosink` pipeline, starts playback, waits for Enter, then stops and deletes the pipeline. + +### `pipeline_lifecycle` + +Creates a `videotestsrc ! fakesink` pipeline, sets it to `PLAYING`, polls until the daemon reports the expected state, then stops and deletes the pipeline. + +### `wait_on_bus` + +Creates a finite pipeline with `videotestsrc num-buffers=300`, waits for an EOS message on the bus, prints the raw bus message, then cleans up the pipeline. + +### `dynamic_property_change` + +Creates a `videotestsrc` pipeline and changes the `pattern` property once per second while the pipeline is running. Press Enter to stop it. + +### `gapless_playback` + +Plays a media file with `playbin`, waits for EOS, then seeks back to the start to continue playback. Press Enter to stop it. + +Run with: + +```bash +cargo run --example gapless_playback -- /path/to/video.mp4 +``` + +### `mp4_recording` + +Creates a live audio/video recording pipeline that writes to `mp4_recording.mp4`. When you press Enter, it injects EOS, waits for the EOS bus message, then stops and deletes the pipeline. + diff --git a/examples/libgstc/rust/dynamic_property_change_rust.rs b/examples/libgstc/rust/dynamic_property_change_rust.rs new file mode 100644 index 00000000..e97f1a00 --- /dev/null +++ b/examples/libgstc/rust/dynamic_property_change_rust.rs @@ -0,0 +1,65 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc::{Client, Status}; +use std::io; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +fn main() -> Result<(), Status> { + let client = Client::new("127.0.0.1", 5000, -1, true)?; + + client.pipeline_create("pipe", "videotestsrc name=vts ! autovideosink")?; + println!("Pipeline created successfully!"); + + client.pipeline_play("pipe")?; + println!("Pipeline set to playing!"); + + println!("Press enter to stop the pipeline..."); + let stop_flag = Arc::new(AtomicBool::new(false)); + let thread_stop_flag = Arc::clone(&stop_flag); + thread::spawn(move || { + let mut line = String::new(); + let _ = io::stdin().read_line(&mut line); + thread_stop_flag.store(true, Ordering::Relaxed); + }); + + let mut format = 0; + loop { + client.element_set("pipe", "vts", "pattern", &format.to_string())?; + format = (format + 1) % 10; + + if stop_flag.load(Ordering::Relaxed) { + break; + } + + thread::sleep(Duration::from_secs(1)); + } + + client.pipeline_stop("pipe")?; + println!("Pipeline set to null!"); + + client.pipeline_delete("pipe")?; + println!("Pipeline deleted!"); + + Ok(()) +} diff --git a/examples/libgstc/rust/gapless_playback_rust.rs b/examples/libgstc/rust/gapless_playback_rust.rs new file mode 100644 index 00000000..58d8864e --- /dev/null +++ b/examples/libgstc/rust/gapless_playback_rust.rs @@ -0,0 +1,90 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc::{Client, Status}; +use std::env; +use std::io; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; + +const RATE: f64 = 1.0; +const FORMAT: i32 = 3; +const FLAGS: i32 = 1; +const START_TYPE: i32 = 1; +const START: i64 = 0; +const STOP_TYPE: i32 = 1; +const STOP: i64 = -1; + +fn main() -> Result<(), Status> { + let video = match env::args().nth(1) { + Some(path) => path, + None => { + eprintln!("Please provide a video to play"); + return Err(Status::NullArgument); + } + }; + + let abs_path = PathBuf::from(&video) + .canonicalize() + .unwrap_or_else(|_| PathBuf::from(&video)); + let uri = format!("file://{}", abs_path.display()); + + let client = Client::new("127.0.0.1", 5000, -1, true)?; + + client.pipeline_create("pipe", &format!("playbin uri={}", uri))?; + println!("Pipeline created successfully!"); + + client.pipeline_play("pipe")?; + println!("Pipeline set to playing!"); + + println!("Press enter to stop the pipeline..."); + let stop_flag = Arc::new(AtomicBool::new(false)); + let thread_stop_flag = Arc::clone(&stop_flag); + thread::spawn(move || { + let mut line = String::new(); + let _ = io::stdin().read_line(&mut line); + thread_stop_flag.store(true, Ordering::Relaxed); + }); + + while !stop_flag.load(Ordering::Relaxed) { + let message = client.pipeline_bus_wait("pipe", "eos", -1)?; + if message.status != Status::Ok { + eprintln!("Unable to read from bus: {}", message.status.code()); + break; + } + + println!("EOS message received!"); + + client.pipeline_seek( + "pipe", RATE, FORMAT, FLAGS, START_TYPE, START, STOP_TYPE, STOP, + )?; + println!("Pipeline reset!"); + } + + client.pipeline_stop("pipe")?; + println!("Pipeline set to null!"); + + client.pipeline_delete("pipe")?; + println!("Pipeline deleted!"); + + Ok(()) +} diff --git a/examples/libgstc/rust/mp4_recording_rust.rs b/examples/libgstc/rust/mp4_recording_rust.rs new file mode 100644 index 00000000..c172999c --- /dev/null +++ b/examples/libgstc/rust/mp4_recording_rust.rs @@ -0,0 +1,74 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc::{Client, Status}; +use std::{env, io}; + +fn main() -> Result<(), Box> { + let client = Client::new("127.0.0.1", 5000, -1, true)?; + let output = env::current_dir().map(|dir| dir.join("mp4_recording.mp4"))?; + let output = output.to_string_lossy(); + + client.pipeline_create( + "pipe", + &format!( + "qtmux name=mux ! filesink location=\"{}\" \ + videotestsrc is-live=true ! avenc_mpeg4 ! mux. \ + audiotestsrc is-live=true ! lamemp3enc ! mux.", + output + ), + )?; + println!("Pipeline created successfully!"); + println!("Recording to: {}", output); + + client.pipeline_play("pipe")?; + println!("Pipeline set to playing!"); + + println!("Press enter to stop pipeline..."); + let mut line = String::new(); + let _ = io::stdin().read_line(&mut line); + + client.pipeline_inject_eos("pipe")?; + println!("EOS sent!"); + + print!("Waiting for EOS... "); + match client.pipeline_bus_wait("pipe", "eos", 10_000_000_000) { + Ok(_bus_message) => { + println!("received!"); + } + Err(Status::BusTimeout) => { + println!("timeout!"); + eprintln!("EOS not received, file may be unreadable"); + } + Err(status) => { + println!("error!"); + eprintln!("An error occurred waiting for EOS: {}", status.code()); + } + } + + client.pipeline_stop("pipe")?; + println!("Pipeline set to null!"); + + client.pipeline_delete("pipe")?; + println!("Pipeline deleted!"); + println!("Recording finalized at: {}", output); + + Ok(()) +} diff --git a/examples/libgstc/rust/pipeline_lifecycle_rust.rs b/examples/libgstc/rust/pipeline_lifecycle_rust.rs new file mode 100644 index 00000000..033fc5fc --- /dev/null +++ b/examples/libgstc/rust/pipeline_lifecycle_rust.rs @@ -0,0 +1,59 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc::{Client, Status}; +use std::thread; +use std::time::{Duration, Instant}; + +fn wait_for_state( + client: &Client, + pipeline_name: &str, + expected: &str, + timeout: Duration, +) -> Result { + let start = Instant::now(); + loop { + let state = client.pipeline_get_state(pipeline_name)?; + if state == expected { + return Ok(state); + } + + if start.elapsed() >= timeout { + return Err(Status::Timeout); + } + + thread::sleep(Duration::from_millis(100)); + } +} + +fn main() -> Result<(), Status> { + let client = Client::new("127.0.0.1", 5000, 5000, false)?; + + client.pipeline_create("pipe", "videotestsrc is-live=true ! fakesink")?; + client.pipeline_play("pipe")?; + + let state = wait_for_state(&client, "pipe", "PLAYING", Duration::from_secs(5))?; + println!("pipeline state: {}", state); + + client.pipeline_stop("pipe")?; + client.pipeline_delete("pipe")?; + + Ok(()) +} diff --git a/examples/libgstc/rust/simple_pipeline_rust.rs b/examples/libgstc/rust/simple_pipeline_rust.rs new file mode 100644 index 00000000..ebccca2c --- /dev/null +++ b/examples/libgstc/rust/simple_pipeline_rust.rs @@ -0,0 +1,47 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc::{Client, Status}; +use std::io; + +fn main() -> Result<(), Status> { + let client = Client::new("127.0.0.1", 5000, -1, true)?; + + client.ping()?; + println!("GStreamer daemon is alive!"); + + client.pipeline_create("pipe", "videotestsrc ! autovideosink")?; + println!("Pipeline created successfully!"); + + client.pipeline_play("pipe")?; + println!("Pipeline set to playing!"); + + println!("Press enter to stop pipeline..."); + let mut line = String::new(); + let _ = io::stdin().read_line(&mut line); + + client.pipeline_stop("pipe")?; + println!("Pipeline set to null!"); + + client.pipeline_delete("pipe")?; + println!("Pipeline deleted!"); + + Ok(()) +} diff --git a/examples/libgstc/rust/wait_on_bus_rust.rs b/examples/libgstc/rust/wait_on_bus_rust.rs new file mode 100644 index 00000000..913e8c9f --- /dev/null +++ b/examples/libgstc/rust/wait_on_bus_rust.rs @@ -0,0 +1,45 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +use gstc::{Client, Status}; + +fn main() -> Result<(), Status> { + let client = Client::new("127.0.0.1", 5000, 5000, false)?; + + let pipeline_name = "video"; + let pipeline_desc = "videotestsrc num-buffers=300 ! videoconvert ! autovideosink"; + + client.pipeline_create(pipeline_name, pipeline_desc)?; + client.pipeline_play(pipeline_name)?; + + let bus_message = client.pipeline_bus_wait(pipeline_name, "eos", -1)?; + if bus_message.status != Status::Ok { + let _ = client.pipeline_stop(pipeline_name); + let _ = client.pipeline_delete(pipeline_name); + return Err(bus_message.status); + } + + println!("received EOS: {}", bus_message.raw_response); + + client.pipeline_stop(pipeline_name)?; + client.pipeline_delete(pipeline_name)?; + + Ok(()) +} diff --git a/libgstc/rust/gstc/Cargo.toml b/libgstc/rust/gstc/Cargo.toml new file mode 100644 index 00000000..fa10c891 --- /dev/null +++ b/libgstc/rust/gstc/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "gstc" +version = "0.1.0" +edition = "2021" +description = "Rust client for RidgeRun GStreamer Daemon (GstD)" +license = "BSD-3-Clause" + +[dependencies] +serde_json = "1" + +[[example]] +name = "pipeline_lifecycle" +path = "../../../examples/libgstc/rust/pipeline_lifecycle_rust.rs" + +[[example]] +name = "wait_on_bus" +path = "../../../examples/libgstc/rust/wait_on_bus_rust.rs" + +[[example]] +name = "simple_pipeline" +path = "../../../examples/libgstc/rust/simple_pipeline_rust.rs" + +[[example]] +name = "dynamic_property_change" +path = "../../../examples/libgstc/rust/dynamic_property_change_rust.rs" + +[[example]] +name = "gapless_playback" +path = "../../../examples/libgstc/rust/gapless_playback_rust.rs" + +[[example]] +name = "mp4_recording" +path = "../../../examples/libgstc/rust/mp4_recording_rust.rs" diff --git a/libgstc/rust/gstc/README.md b/libgstc/rust/gstc/README.md new file mode 100644 index 00000000..715765ec --- /dev/null +++ b/libgstc/rust/gstc/README.md @@ -0,0 +1,57 @@ +# gstc (Rust) + +Rust client library for RidgeRun GstD. + +## Build and run + +### Cargo + +A `Cargo.toml` is maintained for crate workflows, for example, (`cargo run`, `cargo test`, `cargo fmt`, `cargo clippy`). The crate can be added as a dependency in another Cargo project: + +```bash +[dependencies] +gstc = { path = "" } +``` + +To build the library (.rlib), run: +```bash +cargo build +``` +The library will be located at: + ```bash +target/debug/libgstc.rlib +``` + +To build the examples, refer to the following file: + ```bash +examples/libgstc/rust/README.md +``` + +### Meson (project-integrated build) + +This approach was tested with the recently released Workspaces support for +Meson version 1.11.0. The following example was tried: +```bash +rust = import('rust') +cargo_ws = rust.workspace() +cargo_dep = ws.subproject('serde_json').dependency() +``` + +However, Meson ran into an issue building the dependency as a subproject: +```bash +error: environment variable `OUT_DIR` not defined at compile time + --> ../subprojects/serde_core-1.0.228/src/crate_root.rs:165:26 + | +165 | include!(concat!(env!("OUT_DIR"), "/private.rs")); + | ^^^^^^^^^^^^^^^ + | + ::: ../subprojects/serde_core-1.0.228/src/lib.rs:111:1 + | +111 | crate_root!(); + | ------------- in this macro invocation + | + = help: Cargo sets build script variables at run time. Use `std::env::var("OUT_DIR")` instead + = note: this error originates in the macro `env` which comes from the expansion of the macro `crate_root` (in Nightly builds, run with -Z macro-backtrace for more info) + +error: aborting due to 1 previous error +``` diff --git a/libgstc/rust/gstc/src/client.rs b/libgstc/rust/gstc/src/client.rs new file mode 100644 index 00000000..56ba5cc4 --- /dev/null +++ b/libgstc/rust/gstc/src/client.rs @@ -0,0 +1,443 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +use crate::json::{json_child_char_array, json_child_string, json_get_int, json_is_null_field}; +use crate::transport::{ConnectionSettings, Transport}; +use crate::Status; +use std::sync::Mutex; +use std::thread; +use std::thread::JoinHandle; + +pub struct Client { + transport: Mutex, +} + +#[derive(Debug, Clone)] +pub struct BusMessage { + pub status: Status, + pub raw_response: String, +} + +impl Client { + pub fn new( + address: impl Into, + port: u16, + wait_time_ms: i32, + keep_connection_open: bool, + ) -> Result { + let settings = ConnectionSettings { + address: address.into(), + port, + wait_time_ms, + keep_connection_open, + }; + + let transport = Transport::new(settings)?; + + Ok(Self { + transport: Mutex::new(transport), + }) + } + + pub fn ping(&self) -> Result<(), Status> { + self.cmd_send("read /") + } + + pub fn debug(&self, threshold: &str, colors: bool, reset: bool) -> Result<(), Status> { + self.cmd_update("/debug/enable", "true")?; + self.cmd_update("/debug/threshold", threshold)?; + self.cmd_update("/debug/color", bool_str(colors))?; + self.cmd_update("/debug/reset", bool_str(reset)) + } + + pub fn pipeline_create(&self, pipeline_name: &str, pipeline_desc: &str) -> Result<(), Status> { + self.cmd_create( + "/pipelines", + &format!("{} {}", pipeline_name, pipeline_desc), + ) + } + + pub fn pipeline_create_ref( + &self, + pipeline_name: &str, + pipeline_desc: &str, + ) -> Result<(), Status> { + self.cmd_send(&format!( + "pipeline_create_ref {} {}", + pipeline_name, pipeline_desc + )) + } + + pub fn pipeline_delete(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_delete("/pipelines", pipeline_name) + } + + pub fn pipeline_delete_ref(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_send(&format!("pipeline_delete_ref {}", pipeline_name)) + } + + pub fn pipeline_play(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_update(&format!("/pipelines/{}/state", pipeline_name), "playing") + } + + pub fn pipeline_play_ref(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_send(&format!("pipeline_play_ref {}", pipeline_name)) + } + + pub fn pipeline_pause(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_update(&format!("/pipelines/{}/state", pipeline_name), "paused") + } + + pub fn pipeline_stop(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_update(&format!("/pipelines/{}/state", pipeline_name), "null") + } + + pub fn pipeline_stop_ref(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_send(&format!("pipeline_stop_ref {}", pipeline_name)) + } + + pub fn pipeline_get_graph(&self, pipeline_name: &str) -> Result { + self.cmd_read( + &format!("/pipelines/{}/graph", pipeline_name), + self.default_wait_time_ms()?, + ) + } + + pub fn pipeline_get_state(&self, pipeline_name: &str) -> Result { + let response = self.cmd_read( + &format!("/pipelines/{}/state", pipeline_name), + self.default_wait_time_ms()?, + )?; + + json_child_string(&response, "response", "value") + } + + pub fn pipeline_verbose(&self, pipeline_name: &str, value: bool) -> Result<(), Status> { + self.cmd_update( + &format!("/pipelines/{}/verbose", pipeline_name), + bool_str(value), + ) + } + + pub fn element_get( + &self, + pipeline_name: &str, + element: &str, + property: &str, + ) -> Result { + let response = self.cmd_read( + &format!( + "/pipelines/{}/elements/{}/properties/{}", + pipeline_name, element, property + ), + self.default_wait_time_ms()?, + )?; + + json_child_string(&response, "response", "value") + } + + pub fn element_set( + &self, + pipeline_name: &str, + element: &str, + property: &str, + value: &str, + ) -> Result<(), Status> { + self.cmd_update( + &format!( + "/pipelines/{}/elements/{}/properties/{}", + pipeline_name, element, property + ), + value, + ) + } + + pub fn element_properties_list( + &self, + pipeline_name: &str, + element: &str, + ) -> Result, Status> { + let response = self.cmd_read( + &format!( + "/pipelines/{}/elements/{}/properties", + pipeline_name, element + ), + self.default_wait_time_ms()?, + )?; + + json_child_char_array(&response, "response", "nodes", "name") + } + + pub fn pipeline_flush_start(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_create( + &format!("/pipelines/{}/event", pipeline_name), + "flush_start", + ) + } + + pub fn pipeline_flush_stop(&self, pipeline_name: &str, reset: bool) -> Result<(), Status> { + self.cmd_create( + &format!("/pipelines/{}/event", pipeline_name), + &format!("flush_stop {}", bool_str(reset)), + ) + } + + pub fn pipeline_inject_eos(&self, pipeline_name: &str) -> Result<(), Status> { + self.cmd_create(&format!("/pipelines/{}/event", pipeline_name), "eos") + } + + #[allow(clippy::too_many_arguments)] + pub fn pipeline_seek( + &self, + pipeline_name: &str, + rate: f64, + format: i32, + flags: i32, + start_type: i32, + start: i64, + stop_type: i32, + stop: i64, + ) -> Result<(), Status> { + self.cmd_create( + &format!("/pipelines/{}/event", pipeline_name), + &format!( + "seek {:.6} {} {} {} {} {} {}", + rate, format, flags, start_type, start, stop_type, stop + ), + ) + } + + pub fn pipeline_list_elements(&self, pipeline_name: &str) -> Result, Status> { + let response = self.cmd_read( + &format!("/pipelines/{}/elements/", pipeline_name), + self.default_wait_time_ms()?, + )?; + + json_child_char_array(&response, "response", "nodes", "name") + } + + pub fn pipeline_list(&self) -> Result, Status> { + let response = self.cmd_read("/pipelines", self.default_wait_time_ms()?)?; + json_child_char_array(&response, "response", "nodes", "name") + } + + pub fn pipeline_bus_wait( + &self, + pipeline_name: &str, + message_name: &str, + timeout_ns: i64, + ) -> Result { + self.cmd_update( + &format!("/pipelines/{}/bus/types", pipeline_name), + message_name, + )?; + self.cmd_update( + &format!("/pipelines/{}/bus/timeout", pipeline_name), + &format!("{}", timeout_ns), + )?; + + let raw = self.cmd_read(&format!("/pipelines/{}/bus/message", pipeline_name), -1)?; + + match json_is_null_field(&raw, "response") { + Ok(true) => Err(Status::BusTimeout), + Ok(false) => Ok(BusMessage { + status: Status::Ok, + raw_response: raw, + }), + Err(err) => Err(err), + } + } + + pub fn pipeline_bus_wait_async( + &self, + pipeline_name: String, + message_name: String, + timeout_ns: i64, + callback: F, + ) -> Result, Status> + where + F: FnOnce(BusMessage) + Send + 'static, + { + let mut settings = self.transport_settings()?; + settings.keep_connection_open = false; + + let handle = thread::Builder::new() + .name("gstc-bus-wait".to_string()) + .spawn(move || { + let client = match Client::new( + settings.address, + settings.port, + settings.wait_time_ms, + settings.keep_connection_open, + ) { + Ok(client) => client, + Err(err) => return err, + }; + + match client.pipeline_bus_wait(&pipeline_name, &message_name, timeout_ns) { + Ok(message) => { + let status = message.status; + callback(message); + status + } + Err(err) => err, + } + }) + .map_err(|_| Status::ThreadError)?; + + Ok(handle) + } + + pub fn pipeline_emit_action( + &self, + pipeline_name: &str, + element: &str, + action: &str, + ) -> Result<(), Status> { + self.cmd_create( + &format!( + "/pipelines/{}/elements/{}/actions/{}", + pipeline_name, element, action + ), + action, + ) + } + + pub fn pipeline_list_signals( + &self, + pipeline_name: &str, + element: &str, + ) -> Result, Status> { + let response = self.cmd_read( + &format!("/pipelines/{}/elements/{}/signals", pipeline_name, element), + self.default_wait_time_ms()?, + )?; + + json_child_char_array(&response, "response", "nodes", "name") + } + + pub fn pipeline_signal_connect( + &self, + pipeline_name: &str, + element: &str, + signal: &str, + timeout: i32, + ) -> Result { + self.cmd_update( + &format!( + "/pipelines/{}/elements/{}/signals/{}/timeout", + pipeline_name, element, signal + ), + &format!("{}", timeout), + )?; + + self.cmd_read( + &format!( + "/pipelines/{}/elements/{}/signals/{}/callback", + pipeline_name, element, signal + ), + self.default_wait_time_ms()?, + ) + } + + pub fn pipeline_signal_disconnect( + &self, + pipeline_name: &str, + element: &str, + signal: &str, + ) -> Result<(), Status> { + self.cmd_read( + &format!( + "/pipelines/{}/elements/{}/signals/{}/disconnect", + pipeline_name, element, signal + ), + self.default_wait_time_ms()?, + ) + .map(|_| ()) + } + + fn cmd_send(&self, request: &str) -> Result<(), Status> { + self.cmd_send_get_response(request, self.default_wait_time_ms()?) + .map(|_| ()) + } + + fn cmd_send_get_response(&self, request: &str, timeout_ms: i32) -> Result { + let response = self.send_request(request, timeout_ms)?; + let code = json_get_int(&response, "code")?; + let status = Status::from_code(code); + + if status.is_ok() { + Ok(response) + } else { + Err(status) + } + } + + fn cmd_create(&self, where_: &str, what: &str) -> Result<(), Status> { + self.cmd_send(&format!("create {} {}", where_, what)) + } + + fn cmd_read(&self, what: &str, timeout_ms: i32) -> Result { + self.cmd_send_get_response(&format!("read {}", what), timeout_ms) + } + + fn cmd_update(&self, what: &str, how: &str) -> Result<(), Status> { + self.cmd_send(&format!("update {} {}", what, how)) + } + + fn cmd_delete(&self, where_: &str, what: &str) -> Result<(), Status> { + self.cmd_send(&format!("delete {} {}", where_, what)) + } + + fn send_request(&self, request: &str, timeout_ms: i32) -> Result { + let mut guard = self.transport.lock().map_err(|_| Status::SocketError)?; + guard.send_command(request, timeout_ms) + } + + fn default_wait_time_ms(&self) -> Result { + let guard = self.transport.lock().map_err(|_| Status::SocketError)?; + Ok(guard.wait_time_ms()) + } + + fn transport_settings(&self) -> Result { + let guard = self.transport.lock().map_err(|_| Status::SocketError)?; + Ok(guard.clone_settings()) + } +} + +fn bool_str(value: bool) -> &'static str { + if value { + "true" + } else { + "false" + } +} diff --git a/libgstc/rust/gstc/src/json.rs b/libgstc/rust/gstc/src/json.rs new file mode 100644 index 00000000..8a6fd16f --- /dev/null +++ b/libgstc/rust/gstc/src/json.rs @@ -0,0 +1,93 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +use crate::Status; +use serde_json::Value; + +pub(crate) fn json_get_int(json: &str, name: &str) -> Result { + let value = parse_json(json)?; + let number = value.get(name).ok_or(Status::NotFound)?; + let number = number.as_i64().ok_or(Status::TypeError)?; + + number.try_into().map_err(|_| Status::TypeError) +} + +pub(crate) fn json_is_null_field(json: &str, name: &str) -> Result { + let value = parse_json(json)?; + Ok(value.get(name).ok_or(Status::NotFound)?.is_null()) +} + +pub(crate) fn json_child_string( + json: &str, + parent_name: &str, + data_name: &str, +) -> Result { + let value = parse_json(json)?; + let parent = value.get(parent_name).ok_or(Status::NotFound)?; + + parent + .get(data_name) + .ok_or(Status::NotFound)? + .as_str() + .map(ToOwned::to_owned) + .ok_or(Status::TypeError) +} + +pub(crate) fn json_child_char_array( + json: &str, + parent_name: &str, + array_name: &str, + element_name: &str, +) -> Result, Status> { + let value = parse_json(json)?; + let parent = value.get(parent_name).ok_or(Status::NotFound)?; + let array = parent + .get(array_name) + .ok_or(Status::TypeError)? + .as_array() + .ok_or(Status::TypeError)?; + + array + .iter() + .map(|item| { + item.get(element_name) + .ok_or(Status::NotFound)? + .as_str() + .map(ToOwned::to_owned) + .ok_or(Status::TypeError) + }) + .collect() +} + +fn parse_json(json: &str) -> Result { + serde_json::from_str(json).map_err(|_| Status::Malformed) +} diff --git a/libgstc/rust/gstc/src/lib.rs b/libgstc/rust/gstc/src/lib.rs new file mode 100644 index 00000000..3bf2d84f --- /dev/null +++ b/libgstc/rust/gstc/src/lib.rs @@ -0,0 +1,39 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +mod client; +mod json; +mod status; +mod transport; + +pub use client::{BusMessage, Client}; +pub use status::Status; diff --git a/libgstc/rust/gstc/src/status.rs b/libgstc/rust/gstc/src/status.rs new file mode 100644 index 00000000..59299d21 --- /dev/null +++ b/libgstc/rust/gstc/src/status.rs @@ -0,0 +1,128 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +use std::fmt; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Status { + Ok, + NullArgument, + Unreachable, + Timeout, + Oom, + TypeError, + Malformed, + NotFound, + SendError, + RecvError, + SocketError, + ThreadError, + BusTimeout, + SocketTimeout, + LongResponse, + Unknown(i32), +} + +impl Status { + pub fn from_code(code: i32) -> Status { + match code { + 0 => Status::Ok, + -1 => Status::NullArgument, + -2 => Status::Unreachable, + -3 => Status::Timeout, + -4 => Status::Oom, + -5 => Status::TypeError, + -6 => Status::Malformed, + -7 => Status::NotFound, + -8 => Status::SendError, + -9 => Status::RecvError, + -10 => Status::SocketError, + -11 => Status::ThreadError, + -12 => Status::BusTimeout, + -13 => Status::SocketTimeout, + -14 => Status::LongResponse, + other => Status::Unknown(other), + } + } + + pub fn code(self) -> i32 { + match self { + Status::Ok => 0, + Status::NullArgument => -1, + Status::Unreachable => -2, + Status::Timeout => -3, + Status::Oom => -4, + Status::TypeError => -5, + Status::Malformed => -6, + Status::NotFound => -7, + Status::SendError => -8, + Status::RecvError => -9, + Status::SocketError => -10, + Status::ThreadError => -11, + Status::BusTimeout => -12, + Status::SocketTimeout => -13, + Status::LongResponse => -14, + Status::Unknown(code) => code, + } + } + + pub fn is_ok(self) -> bool { + matches!(self, Status::Ok) + } +} + +impl fmt::Display for Status { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let name = match *self { + Status::Ok => "GSTC_OK", + Status::NullArgument => "GSTC_NULL_ARGUMENT", + Status::Unreachable => "GSTC_UNREACHABLE", + Status::Timeout => "GSTC_TIMEOUT", + Status::Oom => "GSTC_OOM", + Status::TypeError => "GSTC_TYPE_ERROR", + Status::Malformed => "GSTC_MALFORMED", + Status::NotFound => "GSTC_NOT_FOUND", + Status::SendError => "GSTC_SEND_ERROR", + Status::RecvError => "GSTC_RECV_ERROR", + Status::SocketError => "GSTC_SOCKET_ERROR", + Status::ThreadError => "GSTC_THREAD_ERROR", + Status::BusTimeout => "GSTC_BUS_TIMEOUT", + Status::SocketTimeout => "GSTC_SOCKET_TIMEOUT", + Status::LongResponse => "GSTC_LONG_RESPONSE", + Status::Unknown(_) => "GSTC_UNKNOWN", + }; + + write!(f, "{} ({})", name, self.code()) + } +} + +impl std::error::Error for Status {} diff --git a/libgstc/rust/gstc/src/transport.rs b/libgstc/rust/gstc/src/transport.rs new file mode 100644 index 00000000..ff11d9ec --- /dev/null +++ b/libgstc/rust/gstc/src/transport.rs @@ -0,0 +1,156 @@ +/* + * This file is part of GStreamer Daemon + * Copyright 2015-2026 RidgeRun, LLC (http://www.ridgerun.com) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +use crate::Status; +use std::io::{Read, Write}; +use std::net::{Shutdown, TcpStream}; +use std::time::Duration; + +const MAX_RESPONSE_LENGTH: usize = 10 * 1024 * 1024; + +#[derive(Clone, Debug)] +pub(crate) struct ConnectionSettings { + pub(crate) address: String, + pub(crate) port: u16, + pub(crate) wait_time_ms: i32, + pub(crate) keep_connection_open: bool, +} + +pub(crate) struct Transport { + settings: ConnectionSettings, + stream: Option, +} + +impl Transport { + pub(crate) fn new(settings: ConnectionSettings) -> Result { + let mut transport = Self { + settings, + stream: None, + }; + + if transport.settings.keep_connection_open { + let stream = transport.open_socket()?; + transport.stream = Some(stream); + } + + Ok(transport) + } + + pub(crate) fn wait_time_ms(&self) -> i32 { + self.settings.wait_time_ms + } + + pub(crate) fn clone_settings(&self) -> ConnectionSettings { + self.settings.clone() + } + + fn open_socket(&self) -> Result { + TcpStream::connect((self.settings.address.as_str(), self.settings.port)) + .map_err(|_| Status::Unreachable) + } + + pub(crate) fn send_command( + &mut self, + request: &str, + timeout_ms: i32, + ) -> Result { + if request.is_empty() { + return Err(Status::NullArgument); + } + + if self.settings.keep_connection_open { + if let Some(stream) = self.stream.as_mut() { + Self::write_then_read(stream, request, timeout_ms) + } else { + Err(Status::SocketError) + } + } else { + let mut stream = self.open_socket()?; + let ret = Self::write_then_read(&mut stream, request, timeout_ms); + let _ = stream.shutdown(Shutdown::Both); + ret + } + } + + fn write_then_read( + stream: &mut TcpStream, + request: &str, + timeout_ms: i32, + ) -> Result { + let timeout = if timeout_ms < 0 { + None + } else { + Some(Duration::from_millis(timeout_ms as u64)) + }; + + stream + .set_read_timeout(timeout) + .map_err(|_| Status::SocketError)?; + + stream + .write_all(request.as_bytes()) + .map_err(|_| Status::SendError)?; + + let mut response = Vec::::new(); + let mut buffer = [0_u8; 1024]; + + loop { + let n = match stream.read(&mut buffer) { + Ok(0) => return Err(Status::RecvError), + Ok(n) => n, + Err(err) if err.kind() == std::io::ErrorKind::TimedOut => { + return Err(Status::SocketTimeout) + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + return Err(Status::SocketTimeout) + } + Err(_) => return Err(Status::RecvError), + }; + + if let Some(zero_idx) = buffer[..n].iter().position(|b| *b == 0) { + response.extend_from_slice(&buffer[..zero_idx]); + break; + } + + response.extend_from_slice(&buffer[..n]); + if response.len() >= MAX_RESPONSE_LENGTH { + return Err(Status::LongResponse); + } + } + + if response.len() >= MAX_RESPONSE_LENGTH { + return Err(Status::LongResponse); + } + + String::from_utf8(response).map_err(|_| Status::Malformed) + } +}