diff --git a/relay/src/main.rs b/relay/src/main.rs index 583d926..7471567 100644 --- a/relay/src/main.rs +++ b/relay/src/main.rs @@ -5,12 +5,9 @@ mod state; mod update; mod view; -// mod channel; -// use channel::ChannelMessage; - use self::{ // ipc::ipc_connection_loop, - message::{FromIpcThreadMessage, Message}, + message::Message, state::State, update::update, view::view, @@ -22,30 +19,6 @@ use iced::{ }; fn main() -> iced::Result { - // Communication channels between the main_gui_thread and the ipc_connection_thread - // tx_kill = transmit FROM main_gui_thread TO ipc_thread - // named txx_kill because the only thing it does rn is send a kill message to the thread. Can be renamed - //tcp connection - - // Connect to the server - - /* let (tx_to_parent_thread, rx_from_tcp_thread) = std::sync::mpsc::channel::(); - let tcp_connection = thread::spawn(move || match TcpStream::connect("127.0.0.1:7878") { - Ok(mut stream) => { - println!("Successfully connected."); - let message = ChannelMessage::Connect; - tx_to_parent_thread.send(message); - } - - Err(e) => { - println!("Connection failed: {}", e); - } - }); */ - - // let (tx_to_ipc_thread, rx_kill) = std::sync::mpsc::channel(); - // let (tx_to_parent_thread, rx_from_parent_thread) = std::sync::mpsc::channel(); - // let _ = tx.send(()); // temp - iced::application("RELAY", update, view) .window_size((450.0, 300.0)) .exit_on_close_request(false) @@ -61,8 +34,6 @@ fn main() -> iced::Result { "Error connecting to IPC during GUI initialization: {e:?}" )); }; - // state.tcp_connect().expect("TCP connection failure"); // may not need to panic, recoverable error - (state, Task::none()) }) } @@ -70,11 +41,10 @@ fn main() -> iced::Result { fn subscribe(_state: &State) -> iced::Subscription { use Message as M; - // Subscription for displaying elapsed time -- temporary + // Subscription for displaying elapsed time let time_sub = every(Duration::from_millis(10)).map(|_| M::Update); // Subscription to send a message when the window close button (big red X) is clicked. - // Needed to execute cleanup operations before actually shutting down, such as saving etc let window_close = iced::window::close_requests().map(|id| M::WindowCloseRequest(id)); // combine and return all subscriptions as one subscription to satisfy the return type diff --git a/relay/src/message.rs b/relay/src/message.rs index 3dbe99d..ef233f0 100644 --- a/relay/src/message.rs +++ b/relay/src/message.rs @@ -46,9 +46,17 @@ pub(crate) enum FromIpcThreadMessage { } pub(crate) enum ToTcpThreadMessage { + // outgoing payload to TCP thread Send(String), } -//added this for tcp counter - Nyla Hughes + pub(crate) enum FromTcpThreadMessage { - Sent { bytes: usize, at: Instant }, + /// TCP thread successfully connected to remote + Connected, + /// TCP thread disconnected (may include reason) + Disconnected(String), + /// A packet was sent to remote (In bytes) + Sent(usize), + /// Error reported + SendError(String), } diff --git a/relay/src/state.rs b/relay/src/state.rs index a256606..05a0e5f 100644 --- a/relay/src/state.rs +++ b/relay/src/state.rs @@ -1,71 +1,48 @@ -use anyhow::Result; -use anyhow::{anyhow, bail}; -use std::io::BufRead; -use std::io::BufReader; -use std::io::Write; -use std::net::TcpStream; -use std::thread::JoinHandle; - +use anyhow::{anyhow, bail, Result}; use iced::time::Duration; +use interprocess::local_socket::{traits::Listener, GenericNamespaced, ListenerOptions, ToNsName}; +use std::io::{BufRead, BufReader, Write}; +use std::net::TcpStream; +use std::thread::{spawn, JoinHandle}; +use std::time::{Duration as StdDuration, Instant}; use crate::bichannel; use crate::bichannel::ParentBiChannel; +use crate::message::{FromIpcThreadMessage, FromTcpThreadMessage, ToIpcThreadMessage, ToTcpThreadMessage}; -use crate::message::FromIpcThreadMessage; -use crate::message::FromTcpThreadMessage; -use crate::message::ToIpcThreadMessage; -use crate::message::ToTcpThreadMessage; - -//Code added for tcp packet count -Nyla Hughes -use std::collections::VecDeque; -use std::time::Instant; -// - - -// use crate::ChannelMessage; - -use interprocess::local_socket::{traits::Listener, GenericNamespaced, ListenerOptions, ToNsName}; -use std::collections::VecDeque; -use std::time::Duration as StdDuration; +/// Simple, consistent log helper used inside this module and spawned threads. +fn relay_log(msg: &str) { + eprintln!("[relay] {}", msg); +} #[allow(unused)] pub(crate) struct State { pub elapsed_time: Duration, - pub event_log: Vec, - pub ipc_thread_handle: Option>>, pub tcp_thread_handle: Option>>, - pub tcp_connected: bool, pub tcp_addr_field: String, pub latest_baton_send: Option, pub active_baton_connection: bool, - // pub recv: Option>, - - // Optional GUI error message + /// timestamp of last received baton packet (used by update logic) + pub last_baton_instant: Option, + /// simple metrics/UI helpers + pub show_metrics: bool, + pub packets_last_60s: usize, + pub bps: f64, + /// Optional GUI error message pub error_message: Option, - // Is GUI pop-up card open + /// Is GUI pop-up card open pub card_open: bool, - // GUI Toggle state elements + /// GUI Toggle state elements pub altitude_toggle: bool, pub airspeed_toggle: bool, pub vertical_airspeed_toggle: bool, pub heading_toggle: bool, - pub ipc_bichannel: Option>, pub tcp_bichannel: Option>, - pub last_send_timestamp: Option, - - // Added this for the tcp packet counter -Nyla Hughes - pub sent_packet_times: VecDeque, - pub sent_samples: VecDeque<(Instant, usize)>, - pub packets_last_60s: usize, - pub bps: f64, - pub show_metrics: bool, - // - } impl Default for State { @@ -73,186 +50,187 @@ impl Default for State { State { elapsed_time: Duration::ZERO, event_log: Vec::new(), - ipc_thread_handle: None, tcp_thread_handle: None, - tcp_connected: false, tcp_addr_field: String::new(), latest_baton_send: None, active_baton_connection: false, - + last_baton_instant: None, + show_metrics: false, + packets_last_60s: 0, + bps: 0.0, error_message: None, card_open: false, altitude_toggle: true, airspeed_toggle: true, vertical_airspeed_toggle: true, heading_toggle: true, - ipc_bichannel: None, tcp_bichannel: None, - last_send_timestamp: None, - - // Added this for the tcp packet counter -Nyla Hughes - sent_packet_times: VecDeque::new(), - sent_samples: VecDeque::new(), - packets_last_60s: 0, - bps: 0.0, - show_metrics: false, } } } +// --- helpers ---------------------------------------------------------------- + +fn sanitize_field(s: &str) -> String { + s.replace('\r', "") + .replace('\n', "") + .replace(';', ",") + .trim() + .to_string() +} + +fn normalize_baton_payload(raw: &str) -> Vec { + let mut s = raw.trim().replace('\r', "").replace('\n', ""); + while s.starts_with(';') { + s.remove(0); + } + while s.ends_with(';') { + s.pop(); + } + s.split(';') + .map(|f| sanitize_field(f)) + .filter(|f| !f.is_empty()) + .collect() +} + +fn build_imotions_packet(event_name: &str, fields: &[String]) -> String { + let mut packet = String::from("E;1;PilotDataSync;;;;;"); + packet.push_str(event_name); + if !fields.is_empty() { + packet.push(';'); + packet.push_str(&fields.join(";")); + } + packet.push_str("\r\n"); + packet +} + +fn send_packet(stream: &mut TcpStream, packet: &str) -> Result<()> { + stream + .write_all(packet.as_bytes()) + .map_err(|e| anyhow!("write_all failed: {}", e))?; + stream + .flush() + .map_err(|e| anyhow!("flush failed: {}", e))?; + Ok(()) +} + +// --- State impl ------------------------------------------------------------- + impl State { + pub fn refresh_metrics_now(&mut self) { + if self.packets_last_60s > 0 { + self.packets_last_60s = self.packets_last_60s.saturating_sub(0); + } + } + + pub fn on_tcp_packet_sent(&mut self, bytes: usize) { + self.packets_last_60s = self.packets_last_60s.saturating_add(1); + self.bps = bytes as f64; + self.log_event(format!("Sent packet ({} bytes)", bytes)); + } + + pub fn is_ipc_connected(&self) -> bool { + self.ipc_bichannel + .as_ref() + .and_then(|b| b.is_conn_to_endpoint().ok()) + .unwrap_or(false) + } + + pub fn is_tcp_connected(&self) -> bool { + self.tcp_bichannel + .as_ref() + .and_then(|b| b.is_conn_to_endpoint().ok()) + .unwrap_or(false) + } + pub fn ipc_connect(&mut self) -> Result<()> { if self.ipc_thread_handle.is_some() { - bail!("IPC thread already exists.") + bail!("IPC thread already exists."); } - // TODO let (ipc_bichannel, mut child_bichannel) = bichannel::create_bichannels::(); - let ipc_thread_handle = std::thread::spawn(move || { - // sample pulled directly from `interprocess` documentation + let handle = spawn(move || { let printname = "baton.sock"; - let name = printname.to_ns_name::().unwrap(); - + let name = printname.to_ns_name::().map_err(|e| { + relay_log(&format!("Failed to convert name to NsName: {}", e)); + anyhow!("Name conversion failed") + })?; let opts = ListenerOptions::new().name(name); - - let listener = match opts.create_sync() { - Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => { - eprintln!( - "Error: could not start server because the socket file is occupied. Please check if - {printname} is in use by another process and try again." - ); - return Ok(()); - } - x => x.unwrap(), - }; + let listener = opts.create_sync().map_err(|e| { + relay_log(&format!("Failed to create IPC listener: {}", e)); + anyhow!("Listener create failed") + })?; listener .set_nonblocking(interprocess::local_socket::ListenerNonblockingMode::Both) - .expect("Error setting non-blocking mode on listener"); - - println!("Server running at {printname}"); + .map_err(|e| anyhow!("set_nonblocking failed: {}", e))?; + relay_log("IPC server running"); let mut buffer = String::with_capacity(128); - while !child_bichannel.is_killswitch_engaged() { - let conn = listener.accept(); - let conn = match (child_bichannel.is_killswitch_engaged(), conn) { - (true, _) => return Ok(()), - (_, Ok(c)) => { - println!("success"); - c - } - (_, Err(e)) if e.kind() == std::io::ErrorKind::WouldBlock => { - continue; - } - (_, Err(e)) => { - eprintln!("Incoming connection failed: {e}"); - continue; - } - }; - - let mut conn = BufReader::new(conn); - // mark connected - let _ = child_bichannel.set_is_conn_to_endpoint(true); + match listener.accept() { + Ok(conn) => { + relay_log("IPC incoming connection accepted"); + let mut conn = BufReader::new(conn); + let _ = child_bichannel.set_is_conn_to_endpoint(true); + + loop { + if child_bichannel.is_killswitch_engaged() { + break; + } - // read initial greeting/handshake if any - match conn.read_line(&mut buffer) { - Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // non-blocking, continue to main loop - } - Err(e) => { - eprintln!("Initial read error: {e}"); - } - } + // Check parent messages (none expected for now) + for _msg in child_bichannel.received_messages() { + // intentionally no-op; reserved for future commands + } - let write_res = conn - .get_mut() - .write_all(b"Hello, from the relay prototype (Rust)!\n"); + match conn.read_line(&mut buffer) { + Ok(0) => { + buffer.clear(); + break; + } + Ok(_) => { + let _ = buffer.pop(); // remove trailing newline if present + if buffer.starts_with("SHUTDOWN") { + let _ = child_bichannel.send_to_parent(FromIpcThreadMessage::BatonShutdown); + break; + } else { + let _ = child_bichannel.send_to_parent(FromIpcThreadMessage::BatonData(buffer.clone())); + } + buffer.clear(); + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + Err(e) => { + relay_log(&format!("IPC connection read error: {}", e)); + break; + } + } + } - match write_res { - Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => (), - Err(e) => { - eprintln!("Initial write error: {e}"); + let _ = child_bichannel.set_is_conn_to_endpoint(false); } - } - - print!("Client answered: {buffer}"); - buffer.clear(); - - // Continuously receive data from plugin - while !child_bichannel.is_killswitch_engaged() { - // check for any new messages from parent and act accordingly - for message in child_bichannel.received_messages() { - match message {} + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // no incoming connection right now + std::thread::sleep(StdDuration::from_millis(5)); + continue; } - - // read from connection input - match conn.read_line(&mut buffer) { - Ok(s) if s == 0 || buffer.len() == 0 => { - // EOF / remote closed the connection: - // notify parent and mark disconnected, then break to accept next connection. - let _ = child_bichannel.send_to_parent(FromIpcThreadMessage::BatonShutdown); - let _ = child_bichannel.set_is_conn_to_endpoint(false); - buffer.clear(); - break; - } - Ok(_s) => { - let _ = buffer.pop(); // remove trailing newline (if present) - println!("Got: {buffer}"); - - // baton shutdown message received. Send shutdown message and break to next connection - if buffer.starts_with("SHUTDOWN") { - let _ = child_bichannel - .send_to_parent(FromIpcThreadMessage::BatonShutdown); - let _ = child_bichannel.set_is_conn_to_endpoint(false); - buffer.clear(); - break; // break inner loop, go back to accept() - } else { - // actual baton data received - let _ = child_bichannel.send_to_parent( - FromIpcThreadMessage::BatonData(buffer.clone()), - ); - } - - buffer.clear(); - } - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // nothing to read, avoid busy-loop - std::thread::sleep(std::time::Duration::from_millis(1)); - continue; - } - Err(e) => { - eprintln!("Got err {e}"); - // on unexpected read error, mark disconnected and break - let _ = child_bichannel.send_to_parent(FromIpcThreadMessage::BatonShutdown); - let _ = child_bichannel.set_is_conn_to_endpoint(false); - break; - } + Err(e) => { + relay_log(&format!("IPC accept failed: {}", e)); + std::thread::sleep(StdDuration::from_millis(50)); + continue; } } - - // ensure connected flag cleared when client loop exits - let _ = child_bichannel.set_is_conn_to_endpoint(false); - - // continue listening for new connections unless killswitch engaged - if child_bichannel.is_killswitch_engaged() { - return Ok(()); - } } - Ok(()) }); self.ipc_bichannel = Some(ipc_bichannel); - self.ipc_thread_handle = Some(ipc_thread_handle); - + self.ipc_thread_handle = Some(handle); Ok(()) } @@ -260,103 +238,127 @@ impl State { if self.ipc_thread_handle.is_none() { bail!("IPC thread does not exist.") } - let Some((bichannel, handle)) = self.ipc_bichannel.take().zip(self.ipc_thread_handle.take()) else { bail!("IPC thread does not exist.") }; - bichannel.killswitch_engage()?; let res = handle .join() .map_err(|e| anyhow!("Join handle err: {e:?}"))?; - Ok(res?) } - pub fn _is_ipc_connected(&self) -> bool { - if let Some(status) = self - .ipc_bichannel - .as_ref() - .and_then(|bichannel| bichannel.is_conn_to_endpoint().ok()) - { - status - } else { - false - } - } - pub fn tcp_connect(&mut self, address: String) -> Result<()> { if self.tcp_thread_handle.is_some() { bail!("TCP thread already exists.") } - - // create bichannel for TCP thread let (tcp_bichannel, mut child_bichannel) = bichannel::create_bichannels(); self.tcp_bichannel = Some(tcp_bichannel); - // The TCP thread will keep trying to connect until the killswitch is engaged. - // It will buffer outgoing messages when disconnected and will attempt to flush them on reconnect. - let tcp_thread_handle = std::thread::spawn(move || { - let mut backoff_ms = 500u64; - let mut send_buffer: VecDeque = VecDeque::new(); - - loop { - if child_bichannel.is_killswitch_engaged() { - // shutdown requested - return Ok(()); - } + let handle = spawn(move || { + let mut stream = TcpStream::connect(address).map_err(|e| { + relay_log(&format!("TCP connect failed: {}", e)); + anyhow!("Failed to connect to TCP") + })?; + relay_log("TCP connected"); + let _ = child_bichannel.set_is_conn_to_endpoint(true); while !child_bichannel.is_killswitch_engaged() { - // check messages from main thread for message in child_bichannel.received_messages() { match message { ToTcpThreadMessage::Send(data) => { - // added this for tcp packet count -Nyla Hughes - let packet = format!("E;1;PilotDataSync;;;;;AltitudeSync;{data}\r\n"); - match stream.write_all(packet.as_bytes()) { - Ok(()) => { - let _ = child_bichannel.send_to_parent( - FromTcpThreadMessage::Sent { - bytes: packet.len(), - at: Instant::now(), - }, - ); + let fields = normalize_baton_payload(&data); + + if fields.len() < 2 { + relay_log(&format!("Dropping packet: not enough fields (need >=2) but baton sent {}: {:?}", fields.len(), fields)); + continue; + } + + // Pilot-only 4-field payload handling + if fields.len() == 4 { + let alt = fields[0].clone(); + let air = fields[1].clone(); + let head = fields[2].clone(); + let vv = fields[3].clone(); + + let packets = [ + build_imotions_packet("AltitudeSync", &[alt.clone(), alt.clone()]), + build_imotions_packet("AirspeedSync", &[air.clone(), air.clone()]), + build_imotions_packet("VerticalVelocitySync", &[vv.clone(), vv.clone()]), + build_imotions_packet("HeadingSync", &[head.clone(), head.clone()]), + ]; + + for pkt in &packets { + if let Err(e) = send_packet(&mut stream, pkt) { + relay_log(&format!("TCP send failed: {}", e)); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } } - Err(e) => { - eprintln!("TCP send failed: {e}"); + continue; + } + + // Paired-fields mapping (legacy behavior) + if fields.len() >= 2 { + let altitude_payload = vec![fields[0].clone(), fields[1].clone()]; + let altitude_packet = build_imotions_packet("AltitudeSync", &altitude_payload); + if let Err(e) = send_packet(&mut stream, &altitude_packet) { + relay_log(&format!("Failed to send Altitude packet: {}", e)); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); } } - } - // if killswitch engaged, break outer loop and exit - if child_bichannel.is_killswitch_engaged() { - let _ = child_bichannel.set_is_conn_to_endpoint(false); - return Ok(()); - } + if fields.len() >= 4 { + let airspeed_payload = vec![fields[2].clone(), fields[3].clone()]; + let airspeed_packet = build_imotions_packet("AirspeedSync", &airspeed_payload); + if let Err(e) = send_packet(&mut stream, &airspeed_packet) { + relay_log(&format!("Failed to send Airspeed packet: {}", e)); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } + } - // otherwise we fell out of connected loop due to error - try reconnect - let _ = child_bichannel.set_is_conn_to_endpoint(false); - } - Err(e) => { - // failed to connect - report and backoff, unless killswitch engaged - let reason = format!("Connect failed: {}", e); - let _ = child_bichannel.send_to_parent(FromTcpThreadMessage::Disconnected(reason)); - if child_bichannel.is_killswitch_engaged() { - return Ok(()); + if fields.len() >= 6 { + let vv_payload = vec![fields[4].clone(), fields[5].clone()]; + let vv_packet = build_imotions_packet("VerticalVelocitySync", &vv_payload); + if let Err(e) = send_packet(&mut stream, &vv_packet) { + relay_log(&format!("Failed to send Vertical Velocity packet: {}", e)); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } + } + + if fields.len() >= 8 { + let heading_payload = vec![fields[6].clone(), fields[7].clone()]; + let heading_packet = build_imotions_packet("HeadingSync", &heading_payload); + if let Err(e) = send_packet(&mut stream, &heading_packet) { + relay_log(&format!("Failed to send Heading packet: {}", e)); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } + } } - std::thread::sleep(StdDuration::from_millis(backoff_ms)); - // exponential backoff up to 30s - backoff_ms = std::cmp::min(backoff_ms.saturating_mul(2), 30_000); - continue; } } + std::thread::sleep(StdDuration::from_millis(1)); } + Ok(()) }); - self.tcp_thread_handle = Some(tcp_thread_handle); - + self.tcp_thread_handle = Some(handle); Ok(()) } @@ -364,73 +366,19 @@ impl State { if self.tcp_thread_handle.is_none() { bail!("TCP thread does not exist.") } - let Some((bichannel, handle)) = self.tcp_bichannel.take().zip(self.tcp_thread_handle.take()) else { bail!("TCP thread does not exist.") }; - bichannel.killswitch_engage()?; let res = handle .join() .map_err(|e| anyhow!("Join handle err: {e:?}"))?; - Ok(res?) } - pub fn _is_tcp_connected(&self) -> bool { - if let Some(status) = self - .tcp_bichannel - .as_ref() - .and_then(|bichannel| bichannel.is_conn_to_endpoint().ok()) - { - status - } else { - false - } - } - pub fn log_event(&mut self, event: String) { self.event_log.push(event); } - - // Added this for tcp packet count -Nyla Hughes - pub fn on_tcp_packet_sent(&mut self, bytes: usize) { - let now = Instant::now(); - self.sent_packet_times.push_back(now); - self.sent_samples.push_back((now, bytes)); - self.refresh_metrics(now); - } - - pub fn refresh_metrics_now(&mut self) { - let now = Instant::now(); - self.refresh_metrics(now); - } - - fn refresh_metrics(&mut self, now: Instant) { - // last 60 seconds -> packet count - let window60 = std::time::Duration::from_secs(60); - while let Some(&t) = self.sent_packet_times.front() { - if now.duration_since(t) > window60 { - self.sent_packet_times.pop_front(); - } else { - break; - } - } - self.packets_last_60s = self.sent_packet_times.len(); - - // last 1 second -> throughput - let window1 = std::time::Duration::from_secs(1); - while let Some(&(t, _)) = self.sent_samples.front() { - if now.duration_since(t) > window1 { - self.sent_samples.pop_front(); - } else { - break; - } - } - let bytes_last_1s: usize = self.sent_samples.iter().map(|&(_, b)| b).sum(); - self.bps = (bytes_last_1s as f64) * 8.0; - self.show_metrics = self.packets_last_60s > 0 || self.bps >= 1.0; - } } \ No newline at end of file diff --git a/relay/src/update.rs b/relay/src/update.rs index 55082be..5228326 100644 --- a/relay/src/update.rs +++ b/relay/src/update.rs @@ -1,48 +1,68 @@ use iced::{time::Duration, Task}; use std::fs::File; -use std::io::prelude::*; +use std::io::Write; +use std::time::{Duration as StdDuration, Instant}; -//added this for tcp counter - Nyla Hughes -use crate::message::{Message, ToTcpThreadMessage, FromIpcThreadMessage, FromTcpThreadMessage}; -use crate::State; +use crate::{ + message::FromTcpThreadMessage, message::FromIpcThreadMessage, message::ToTcpThreadMessage, Message, State, +}; pub(crate) fn update(state: &mut State, message: Message) -> Task { use Message as M; - #[allow(unreachable_patterns)] + const LAST_SEEN_WINDOW: StdDuration = StdDuration::from_secs(2); + match message { M::Update => { state.elapsed_time += Duration::from_millis(10); - // added this for tcp counter - Nyla Hughes - state.refresh_metrics_now(); + // compute active baton connection using centralized helpers + let ipc_conn_flag = state.is_ipc_connected(); + let recent_packet = state + .last_baton_instant + .map(|t| t.elapsed() <= LAST_SEEN_WINDOW) + .unwrap_or(false); + state.active_baton_connection = ipc_conn_flag || recent_packet; + // process IPC messages if let Some(ipc_bichannel) = &state.ipc_bichannel { - for message in ipc_bichannel.received_messages() { - match message { + for msg in ipc_bichannel.received_messages() { + match msg { FromIpcThreadMessage::BatonData(data) => { - state.tcp_bichannel.as_mut().map(|tcp_bichannel| { - tcp_bichannel.send_to_child(ToTcpThreadMessage::Send(data.clone())) - }); + if let Some(tcp_bi) = state.tcp_bichannel.as_mut() { + let _ = tcp_bi.send_to_child(ToTcpThreadMessage::Send(data.clone())); + } + state.log_event(format!("Baton packet: {}", data)); state.latest_baton_send = Some(data); + state.last_baton_instant = Some(Instant::now()); state.active_baton_connection = true; } FromIpcThreadMessage::BatonShutdown => { let _ = state.tcp_disconnect(); state.active_baton_connection = false; } - _ => (), } } } + + // process TCP messages if let Some(tcp_bichannel) = &state.tcp_bichannel { - for message in tcp_bichannel.received_messages() { - match message { - //added this for tcp counter - Nyla Hughes - FromTcpThreadMessage::Sent { bytes, .. } => { - state.on_tcp_packet_sent(bytes); + for msg in tcp_bichannel.received_messages() { + match msg { + FromTcpThreadMessage::Connected => { + state.log_event("TCP connected to iMotions".into()); + state.tcp_connected = true; + } + FromTcpThreadMessage::Disconnected(reason) => { + state.log_event(format!("TCP disconnected: {}", reason)); + state.tcp_connected = false; + } + FromTcpThreadMessage::Sent(bytes) => { + state.on_tcp_packet_sent(bytes); + } + FromTcpThreadMessage::SendError(err) => { + state.log_event(format!("TCP send error: {}", err)); } - _ => (), } } } @@ -50,69 +70,49 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { Task::none() } M::WindowCloseRequest(id) => { - // pre-shutdown operations go here if let Some(ref bichannel) = state.ipc_bichannel { let _ = bichannel.killswitch_engage(); } - if let Some(ref bichannel) = state.tcp_bichannel { let _ = bichannel.killswitch_engage(); } - // delete socket file - let socket_file_path = if cfg!(target_os = "macos") { - "/tmp/baton.sock" - } else { - // TODO: add branch for Windows; mac branch is just for testing/building - panic!( - "No implementation available for given operating system: {}", - std::env::consts::OS - ) - }; - std::fs::remove_file(socket_file_path).unwrap(); + // remove unix socket file on macos test/dev path only + if cfg!(target_os = "macos") { + let _ = std::fs::remove_file("/tmp/baton.sock"); + } - // necessary to actually shut down the window, otherwise the close button will appear to not work iced::window::close(id) } M::ConnectionMessage => { - if let Some(status) = state - .tcp_bichannel - .as_ref() - .and_then(|bichannel| bichannel.is_conn_to_endpoint().ok()) - { - state.tcp_connected = status - } else { - state.tcp_connected = false - } - + state.tcp_connected = state.is_tcp_connected(); Task::none() } M::ConnectIpc => { if let Err(e) = state.ipc_connect() { - state.log_event(format!("Error: {e:?}")); - }; + state.log_event(format!("IPC connect failed: {}", e)); + } Task::none() } M::DisconnectIpc => { if let Err(e) = state.ipc_disconnect() { - state.log_event(format!("Error: {e:?}")); - }; + state.log_event(format!("IPC disconnect failed: {}", e)); + } Task::none() } M::ConnectTcp => { let address = state.tcp_addr_field.clone(); if let Err(e) = state.tcp_connect(address) { - state.log_event(format!("Error: {e:?}")); - }; + state.log_event(format!("TCP connect failed: {}", e)); + } Task::none() } M::DisconnectTcp => { if let Err(e) = state.tcp_disconnect() { - state.log_event(format!("Error: {e:?}")); - }; + state.log_event(format!("TCP disconnect failed: {}", e)); + } Task::none() } - // Toggle messages for GUI XML generator M::AltitudeToggle(value) => { state.altitude_toggle = value; Task::none() @@ -130,7 +130,6 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { Task::none() } M::CreateXMLFile => create_xml_file(state), - // Card Open/Close messages for GUI pop-up-card window M::CardOpen => { state.card_open = true; Task::none() @@ -140,7 +139,6 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { Task::none() } M::TcpAddrFieldUpdate(addr) => { - // Update the TCP address text input in the GUI let is_chars_valid = addr.chars().all(|c| c.is_numeric() || c == '.' || c == ':'); let dot_count = addr.chars().filter(|&c| c == '.').count(); let colon_count = addr.chars().filter(|&c| c == ':').count(); @@ -155,116 +153,71 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { state.last_send_timestamp = Some(format!("{}", duration.as_secs())); Task::none() } - _ => Task::none(), } } -// Creates a default XML file when a button is clicked in the GUI fn create_xml_file(state: &mut State) -> Task { - // Get the user's downloads directory - let mut downloads_path = - dirs::download_dir().expect("Retrieving the user's Downloads file directory."); + let mut downloads_path = match dirs::download_dir() { + Some(p) => p, + None => { + let msg = "Could not determine Downloads directory".to_string(); + state.error_message = Some(msg.clone()); + state.log_event(msg); + return Task::none(); + } + }; downloads_path.push("iMotions.xml"); - // Create file in downloads directory. If alr there, will overwrite the existing file. - let mut file = File::create(&downloads_path).expect("Creating XML File."); - - // Check if all dataref toggles are false. If so, return error message - if !state.altitude_toggle - && !state.airspeed_toggle - && !state.vertical_airspeed_toggle - && !state.heading_toggle - { + // Validate toggles + if !state.altitude_toggle && !state.airspeed_toggle && !state.vertical_airspeed_toggle && !state.heading_toggle { state.error_message = Some("Please select at least one dataref toggle".into()); return Task::none(); } - state.error_message = None; // Clear previous error + state.error_message = None; - // NOTE: This XML formatting was found in the PilotDataSync Slack. Double check this is the correct formatting. - let mut contents = String::from( - "\n", - ); + let mut contents = String::from("\n"); if state.altitude_toggle { - let mut altitude_str = - String::from("\t\n"); - - altitude_str.push_str( - "\t\t\n", - ); - altitude_str.push_str( - "\t\t\n", - ); - altitude_str.push_str("\t\n"); - - contents.push_str(&altitude_str); + contents.push_str("\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\n"); } if state.airspeed_toggle { - let mut airspeed_str = - String::from("\t\n"); - - airspeed_str.push_str( - "\t\t\n", - ); - airspeed_str.push_str( - "\t\t\n", - ); - airspeed_str.push_str("\t\n"); - - contents.push_str(&airspeed_str); + contents.push_str("\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\n"); } if state.vertical_airspeed_toggle { - let mut vertical_airspeed_str = String::from( - "\t\n", - ); - - vertical_airspeed_str.push_str("\t\t\n"); - vertical_airspeed_str.push_str("\t\t\n"); - vertical_airspeed_str.push_str("\t\n"); - - contents.push_str(&vertical_airspeed_str); + contents.push_str("\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\n"); } if state.heading_toggle { - let mut heading_str = - String::from("\t\n"); - - heading_str.push_str( - "\t\t\n", - ); - heading_str.push_str( - "\t\t\n", - ); - heading_str.push_str("\t\n"); - - contents.push_str(&heading_str); + contents.push_str("\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\n"); } contents.push_str(""); - // Write XML file - file.write_all(contents.as_bytes()) - .expect("Writing to XML file"); - - Task::none() // Return type that we need for the Update logic -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::State; - use crate::Message; - - #[test] - fn send_packet_updates_timestamp() { - let mut state = State::default(); - - assert!(state.last_send_timestamp.is_none()); - - // Should simulate sending a packet - let _ = super::update(&mut state, Message::SendPacket); - - // After update, timestamp should be set - assert!(state.last_send_timestamp.is_some()); - let ts = state.last_send_timestamp.as_ref().unwrap(); - - assert!(ts.chars().all(|c| c.is_ascii_digit())); + match File::create(&downloads_path) { + Ok(mut file) => { + if let Err(e) = file.write_all(contents.as_bytes()) { + let msg = format!("Writing XML file failed: {}", e); + state.error_message = Some(msg.clone()); + state.log_event(msg); + } else { + state.log_event(format!("XML file written to {}", downloads_path.display())); + } + } + Err(e) => { + let msg = format!("Creating XML file failed: {}", e); + state.error_message = Some(msg.clone()); + state.log_event(msg); + } } + + Task::none() } diff --git a/relay/src/view.rs b/relay/src/view.rs index cfdf081..9a77381 100644 --- a/relay/src/view.rs +++ b/relay/src/view.rs @@ -1,87 +1,69 @@ -//! This module defines the UI View layout using ICED. +//! UI view definitions using iced. //! -//! UI elements should be defined as **separate functions** and added to the UI elements vector -//! instead of being written inline in the `view` function. -//! -//! This improves modularity, testing, and code readability. -//! -//! Examples: -//! ```fn spawn_error_message(state: &State) -> Option { /* ... */ } ``` -//! ``` if let Some(error_element) = spawn_error_message(state) { -//! elements.push(error_element.into()); -//! } ``` -//! -//! ``` fn ipc_disconnect_button(_state: &State) -> UIElement {/* ... */} ``` -//! ``` elements.push(tcp_disconnect_button(state)); ``` +//! Each UI element is produced by a small focused function to improve readability, +//! testability and maintainability. The `view` function composes those elements. use std::net::ToSocketAddrs; -use iced::{ - widget::{button, column, container, row, text, text_input, toggler}, - Element, Length, -}; +use iced::widget::{button, column, container, row, text, text_input, toggler}; +use iced::{Element, Length}; use iced_aw::{helpers::card, style}; use crate::{Message, State}; type UIElement<'a> = Element<'a, Message>; -// TODO: Fix the close button on the UI card. It displays a chinese character meaning "plowed earth"????? +const DEFAULT_TCP_PLACEHOLDER: &str = "127.0.0.1:9999"; + +/// Compose the UI by collecting small, single-responsibility elements. pub(crate) fn view(state: &State) -> UIElement { let mut elements: Vec = Vec::new(); - // OPTIONAL Error message - if let Some(error_element) = spawn_error_message(state) { - elements.push(error_element.into()); + // Optional error banner + if let Some(err) = spawn_error_message(state) { + elements.push(err); } - // Elapsed Time Text + // Informational text elements.push(elapsed_time_element(state)); - - // Baton Latest Send Text elements.push(baton_data_element(state)); elements.push(baton_connect_status_element(state)); - // Send Packet button (Only if baton is running) - Jacob - if let Some(send_btn) = send_packet_button(state) { - elements.push(send_btn); + // Action buttons + if let Some(btn) = send_packet_button(state) { + elements.push(btn); } - // Added this for tcp counter - Nyla Hughes - elements.push(metrics_block(state)); - - // TCP Connection Status elements + // TCP controls and status elements.push(tcp_connect_status_element(state)); - elements.push(check_tcp_status_button(state)); + elements.push(check_tcp_status_button()); - // IPC Connect/Disconnect Buttons - elements.push(ipc_connect_button(state)); - elements.push(ipc_disconnect_button(state)); + // IPC controls + elements.push(ipc_connect_button()); + elements.push(ipc_disconnect_button()); - // TCP Connect/Disconnect buttons + // TCP connect/disconnect row elements.push(tcp_connect_button(state)); - elements.push(tcp_disconnect_button(state)); + elements.push(tcp_disconnect_button()); - // XML popup - elements.push(xml_downloader_popup(state)); + // XML download / card + elements.push(xml_download_popup(state)); - // Create and return the GUI column from that vector column(elements).into() } fn spawn_error_message(state: &State) -> Option { - if let Some(error) = &state.error_message { - Some( - container(text(format!("⚠️ {}", error))) + state + .error_message + .as_ref() + .map(|err| { + container(text(format!("⚠️ {}", err))) .padding(10) .width(Length::Fill) .style(container::rounded_box) .center_x(Length::Fill) - .into(), - ) - } else { - None - } + .into() + }) } fn elapsed_time_element(state: &State) -> UIElement { @@ -98,114 +80,96 @@ fn baton_connect_status_element(state: &State) -> UIElement { } fn baton_data_element(state: &State) -> UIElement { - // need to update view function with float parsing? perhaps? idk - let baton_data = match &state.latest_baton_send { - //added this for tcp counter - Nyla Hughes - Some(data) => format!("[BATON]: {data}"), - // + let content = match &state.latest_baton_send { + Some(data) => format!("[BATON]: {}", data), None => "No data from baton.".into(), }; - text(baton_data).into() -} - -// Added this for tcp counter - Nyla Hughes -fn metrics_block(state: &State) -> UIElement { - if !state.show_metrics { - return text("").into(); - } - - let packets_60 = state.packets_last_60s; - let bps_str = human_bps(state.bps); - - column![ - text(format!("Packets sent in the last 60's: {packets_60}")), - text(format!("Throughput: {bps_str}")), - ] - .into() -} -fn human_bps(bps: f64) -> String { - const K: f64 = 1_000.0; - if bps < K { - return format!("{:.0} bps", bps); - } - let kbps = bps / K; - if kbps < K { - return format!("{:.1} Kbps", kbps); - } - let mbps = kbps / K; - if mbps < K { - return format!("{:.2} Mbps", mbps); - } - let gbps = mbps / K; - format!("{:.2} Gbps", gbps) + text(content).into() } fn tcp_connect_status_element(state: &State) -> UIElement { text(format!("TCP Connection Status: {}", state.tcp_connected)).into() } -fn check_tcp_status_button(_state: &State) -> UIElement { +/// Simple helper: a button which triggers the app to verify the TCP connection state. +fn check_tcp_status_button() -> UIElement<'static> { button("Check TCP Connection Status") .on_press(Message::ConnectionMessage) .into() } -fn ipc_connect_button(_state: &State) -> UIElement { +/// IPC connect / disconnect buttons +fn ipc_connect_button() -> UIElement<'static> { button("Connect IPC").on_press(Message::ConnectIpc).into() } -fn ipc_disconnect_button(_state: &State) -> UIElement { +fn ipc_disconnect_button() -> UIElement<'static> { button("Disconnect IPC") .on_press(Message::DisconnectIpc) .into() } -fn tcp_connect_button(state: &State) -> UIElement { - if state.tcp_addr_field.to_socket_addrs().is_ok() { - row![ - button("Connect TCP").on_press(Message::ConnectTcp), - text_input("127.0.0.1:9999", &state.tcp_addr_field) - .on_input(|addr| Message::TcpAddrFieldUpdate(addr)), - text("Address input is valid") - ] - .spacing(5) +/// Build the TCP address input widget wired to `Message::TcpAddrFieldUpdate`. +fn tcp_addr_input(state: &State) -> UIElement { + text_input(DEFAULT_TCP_PLACEHOLDER, &state.tcp_addr_field) + .on_input(|addr| Message::TcpAddrFieldUpdate(addr)) .into() +} + +/// Return true when the current TCP address input parses to socket addresses. +fn tcp_addr_valid(state: &State) -> bool { + state.tcp_addr_field.to_socket_addrs().is_ok() +} + +/// TCP connect row: button, address input, and validation hint. +fn tcp_connect_button(state: &State) -> UIElement { + let valid = tcp_addr_valid(state); + let hint = if valid { + "Address input is valid" } else { - row![ - button("Connect TCP"), - text_input("127.0.0.1:9999", &state.tcp_addr_field) - .on_input(|addr| Message::TcpAddrFieldUpdate(addr)), - text("Address input is invalid") - ] - .spacing(5) - .into() - } + "Address input is invalid" + }; + + let connect_btn = if valid { + button("Connect TCP").on_press(Message::ConnectTcp) + } else { + // keep the button, but disable the event when invalid by not wiring on_press + button("Connect TCP") + }; + + row![ + connect_btn, + tcp_addr_input(state), + text(hint) + ] + .spacing(5) + .into() } -fn tcp_disconnect_button(_state: &State) -> UIElement { +fn tcp_disconnect_button() -> UIElement<'static> { button("Disconnect TCP") .on_press(Message::DisconnectTcp) .into() } -fn xml_downloader_popup(state: &State) -> UIElement { +/// XML download card or opener button depending on `state.card_open` +fn xml_download_popup(state: &State) -> UIElement { if state.card_open { container( card( - // FIXME: reword these toggles to actually be snappy wording - text(format!("Download the XML File!")), + text("Download the XML File!"), column![ toggler(state.altitude_toggle) - .label("Altitude Toggle!") + .label("Altitude") .on_toggle(Message::AltitudeToggle), toggler(state.airspeed_toggle) - .label("Airspeed Toggle") + .label("Airspeed") .on_toggle(Message::AirspeedToggle), toggler(state.vertical_airspeed_toggle) - .label("Vertical Airspeed Toggle") + .label("Vertical Airspeed") .on_toggle(Message::VerticalAirspeedToggle), toggler(state.heading_toggle) - .label("Heading Toggle") + .label("Heading") .on_toggle(Message::HeadingToggle), button("Generate XML File").on_press(Message::CreateXMLFile), ], @@ -221,18 +185,27 @@ fn xml_downloader_popup(state: &State) -> UIElement { } } - +/// Send packet button: enabled variant wires the message, disabled variant is inert. fn send_packet_button(state: &State) -> Option { if state.active_baton_connection { - Some( - button("Send Packet") - .on_press(Message::SendPacket) - .into(), - ) + Some(button("Send Packet").on_press(Message::SendPacket).into()) } else { - Some( - button("Send Packet (No Baton Connection)") - .into(), - ) + Some(button("Send Packet (No Baton Connection)").into()) + } +} + +/// Format bytes-per-second into a human-friendly string. +fn human_bps(bps: f64) -> String { + if bps <= 0.0 { + return "0 B/s".into(); + } + if bps < 1024.0 { + return format!("{:.0} B/s", bps); + } + let kb = bps / 1024.0; + if kb < 1024.0 { + return format!("{:.1} KB/s", kb); } + let mb = kb / 1024.0; + format!("{:.2} MB/s", mb) }