diff --git a/README.org b/README.org
index 662b09b..1fdea9c 100644
--- a/README.org
+++ b/README.org
@@ -26,8 +26,8 @@ Please refer to the SSH section for further details.
#+BEGIN_HTML
-
-
+
+
diff --git a/client/src/config.rs b/client/src/config.rs
index 82b2244..2b9713b 100644
--- a/client/src/config.rs
+++ b/client/src/config.rs
@@ -97,6 +97,7 @@ fn resolve_option>(cli_value: Option, variable_name: &str, de
)
}
+/*
fn resolve_boolean_option(cli_value: bool, variable_name: &str, default: bool) -> bool {
let cli_value = if cli_value { Some(true) } else { None };
resolve_with(
@@ -116,6 +117,7 @@ fn resolve_boolean_option(cli_value: bool, variable_name: &str, default: bool) -
default,
)
}
+*/
fn resolve_with(
cli_value: Option,
diff --git a/client/src/connection/mod.rs b/client/src/connection/mod.rs
index 83bbc22..2084815 100644
--- a/client/src/connection/mod.rs
+++ b/client/src/connection/mod.rs
@@ -1,27 +1,20 @@
mod ssh;
pub mod video;
-use std::any::type_name;
-
use anyhow::{Context, Error};
-use futures::{SinkExt, StreamExt};
use review_server::{
config::{StreamConfig, device::DeviceConfig},
+ connection::FramedTcpConnection,
version::VersionInfo,
};
-use serde::{Serialize, de::DeserializeOwned};
use tokio::net::TcpStream;
-use tokio_util::{
- bytes::{Bytes, BytesMut},
- codec::{Framed, LengthDelimitedCodec},
-};
use tracing::info;
use crate::config::ClientOptions;
#[derive(Debug)]
pub struct Connection {
- framed: Framed,
+ framed: FramedTcpConnection,
}
impl Connection {
@@ -36,9 +29,9 @@ impl Connection {
stream.set_nodelay(true).context("could not set nodelay")?;
- let framed = Framed::new(stream, LengthDelimitedCodec::new());
+ let framed = FramedTcpConnection::new(stream);
- Ok(Connection { framed })
+ Ok(Self { framed })
}
pub async fn initialize_communication(
@@ -50,6 +43,7 @@ impl Connection {
.context("error while authenticating")?;
let version_info: VersionInfo = self
+ .framed
.receive()
.await
.context("could not receive version info")?;
@@ -70,63 +64,11 @@ impl Connection {
info!("sending out stream config {:?}", &stream_config);
- self.send(&stream_config)
+ self.framed
+ .send(&stream_config)
.await
.context("could not send device config")?;
Ok(device_config)
}
-
- async fn receive(&mut self) -> Result {
- let msg = self
- .framed
- .next()
- .await
- .context(format!(
- "connection closed before message of type {} was received",
- type_name::(),
- ))?
- .context(format!(
- "could not receive message of type {}",
- type_name::()
- ))?;
-
- let stream_config = bson::deserialize_from_slice(&msg).context(format!(
- "could not deserialize message of type {}",
- type_name::(),
- ))?;
-
- Ok(stream_config)
- }
-
- async fn receive_raw(&mut self) -> Result {
- self.framed
- .next()
- .await
- .context("connection closed before raw message was received".to_string())?
- .context("could not receive raw message".to_string())
- }
-
- async fn send(&mut self, value: &T) -> Result<(), Error> {
- let msg = bson::serialize_to_vec(value)
- .context(format!("could not serialize type {}", type_name::()))?;
-
- self.framed
- .send(msg.into())
- .await
- .context(format!(
- "could not send serialized message of type {}",
- type_name::()
- ))
- .map(|_| ())
- }
-
- #[allow(unused)]
- async fn send_raw(&mut self, msg: Bytes) -> Result<(), Error> {
- self.framed
- .send(msg)
- .await
- .context("could not send raw message".to_string())
- .map(|_| ())
- }
}
diff --git a/client/src/connection/ssh/mod.rs b/client/src/connection/ssh/mod.rs
index e73f136..b2544bc 100644
--- a/client/src/connection/ssh/mod.rs
+++ b/client/src/connection/ssh/mod.rs
@@ -15,6 +15,7 @@ use keys::get_keys_to_check;
impl Connection {
pub async fn authenticate(&mut self, client_options: ClientOptions) -> Result<(), Error> {
let token: AuthentificationToken = self
+ .framed
.receive()
.await
.context("could not receive authentification token")?;
@@ -71,11 +72,13 @@ impl Connection {
let signatures: Signatures = signatures.try_into()?;
- self.send(&signatures)
+ self.framed
+ .send(&signatures)
.await
.context("could not send over all public keys to check")?;
let authorized_key: AuthorizedPublicKey = self
+ .framed
.receive()
.await
.context("could not receive authorized key index")?;
diff --git a/client/src/connection/video.rs b/client/src/connection/video.rs
index b16e2e8..4550c9a 100644
--- a/client/src/connection/video.rs
+++ b/client/src/connection/video.rs
@@ -26,6 +26,7 @@ impl VideoConnection {
let compressed_frame = self
.conn
+ .framed
.receive_raw()
.await
.context("could not reveive next frame")?;
diff --git a/server/src/connection/mod.rs b/server/src/connection/mod.rs
index b892c12..c8c5d4a 100644
--- a/server/src/connection/mod.rs
+++ b/server/src/connection/mod.rs
@@ -16,14 +16,14 @@ use crate::version::VersionInfo;
#[derive(Debug)]
pub struct Connection {
- framed: Framed,
+ framed: FramedTcpConnection,
}
impl Connection {
pub fn new(stream: TcpStream) -> Self {
- let framed = Framed::new(stream, LengthDelimitedCodec::new());
+ let framed = FramedTcpConnection::new(stream);
- Connection { framed }
+ Self { framed }
}
pub async fn initialize_communication(&mut self) -> Result {
@@ -41,11 +41,13 @@ impl Connection {
info!("sending out version information");
- self.send(&version_info)
+ self.framed
+ .send(&version_info)
.await
.context("could not send out version information")?;
let stream_config: StreamConfig = self
+ .framed
.receive()
.await
.context("could not receive stream config")?;
@@ -54,8 +56,21 @@ impl Connection {
Ok(stream_config)
}
+}
+
+#[derive(Debug)]
+pub struct FramedTcpConnection {
+ framed: Framed,
+}
+
+impl FramedTcpConnection {
+ pub fn new(stream: TcpStream) -> Self {
+ let framed = Framed::new(stream, LengthDelimitedCodec::new());
+
+ Self { framed }
+ }
- async fn receive(&mut self) -> Result {
+ pub async fn receive(&mut self) -> Result {
let msg = self
.framed
.next()
@@ -78,7 +93,7 @@ impl Connection {
}
#[allow(unused)]
- async fn receive_raw(&mut self) -> Result {
+ pub async fn receive_raw(&mut self) -> Result {
self.framed
.next()
.await
@@ -86,7 +101,7 @@ impl Connection {
.context("could not receive raw message".to_string())
}
- async fn send(&mut self, value: &T) -> Result<(), Error> {
+ pub async fn send(&mut self, value: &T) -> Result<(), Error> {
let msg = bson::serialize_to_vec(value)
.context(format!("could not serialize type {}", type_name::()))?;
@@ -100,7 +115,7 @@ impl Connection {
.map(|_| ())
}
- async fn send_raw(&mut self, msg: Bytes) -> Result<(), Error> {
+ pub async fn send_raw(&mut self, msg: Bytes) -> Result<(), Error> {
self.framed
.send(msg)
.await
diff --git a/server/src/connection/ssh/mod.rs b/server/src/connection/ssh/mod.rs
index 691a582..66f2647 100644
--- a/server/src/connection/ssh/mod.rs
+++ b/server/src/connection/ssh/mod.rs
@@ -74,7 +74,8 @@ pub struct AuthorizedPublicKey {
impl Connection {
pub async fn authenticate(&mut self) -> Result {
let token = AuthentificationToken::new();
- self.send(&token)
+ self.framed
+ .send(&token)
.await
.context("could not send authentification token to client")?;
@@ -93,6 +94,7 @@ impl Connection {
let authorized_keys = get_authorized_keys().context("could not get authorized keys")?;
let signatures: Signatures = self
+ .framed
.receive()
.await
.context("could not receive public keys")?;
@@ -112,7 +114,8 @@ impl Connection {
index: authorized_key_index,
};
- self.send(&authorized_key_message)
+ self.framed
+ .send(&authorized_key_message)
.await
.context("could not send index of authorized key")?;
diff --git a/server/src/connection/video.rs b/server/src/connection/video.rs
index c466b3d..cce892a 100644
--- a/server/src/connection/video.rs
+++ b/server/src/connection/video.rs
@@ -48,6 +48,7 @@ impl VideoConnection {
);
self.conn
+ .framed
.send_raw(encoded_buffer.into())
.await
.context("could not send next frame")?;