From 36a9ba45f46772addf1b3ddcc38cf59939981334 Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Wed, 28 Jan 2026 21:21:53 +0100 Subject: [PATCH 1/4] unify common sending and receiving operations --- client/src/connection/mod.rs | 72 ++++---------------------------- client/src/connection/ssh/mod.rs | 5 ++- client/src/connection/video.rs | 1 + server/src/connection/mod.rs | 32 ++++++++++---- server/src/connection/ssh/mod.rs | 7 +++- server/src/connection/video.rs | 1 + 6 files changed, 41 insertions(+), 77 deletions(-) diff --git a/client/src/connection/mod.rs b/client/src/connection/mod.rs index 83bbc22..2084815 100644 --- a/client/src/connection/mod.rs +++ b/client/src/connection/mod.rs @@ -1,27 +1,20 @@ mod ssh; pub mod video; -use std::any::type_name; - use anyhow::{Context, Error}; -use futures::{SinkExt, StreamExt}; use review_server::{ config::{StreamConfig, device::DeviceConfig}, + connection::FramedTcpConnection, version::VersionInfo, }; -use serde::{Serialize, de::DeserializeOwned}; use tokio::net::TcpStream; -use tokio_util::{ - bytes::{Bytes, BytesMut}, - codec::{Framed, LengthDelimitedCodec}, -}; use tracing::info; use crate::config::ClientOptions; #[derive(Debug)] pub struct Connection { - framed: Framed, + framed: FramedTcpConnection, } impl Connection { @@ -36,9 +29,9 @@ impl Connection { stream.set_nodelay(true).context("could not set nodelay")?; - let framed = Framed::new(stream, LengthDelimitedCodec::new()); + let framed = FramedTcpConnection::new(stream); - Ok(Connection { framed }) + Ok(Self { framed }) } pub async fn initialize_communication( @@ -50,6 +43,7 @@ impl Connection { .context("error while authenticating")?; let version_info: VersionInfo = self + .framed .receive() .await .context("could not receive version info")?; @@ -70,63 +64,11 @@ impl Connection { info!("sending out stream config {:?}", &stream_config); - self.send(&stream_config) + self.framed + .send(&stream_config) .await .context("could not send device config")?; Ok(device_config) } - - async fn receive(&mut self) -> Result { - let msg = self - .framed - .next() - .await - .context(format!( - "connection closed before message of type {} was received", - type_name::(), - ))? - .context(format!( - "could not receive message of type {}", - type_name::() - ))?; - - let stream_config = bson::deserialize_from_slice(&msg).context(format!( - "could not deserialize message of type {}", - type_name::(), - ))?; - - Ok(stream_config) - } - - async fn receive_raw(&mut self) -> Result { - self.framed - .next() - .await - .context("connection closed before raw message was received".to_string())? - .context("could not receive raw message".to_string()) - } - - async fn send(&mut self, value: &T) -> Result<(), Error> { - let msg = bson::serialize_to_vec(value) - .context(format!("could not serialize type {}", type_name::()))?; - - self.framed - .send(msg.into()) - .await - .context(format!( - "could not send serialized message of type {}", - type_name::() - )) - .map(|_| ()) - } - - #[allow(unused)] - async fn send_raw(&mut self, msg: Bytes) -> Result<(), Error> { - self.framed - .send(msg) - .await - .context("could not send raw message".to_string()) - .map(|_| ()) - } } diff --git a/client/src/connection/ssh/mod.rs b/client/src/connection/ssh/mod.rs index e73f136..b2544bc 100644 --- a/client/src/connection/ssh/mod.rs +++ b/client/src/connection/ssh/mod.rs @@ -15,6 +15,7 @@ use keys::get_keys_to_check; impl Connection { pub async fn authenticate(&mut self, client_options: ClientOptions) -> Result<(), Error> { let token: AuthentificationToken = self + .framed .receive() .await .context("could not receive authentification token")?; @@ -71,11 +72,13 @@ impl Connection { let signatures: Signatures = signatures.try_into()?; - self.send(&signatures) + self.framed + .send(&signatures) .await .context("could not send over all public keys to check")?; let authorized_key: AuthorizedPublicKey = self + .framed .receive() .await .context("could not receive authorized key index")?; diff --git a/client/src/connection/video.rs b/client/src/connection/video.rs index b16e2e8..4550c9a 100644 --- a/client/src/connection/video.rs +++ b/client/src/connection/video.rs @@ -26,6 +26,7 @@ impl VideoConnection { let compressed_frame = self .conn + .framed .receive_raw() .await .context("could not reveive next frame")?; diff --git a/server/src/connection/mod.rs b/server/src/connection/mod.rs index b892c12..920a63e 100644 --- a/server/src/connection/mod.rs +++ b/server/src/connection/mod.rs @@ -16,14 +16,14 @@ use crate::version::VersionInfo; #[derive(Debug)] pub struct Connection { - framed: Framed, + framed: FramedTcpConnection, } impl Connection { pub fn new(stream: TcpStream) -> Self { - let framed = Framed::new(stream, LengthDelimitedCodec::new()); + let framed = FramedTcpConnection::new(stream); - Connection { framed } + Self { framed } } pub async fn initialize_communication(&mut self) -> Result { @@ -41,11 +41,13 @@ impl Connection { info!("sending out version information"); - self.send(&version_info) + self.framed + .send(&version_info) .await .context("could not send out version information")?; let stream_config: StreamConfig = self + .framed .receive() .await .context("could not receive stream config")?; @@ -54,8 +56,21 @@ impl Connection { Ok(stream_config) } +} + +#[derive(Debug)] +pub struct FramedTcpConnection { + framed: Framed, +} + +impl FramedTcpConnection { + pub fn new(stream: TcpStream) -> Self { + let framed = Framed::new(stream, LengthDelimitedCodec::new()); + + Self { framed } + } - async fn receive(&mut self) -> Result { + pub async fn receive(&mut self) -> Result { let msg = self .framed .next() @@ -77,8 +92,7 @@ impl Connection { Ok(stream_config) } - #[allow(unused)] - async fn receive_raw(&mut self) -> Result { + pub async fn receive_raw(&mut self) -> Result { self.framed .next() .await @@ -86,7 +100,7 @@ impl Connection { .context("could not receive raw message".to_string()) } - async fn send(&mut self, value: &T) -> Result<(), Error> { + pub async fn send(&mut self, value: &T) -> Result<(), Error> { let msg = bson::serialize_to_vec(value) .context(format!("could not serialize type {}", type_name::()))?; @@ -100,7 +114,7 @@ impl Connection { .map(|_| ()) } - async fn send_raw(&mut self, msg: Bytes) -> Result<(), Error> { + pub async fn send_raw(&mut self, msg: Bytes) -> Result<(), Error> { self.framed .send(msg) .await diff --git a/server/src/connection/ssh/mod.rs b/server/src/connection/ssh/mod.rs index 691a582..66f2647 100644 --- a/server/src/connection/ssh/mod.rs +++ b/server/src/connection/ssh/mod.rs @@ -74,7 +74,8 @@ pub struct AuthorizedPublicKey { impl Connection { pub async fn authenticate(&mut self) -> Result { let token = AuthentificationToken::new(); - self.send(&token) + self.framed + .send(&token) .await .context("could not send authentification token to client")?; @@ -93,6 +94,7 @@ impl Connection { let authorized_keys = get_authorized_keys().context("could not get authorized keys")?; let signatures: Signatures = self + .framed .receive() .await .context("could not receive public keys")?; @@ -112,7 +114,8 @@ impl Connection { index: authorized_key_index, }; - self.send(&authorized_key_message) + self.framed + .send(&authorized_key_message) .await .context("could not send index of authorized key")?; diff --git a/server/src/connection/video.rs b/server/src/connection/video.rs index c466b3d..cce892a 100644 --- a/server/src/connection/video.rs +++ b/server/src/connection/video.rs @@ -48,6 +48,7 @@ impl VideoConnection { ); self.conn + .framed .send_raw(encoded_buffer.into()) .await .context("could not send next frame")?; From 3e9990c34a10959a0f85d49e639fe5cb45835710 Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Wed, 28 Jan 2026 21:25:49 +0100 Subject: [PATCH 2/4] remove unused function temp --- client/src/config.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/src/config.rs b/client/src/config.rs index 82b2244..2b9713b 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -97,6 +97,7 @@ fn resolve_option>(cli_value: Option, variable_name: &str, de ) } +/* fn resolve_boolean_option(cli_value: bool, variable_name: &str, default: bool) -> bool { let cli_value = if cli_value { Some(true) } else { None }; resolve_with( @@ -116,6 +117,7 @@ fn resolve_boolean_option(cli_value: bool, variable_name: &str, default: bool) - default, ) } +*/ fn resolve_with( cli_value: Option, From 64e505b8cb995ab3d75b4766b17641514c5cde0f Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Wed, 28 Jan 2026 21:27:34 +0100 Subject: [PATCH 3/4] update version in readme (late) --- README.org | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.org b/README.org index 662b09b..1fdea9c 100644 --- a/README.org +++ b/README.org @@ -26,8 +26,8 @@ Please refer to the SSH section for further details. #+BEGIN_HTML

- - + + From 19dc67354a64dc9abb2d6fa6d315d7010ebbea2f Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Wed, 28 Jan 2026 21:33:01 +0100 Subject: [PATCH 4/4] allow unused receive_raw function --- server/src/connection/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/connection/mod.rs b/server/src/connection/mod.rs index 920a63e..c8c5d4a 100644 --- a/server/src/connection/mod.rs +++ b/server/src/connection/mod.rs @@ -92,6 +92,7 @@ impl FramedTcpConnection { Ok(stream_config) } + #[allow(unused)] pub async fn receive_raw(&mut self) -> Result { self.framed .next()