Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.org
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ Please refer to the SSH section for further details.

#+BEGIN_HTML
<p align="center">
<a href="https://img.shields.io/badge/Version-v0.1.0-blue">
<img src="https://img.shields.io/badge/Version-v0.1.0-blue">
<a href="https://img.shields.io/badge/Version-v0.2.0-blue">
<img src="https://img.shields.io/badge/Version-v0.2.0-blue">
</a>
<a href="https://github.com/cloudsftp/reView/actions/workflows/compile.yaml">
<img src="https://github.com/cloudsftp/reView/actions/workflows/compile.yaml/badge.svg">
Expand Down
2 changes: 2 additions & 0 deletions client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ fn resolve_option<T: From<String>>(cli_value: Option<T>, variable_name: &str, de
)
}

/*
fn resolve_boolean_option(cli_value: bool, variable_name: &str, default: bool) -> bool {
let cli_value = if cli_value { Some(true) } else { None };
resolve_with(
Expand All @@ -116,6 +117,7 @@ fn resolve_boolean_option(cli_value: bool, variable_name: &str, default: bool) -
default,
)
}
*/

fn resolve_with<T>(
cli_value: Option<T>,
Expand Down
72 changes: 7 additions & 65 deletions client/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,20 @@
mod ssh;
pub mod video;

use std::any::type_name;

use anyhow::{Context, Error};
use futures::{SinkExt, StreamExt};
use review_server::{
config::{StreamConfig, device::DeviceConfig},
connection::FramedTcpConnection,
version::VersionInfo,
};
use serde::{Serialize, de::DeserializeOwned};
use tokio::net::TcpStream;
use tokio_util::{
bytes::{Bytes, BytesMut},
codec::{Framed, LengthDelimitedCodec},
};
use tracing::info;

use crate::config::ClientOptions;

#[derive(Debug)]
pub struct Connection {
framed: Framed<TcpStream, LengthDelimitedCodec>,
framed: FramedTcpConnection,
}

impl Connection {
Expand All @@ -36,9 +29,9 @@ impl Connection {

stream.set_nodelay(true).context("could not set nodelay")?;

let framed = Framed::new(stream, LengthDelimitedCodec::new());
let framed = FramedTcpConnection::new(stream);

Ok(Connection { framed })
Ok(Self { framed })
}

pub async fn initialize_communication(
Expand All @@ -50,6 +43,7 @@ impl Connection {
.context("error while authenticating")?;

let version_info: VersionInfo = self
.framed
.receive()
.await
.context("could not receive version info")?;
Expand All @@ -70,63 +64,11 @@ impl Connection {

info!("sending out stream config {:?}", &stream_config);

self.send(&stream_config)
self.framed
.send(&stream_config)
.await
.context("could not send device config")?;

Ok(device_config)
}

async fn receive<T: DeserializeOwned>(&mut self) -> Result<T, Error> {
let msg = self
.framed
.next()
.await
.context(format!(
"connection closed before message of type {} was received",
type_name::<T>(),
))?
.context(format!(
"could not receive message of type {}",
type_name::<T>()
))?;

let stream_config = bson::deserialize_from_slice(&msg).context(format!(
"could not deserialize message of type {}",
type_name::<T>(),
))?;

Ok(stream_config)
}

async fn receive_raw(&mut self) -> Result<BytesMut, Error> {
self.framed
.next()
.await
.context("connection closed before raw message was received".to_string())?
.context("could not receive raw message".to_string())
}

async fn send<T: Serialize>(&mut self, value: &T) -> Result<(), Error> {
let msg = bson::serialize_to_vec(value)
.context(format!("could not serialize type {}", type_name::<T>()))?;

self.framed
.send(msg.into())
.await
.context(format!(
"could not send serialized message of type {}",
type_name::<T>()
))
.map(|_| ())
}

#[allow(unused)]
async fn send_raw(&mut self, msg: Bytes) -> Result<(), Error> {
self.framed
.send(msg)
.await
.context("could not send raw message".to_string())
.map(|_| ())
}
}
5 changes: 4 additions & 1 deletion client/src/connection/ssh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use keys::get_keys_to_check;
impl Connection {
pub async fn authenticate(&mut self, client_options: ClientOptions) -> Result<(), Error> {
let token: AuthentificationToken = self
.framed
.receive()
.await
.context("could not receive authentification token")?;
Expand Down Expand Up @@ -71,11 +72,13 @@ impl Connection {

let signatures: Signatures = signatures.try_into()?;

self.send(&signatures)
self.framed
.send(&signatures)
.await
.context("could not send over all public keys to check")?;

let authorized_key: AuthorizedPublicKey = self
.framed
.receive()
.await
.context("could not receive authorized key index")?;
Expand Down
1 change: 1 addition & 0 deletions client/src/connection/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl VideoConnection {

let compressed_frame = self
.conn
.framed
.receive_raw()
.await
.context("could not reveive next frame")?;
Expand Down
31 changes: 23 additions & 8 deletions server/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use crate::version::VersionInfo;

#[derive(Debug)]
pub struct Connection {
framed: Framed<TcpStream, LengthDelimitedCodec>,
framed: FramedTcpConnection,
}

impl Connection {
pub fn new(stream: TcpStream) -> Self {
let framed = Framed::new(stream, LengthDelimitedCodec::new());
let framed = FramedTcpConnection::new(stream);

Connection { framed }
Self { framed }
}

pub async fn initialize_communication(&mut self) -> Result<StreamConfig, Error> {
Expand All @@ -41,11 +41,13 @@ impl Connection {

info!("sending out version information");

self.send(&version_info)
self.framed
.send(&version_info)
.await
.context("could not send out version information")?;

let stream_config: StreamConfig = self
.framed
.receive()
.await
.context("could not receive stream config")?;
Expand All @@ -54,8 +56,21 @@ impl Connection {

Ok(stream_config)
}
}

#[derive(Debug)]
pub struct FramedTcpConnection {
framed: Framed<TcpStream, LengthDelimitedCodec>,
}

impl FramedTcpConnection {
pub fn new(stream: TcpStream) -> Self {
let framed = Framed::new(stream, LengthDelimitedCodec::new());

Self { framed }
}

async fn receive<T: DeserializeOwned>(&mut self) -> Result<T, Error> {
pub async fn receive<T: DeserializeOwned>(&mut self) -> Result<T, Error> {
let msg = self
.framed
.next()
Expand All @@ -78,15 +93,15 @@ impl Connection {
}

#[allow(unused)]
async fn receive_raw(&mut self) -> Result<BytesMut, Error> {
pub async fn receive_raw(&mut self) -> Result<BytesMut, Error> {
self.framed
.next()
.await
.context("connection closed before raw message was received".to_string())?
.context("could not receive raw message".to_string())
}

async fn send<T: Serialize>(&mut self, value: &T) -> Result<(), Error> {
pub async fn send<T: Serialize>(&mut self, value: &T) -> Result<(), Error> {
let msg = bson::serialize_to_vec(value)
.context(format!("could not serialize type {}", type_name::<T>()))?;

Expand All @@ -100,7 +115,7 @@ impl Connection {
.map(|_| ())
}

async fn send_raw(&mut self, msg: Bytes) -> Result<(), Error> {
pub async fn send_raw(&mut self, msg: Bytes) -> Result<(), Error> {
self.framed
.send(msg)
.await
Expand Down
7 changes: 5 additions & 2 deletions server/src/connection/ssh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ pub struct AuthorizedPublicKey {
impl Connection {
pub async fn authenticate(&mut self) -> Result<PublicKey, Error> {
let token = AuthentificationToken::new();
self.send(&token)
self.framed
.send(&token)
.await
.context("could not send authentification token to client")?;

Expand All @@ -93,6 +94,7 @@ impl Connection {
let authorized_keys = get_authorized_keys().context("could not get authorized keys")?;

let signatures: Signatures = self
.framed
.receive()
.await
.context("could not receive public keys")?;
Expand All @@ -112,7 +114,8 @@ impl Connection {
index: authorized_key_index,
};

self.send(&authorized_key_message)
self.framed
.send(&authorized_key_message)
.await
.context("could not send index of authorized key")?;

Expand Down
1 change: 1 addition & 0 deletions server/src/connection/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl VideoConnection {
);

self.conn
.framed
.send_raw(encoded_buffer.into())
.await
.context("could not send next frame")?;
Expand Down