From f15dfd2f15048a15a42109d90bb24fb19e9a1c19 Mon Sep 17 00:00:00 2001 From: jthomas39 Date: Tue, 2 Dec 2025 14:22:16 -0600 Subject: [PATCH 1/6] Changes made to the relay to facilitate sending packets to imotions --- relay/src/message.rs | 12 +- relay/src/state.rs | 311 ++++++++++++++++++++----------------------- relay/src/update.rs | 74 +++++----- relay/src/view.rs | 55 +++----- 4 files changed, 215 insertions(+), 237 deletions(-) diff --git a/relay/src/message.rs b/relay/src/message.rs index 3dbe99d..ef233f0 100644 --- a/relay/src/message.rs +++ b/relay/src/message.rs @@ -46,9 +46,17 @@ pub(crate) enum FromIpcThreadMessage { } pub(crate) enum ToTcpThreadMessage { + // outgoing payload to TCP thread Send(String), } -//added this for tcp counter - Nyla Hughes + pub(crate) enum FromTcpThreadMessage { - Sent { bytes: usize, at: Instant }, + /// TCP thread successfully connected to remote + Connected, + /// TCP thread disconnected (may include reason) + Disconnected(String), + /// A packet was sent to remote (In bytes) + Sent(usize), + /// Error reported + SendError(String), } diff --git a/relay/src/state.rs b/relay/src/state.rs index a256606..8ae207c 100644 --- a/relay/src/state.rs +++ b/relay/src/state.rs @@ -4,34 +4,23 @@ use std::io::BufRead; use std::io::BufReader; use std::io::Write; use std::net::TcpStream; -use std::thread::JoinHandle; - +use std::thread::{JoinHandle, spawn}; use iced::time::Duration; use crate::bichannel; use crate::bichannel::ParentBiChannel; - use crate::message::FromIpcThreadMessage; use crate::message::FromTcpThreadMessage; use crate::message::ToIpcThreadMessage; use crate::message::ToTcpThreadMessage; -//Code added for tcp packet count -Nyla Hughes -use std::collections::VecDeque; -use std::time::Instant; -// - - -// use crate::ChannelMessage; - use interprocess::local_socket::{traits::Listener, GenericNamespaced, ListenerOptions, ToNsName}; -use std::collections::VecDeque; -use std::time::Duration as StdDuration; +use std::time::{Duration as StdDuration, Instant}; +// --- State definition and Default impl (replace existing block) --- #[allow(unused)] pub(crate) struct State { pub elapsed_time: Duration, - pub event_log: Vec, pub ipc_thread_handle: Option>>, @@ -41,7 +30,14 @@ pub(crate) struct State { pub tcp_addr_field: String, pub latest_baton_send: Option, pub active_baton_connection: bool, - // pub recv: Option>, + + // timestamp of last received baton packet (used by update logic) + pub last_baton_instant: Option, + + // simple metrics/UI helpers + pub show_metrics: bool, + pub packets_last_60s: usize, + pub bps: f64, // Optional GUI error message pub error_message: Option, @@ -57,15 +53,6 @@ pub(crate) struct State { pub tcp_bichannel: Option>, pub last_send_timestamp: Option, - - // Added this for the tcp packet counter -Nyla Hughes - pub sent_packet_times: VecDeque, - pub sent_samples: VecDeque<(Instant, usize)>, - pub packets_last_60s: usize, - pub bps: f64, - pub show_metrics: bool, - // - } impl Default for State { @@ -81,6 +68,11 @@ impl Default for State { tcp_addr_field: String::new(), latest_baton_send: None, active_baton_connection: false, + last_baton_instant: None, + + show_metrics: false, + packets_last_60s: 0, + bps: 0.0, error_message: None, card_open: false, @@ -93,29 +85,95 @@ impl Default for State { tcp_bichannel: None, last_send_timestamp: None, - - // Added this for the tcp packet counter -Nyla Hughes - sent_packet_times: VecDeque::new(), - sent_samples: VecDeque::new(), - packets_last_60s: 0, - bps: 0.0, - show_metrics: false, } } } +// --- helper functions ------------------------------------------------------- + +fn sanitize_field(s: &str) -> String { + // remove CR/LF and replace any internal semicolons with commas, + // trim whitespace + s.replace('\r', "") + .replace('\n', "") + .replace(';', ",") + .trim() + .to_string() +} + +fn normalize_baton_payload(raw: &str) -> Vec { + // trim whitespace, remove surrounding CR/LF + let mut s = raw.trim().replace('\r', "").replace('\n', ""); + // remove leading semicolons that create empty first fields + while s.starts_with(';') { + s.remove(0); + } + // also remove trailing semicolons (avoid empty trailing field) + while s.ends_with(';') { + s.pop(); + } + // split on semicolon and sanitize each field + s.split(';') + .map(|f| sanitize_field(f)) + .filter(|f| !f.is_empty()) + .collect() +} + +fn build_imotions_packet(event_name: &str, fields: &[String]) -> String { + // Header used in previous code: "E;1;PilotDataSync;;;;;{Event};{fields...}\r\n" + let mut packet = String::from("E;1;PilotDataSync;;;;;"); + packet.push_str(event_name); + if !fields.is_empty() { + packet.push(';'); + packet.push_str(&fields.join(";")); + } + packet.push_str("\r\n"); + packet +} + +// Add this helper near your other helpers +fn send_packet_and_debug(stream: &mut std::net::TcpStream, packet: &str) -> Result<()> { + // Print readable and hex views for debugging + eprintln!("TX packet (len={}): {:?}", packet.len(), packet); + let hex: String = packet.as_bytes().iter().map(|b| format!("{:02X} ", b)).collect(); + eprintln!("TX hex: {}", hex.trim_end()); + + // Write then flush -- report any error + stream.write_all(packet.as_bytes()) + .map_err(|e| anyhow::anyhow!("write_all failed: {}", e))?; + stream.flush() + .map_err(|e| anyhow::anyhow!("flush failed: {}", e))?; + Ok(()) +} + +// --- State impl ------------------------------------------------------------- + impl State { + // Simple metric helpers used by update/view code that expect them. + pub fn refresh_metrics_now(&mut self) { + // placeholder: in future compute accurate rates from history + // Here we keep current values; could implement sliding window later. + if self.packets_last_60s > 0 { + // naive decay to avoid stale large counts (noop for now) + self.packets_last_60s = self.packets_last_60s.saturating_sub(0); + } + } + + pub fn on_tcp_packet_sent(&mut self, bytes: usize) { + // Update simple counters and log + self.packets_last_60s = self.packets_last_60s.saturating_add(1); + self.bps = bytes as f64; + self.log_event(format!("Sent packet ({} bytes)", bytes)); + } + pub fn ipc_connect(&mut self) -> Result<()> { if self.ipc_thread_handle.is_some() { bail!("IPC thread already exists.") } - // TODO let (ipc_bichannel, mut child_bichannel) = bichannel::create_bichannels::(); - let ipc_thread_handle = std::thread::spawn(move || { - // sample pulled directly from `interprocess` documentation - + let ipc_thread_handle = spawn(move || { let printname = "baton.sock"; let name = printname.to_ns_name::().unwrap(); @@ -124,7 +182,7 @@ impl State { let listener = match opts.create_sync() { Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => { eprintln!( - "Error: could not start server because the socket file is occupied. Please check if + "Error: could not start server because the socket file is occupied. Please check if {printname} is in use by another process and try again." ); return Ok(()); @@ -157,18 +215,12 @@ impl State { }; let mut conn = BufReader::new(conn); - // mark connected - let _ = child_bichannel.set_is_conn_to_endpoint(true); + child_bichannel.set_is_conn_to_endpoint(true)?; - // read initial greeting/handshake if any match conn.read_line(&mut buffer) { Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // non-blocking, continue to main loop - } - Err(e) => { - eprintln!("Initial read error: {e}"); - } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + _ => panic!(), } let write_res = conn @@ -177,10 +229,8 @@ impl State { match write_res { Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => (), - Err(e) => { - eprintln!("Initial write error: {e}"); - } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + _ => panic!(), } print!("Client answered: {buffer}"); @@ -196,26 +246,18 @@ impl State { // read from connection input match conn.read_line(&mut buffer) { Ok(s) if s == 0 || buffer.len() == 0 => { - // EOF / remote closed the connection: - // notify parent and mark disconnected, then break to accept next connection. - let _ = child_bichannel.send_to_parent(FromIpcThreadMessage::BatonShutdown); - let _ = child_bichannel.set_is_conn_to_endpoint(false); buffer.clear(); - break; + continue; } Ok(_s) => { - let _ = buffer.pop(); // remove trailing newline (if present) - println!("Got: {buffer}"); + let _ = buffer.pop(); // remove trailing newline + println!("Got: {buffer} ({_s} bytes read)"); - // baton shutdown message received. Send shutdown message and break to next connection if buffer.starts_with("SHUTDOWN") { let _ = child_bichannel .send_to_parent(FromIpcThreadMessage::BatonShutdown); - let _ = child_bichannel.set_is_conn_to_endpoint(false); - buffer.clear(); - break; // break inner loop, go back to accept() + return Ok(()); } else { - // actual baton data received let _ = child_bichannel.send_to_parent( FromIpcThreadMessage::BatonData(buffer.clone()), ); @@ -223,28 +265,10 @@ impl State { buffer.clear(); } - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // nothing to read, avoid busy-loop - std::thread::sleep(std::time::Duration::from_millis(1)); - continue; - } - Err(e) => { - eprintln!("Got err {e}"); - // on unexpected read error, mark disconnected and break - let _ = child_bichannel.send_to_parent(FromIpcThreadMessage::BatonShutdown); - let _ = child_bichannel.set_is_conn_to_endpoint(false); - break; - } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("Got err {e}"), } } - - // ensure connected flag cleared when client loop exits - let _ = child_bichannel.set_is_conn_to_endpoint(false); - - // continue listening for new connections unless killswitch engaged - if child_bichannel.is_killswitch_engaged() { - return Ok(()); - } } Ok(()) @@ -292,67 +316,65 @@ impl State { bail!("TCP thread already exists.") } - // create bichannel for TCP thread let (tcp_bichannel, mut child_bichannel) = bichannel::create_bichannels(); self.tcp_bichannel = Some(tcp_bichannel); - // The TCP thread will keep trying to connect until the killswitch is engaged. - // It will buffer outgoing messages when disconnected and will attempt to flush them on reconnect. - let tcp_thread_handle = std::thread::spawn(move || { - let mut backoff_ms = 500u64; - let mut send_buffer: VecDeque = VecDeque::new(); + let tcp_thread_handle = spawn(move || { + let mut stream = match TcpStream::connect(address) { + Ok(stream) => { + println!("Successfully connected."); + let _ = child_bichannel.set_is_conn_to_endpoint(true); + stream + } - loop { - if child_bichannel.is_killswitch_engaged() { - // shutdown requested - return Ok(()); + Err(e) => { + println!("Connection failed: {}", e); + bail!("Failed to connect to TCP"); } + }; while !child_bichannel.is_killswitch_engaged() { - // check messages from main thread for message in child_bichannel.received_messages() { match message { ToTcpThreadMessage::Send(data) => { - // added this for tcp packet count -Nyla Hughes - let packet = format!("E;1;PilotDataSync;;;;;AltitudeSync;{data}\r\n"); - match stream.write_all(packet.as_bytes()) { - Ok(()) => { - let _ = child_bichannel.send_to_parent( - FromTcpThreadMessage::Sent { - bytes: packet.len(), - at: Instant::now(), - }, - ); - } - Err(e) => { - eprintln!("TCP send failed: {e}"); - } + // Normalize baton payload + let fields = normalize_baton_payload(&data); + + // --- STRICT MAPPING FOR iMOTIONS EVENT --- + // Example: AltitudeSync expects exactly 2 fields (adjust if needed). + const EXPECTED_FIELDS: usize = 2; + if fields.len() < EXPECTED_FIELDS { + eprintln!( + "Dropping packet: AltitudeSync expects {} fields but baton sent {}: {:?}", + EXPECTED_FIELDS, fields.len(), fields + ); + continue; } - } - // if killswitch engaged, break outer loop and exit - if child_bichannel.is_killswitch_engaged() { - let _ = child_bichannel.set_is_conn_to_endpoint(false); - return Ok(()); - } + // Pick exactly the fields iMotions expects (guarding indexes) + let payload = vec![ + fields.get(0).unwrap().clone(), // primary altitude + fields.get(1).unwrap().clone(), // secondary altitude (or other) + ]; - // otherwise we fell out of connected loop due to error - try reconnect - let _ = child_bichannel.set_is_conn_to_endpoint(false); - } - Err(e) => { - // failed to connect - report and backoff, unless killswitch engaged - let reason = format!("Connect failed: {}", e); - let _ = child_bichannel.send_to_parent(FromTcpThreadMessage::Disconnected(reason)); - if child_bichannel.is_killswitch_engaged() { - return Ok(()); + let packet = build_imotions_packet("AltitudeSync", &payload); + eprintln!("Sending to iMotions: {:?}", packet); + + if let Err(e) = send_packet_and_debug(&mut stream, &packet) { + eprintln!("Failed to send packet: {}", e); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } } - std::thread::sleep(StdDuration::from_millis(backoff_ms)); - // exponential backoff up to 30s - backoff_ms = std::cmp::min(backoff_ms.saturating_mul(2), 30_000); - continue; } } + + std::thread::sleep(std::time::Duration::from_millis(1)); } + + Ok(()) }); self.tcp_thread_handle = Some(tcp_thread_handle); @@ -394,43 +416,4 @@ impl State { pub fn log_event(&mut self, event: String) { self.event_log.push(event); } - - // Added this for tcp packet count -Nyla Hughes - pub fn on_tcp_packet_sent(&mut self, bytes: usize) { - let now = Instant::now(); - self.sent_packet_times.push_back(now); - self.sent_samples.push_back((now, bytes)); - self.refresh_metrics(now); - } - - pub fn refresh_metrics_now(&mut self) { - let now = Instant::now(); - self.refresh_metrics(now); - } - - fn refresh_metrics(&mut self, now: Instant) { - // last 60 seconds -> packet count - let window60 = std::time::Duration::from_secs(60); - while let Some(&t) = self.sent_packet_times.front() { - if now.duration_since(t) > window60 { - self.sent_packet_times.pop_front(); - } else { - break; - } - } - self.packets_last_60s = self.sent_packet_times.len(); - - // last 1 second -> throughput - let window1 = std::time::Duration::from_secs(1); - while let Some(&(t, _)) = self.sent_samples.front() { - if now.duration_since(t) > window1 { - self.sent_samples.pop_front(); - } else { - break; - } - } - let bytes_last_1s: usize = self.sent_samples.iter().map(|&(_, b)| b).sum(); - self.bps = (bytes_last_1s as f64) * 8.0; - self.show_metrics = self.packets_last_60s > 0 || self.bps >= 1.0; - } } \ No newline at end of file diff --git a/relay/src/update.rs b/relay/src/update.rs index 55082be..1f22da1 100644 --- a/relay/src/update.rs +++ b/relay/src/update.rs @@ -1,30 +1,51 @@ use iced::{time::Duration, Task}; use std::fs::File; use std::io::prelude::*; +use std::time::{Instant, Duration as StdDuration}; -//added this for tcp counter - Nyla Hughes -use crate::message::{Message, ToTcpThreadMessage, FromIpcThreadMessage, FromTcpThreadMessage}; -use crate::State; +use crate::{message::ToTcpThreadMessage, message::FromTcpThreadMessage, FromIpcThreadMessage, Message, State}; pub(crate) fn update(state: &mut State, message: Message) -> Task { use Message as M; + // threshold window considered "recent" (avoid short false-negatives) + const LAST_SEEN_WINDOW: StdDuration = StdDuration::from_secs(2); + #[allow(unreachable_patterns)] match message { M::Update => { state.elapsed_time += Duration::from_millis(10); - // added this for tcp counter - Nyla Hughes - state.refresh_metrics_now(); + // 1) compute active baton connection: true if IPC reports connected OR we saw a packet recently + let ipc_conn_flag = state + .ipc_bichannel + .as_ref() + .and_then(|b| b.is_conn_to_endpoint().ok()) + .unwrap_or(false); + + let recent_packet = state + .last_baton_instant + .map(|t| t.elapsed() <= LAST_SEEN_WINDOW) + .unwrap_or(false); + state.active_baton_connection = ipc_conn_flag || recent_packet; + + // check for messages from IPC thread if let Some(ipc_bichannel) = &state.ipc_bichannel { for message in ipc_bichannel.received_messages() { match message { FromIpcThreadMessage::BatonData(data) => { + // forward to TCP thread (clone for the outgoing buffer) state.tcp_bichannel.as_mut().map(|tcp_bichannel| { tcp_bichannel.send_to_child(ToTcpThreadMessage::Send(data.clone())) }); + + // log each individual baton packet so UI shows it + state.log_event(format!("Baton packet: {data}")); + + // store latest data and mark active connection + timestamp state.latest_baton_send = Some(data); + state.last_baton_instant = Some(Instant::now()); state.active_baton_connection = true; } FromIpcThreadMessage::BatonShutdown => { @@ -35,14 +56,25 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { } } } + + // check for messages from TCP thread if let Some(tcp_bichannel) = &state.tcp_bichannel { for message in tcp_bichannel.received_messages() { match message { - //added this for tcp counter - Nyla Hughes - FromTcpThreadMessage::Sent { bytes, .. } => { - state.on_tcp_packet_sent(bytes); + FromTcpThreadMessage::Connected => { + state.log_event("TCP connected to iMotions".into()); + state.tcp_connected = true; + } + FromTcpThreadMessage::Disconnected(reason) => { + state.log_event(format!("TCP disconnected: {reason}")); + state.tcp_connected = false; + } + FromTcpThreadMessage::Sent(bytes) => { + state.on_tcp_packet_sent(bytes); + } + FromTcpThreadMessage::SendError(err) => { + state.log_event(format!("TCP send error: {err}")); } - _ => (), } } } @@ -84,7 +116,6 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { } else { state.tcp_connected = false } - Task::none() } M::ConnectIpc => { @@ -245,26 +276,3 @@ fn create_xml_file(state: &mut State) -> Task { Task::none() // Return type that we need for the Update logic } - -#[cfg(test)] -mod tests { - use super::*; - use crate::State; - use crate::Message; - - #[test] - fn send_packet_updates_timestamp() { - let mut state = State::default(); - - assert!(state.last_send_timestamp.is_none()); - - // Should simulate sending a packet - let _ = super::update(&mut state, Message::SendPacket); - - // After update, timestamp should be set - assert!(state.last_send_timestamp.is_some()); - let ts = state.last_send_timestamp.as_ref().unwrap(); - - assert!(ts.chars().all(|c| c.is_ascii_digit())); - } -} diff --git a/relay/src/view.rs b/relay/src/view.rs index cfdf081..3f8974d 100644 --- a/relay/src/view.rs +++ b/relay/src/view.rs @@ -47,9 +47,6 @@ pub(crate) fn view(state: &State) -> UIElement { elements.push(send_btn); } - // Added this for tcp counter - Nyla Hughes - elements.push(metrics_block(state)); - // TCP Connection Status elements elements.push(tcp_connect_status_element(state)); elements.push(check_tcp_status_button(state)); @@ -100,46 +97,12 @@ fn baton_connect_status_element(state: &State) -> UIElement { fn baton_data_element(state: &State) -> UIElement { // need to update view function with float parsing? perhaps? idk let baton_data = match &state.latest_baton_send { - //added this for tcp counter - Nyla Hughes - Some(data) => format!("[BATON]: {data}"), - // + Some(data) => format!("[BATON]: {}", data), None => "No data from baton.".into(), }; text(baton_data).into() } -// Added this for tcp counter - Nyla Hughes -fn metrics_block(state: &State) -> UIElement { - if !state.show_metrics { - return text("").into(); - } - - let packets_60 = state.packets_last_60s; - let bps_str = human_bps(state.bps); - - column![ - text(format!("Packets sent in the last 60's: {packets_60}")), - text(format!("Throughput: {bps_str}")), - ] - .into() -} -fn human_bps(bps: f64) -> String { - const K: f64 = 1_000.0; - if bps < K { - return format!("{:.0} bps", bps); - } - let kbps = bps / K; - if kbps < K { - return format!("{:.1} Kbps", kbps); - } - let mbps = kbps / K; - if mbps < K { - return format!("{:.2} Mbps", mbps); - } - let gbps = mbps / K; - format!("{:.2} Gbps", gbps) -} - fn tcp_connect_status_element(state: &State) -> UIElement { text(format!("TCP Connection Status: {}", state.tcp_connected)).into() } @@ -236,3 +199,19 @@ fn send_packet_button(state: &State) -> Option { ) } } + +// small helper to format bps +fn human_bps(bps: f64) -> String { + if bps <= 0.0 { + return "0 B/s".into(); + } + if bps < 1024.0 { + return format!("{:.0} B/s", bps); + } + let kb = bps / 1024.0; + if kb < 1024.0 { + return format!("{:.1} KB/s", kb); + } + let mb = kb / 1024.0; + format!("{:.2} MB/s", mb) +} From da4c63d8d0ddf53481b9741bfd562c390e959567 Mon Sep 17 00:00:00 2001 From: jthomas39 Date: Tue, 2 Dec 2025 14:59:53 -0600 Subject: [PATCH 2/6] pushing current code --- relay/src/state.rs | 97 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 80 insertions(+), 17 deletions(-) diff --git a/relay/src/state.rs b/relay/src/state.rs index 8ae207c..a8e3063 100644 --- a/relay/src/state.rs +++ b/relay/src/state.rs @@ -181,10 +181,10 @@ impl State { let listener = match opts.create_sync() { Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => { - eprintln!( + eprintln![ "Error: could not start server because the socket file is occupied. Please check if {printname} is in use by another process and try again." - ); + ]; return Ok(()); } x => x.unwrap(), @@ -340,33 +340,96 @@ impl State { // Normalize baton payload let fields = normalize_baton_payload(&data); - // --- STRICT MAPPING FOR iMOTIONS EVENT --- - // Example: AltitudeSync expects exactly 2 fields (adjust if needed). - const EXPECTED_FIELDS: usize = 2; - if fields.len() < EXPECTED_FIELDS { + // --- STRICT MAPPING FOR iMOTIONS EVENTS --- + // The relay will attempt to send as many iMotions events + // as the incoming baton payload supports. Expected ordering: + // [Alt_FM, Alt_Pilot, Airspeed_FM, Airspeed_Pilot, + // Vertical_FM, Vertical_Pilot, Heading_FM, Heading_Pilot] + // + // For each pair present, send the corresponding iMotions packet. + if fields.len() < 2 { eprintln!( - "Dropping packet: AltitudeSync expects {} fields but baton sent {}: {:?}", - EXPECTED_FIELDS, fields.len(), fields + "Dropping packet: not enough fields (need >=2) but baton sent {}: {:?}", + fields.len(), fields ); continue; } - // Pick exactly the fields iMotions expects (guarding indexes) - let payload = vec![ - fields.get(0).unwrap().clone(), // primary altitude - fields.get(1).unwrap().clone(), // secondary altitude (or other) + // Send AltitudeSync if we have at least 2 fields + let altitude_payload = vec![ + fields.get(0).unwrap().clone(), + fields.get(1).unwrap().clone(), ]; + let altitude_packet = build_imotions_packet("AltitudeSync", &altitude_payload); + eprintln!("Sending to iMotions: {:?}", altitude_packet); - let packet = build_imotions_packet("AltitudeSync", &payload); - eprintln!("Sending to iMotions: {:?}", packet); - - if let Err(e) = send_packet_and_debug(&mut stream, &packet) { - eprintln!("Failed to send packet: {}", e); + if let Err(e) = send_packet_and_debug(&mut stream, &altitude_packet) { + eprintln!("Failed to send Altitude packet: {}", e); let _ = child_bichannel.set_is_conn_to_endpoint(false); return Err(e); } else { let _ = child_bichannel.set_is_conn_to_endpoint(true); } + + // Send AirspeedSync if we have at least 4 fields + if fields.len() >= 4 { + let airspeed_payload = vec![ + fields.get(2).unwrap().clone(), + fields.get(3).unwrap().clone(), + ]; + let airspeed_packet = build_imotions_packet("AirspeedSync", &airspeed_payload); + eprintln!("Sending to iMotions: {:?}", airspeed_packet); + + if let Err(e) = send_packet_and_debug(&mut stream, &airspeed_packet) { + eprintln!("Failed to send Airspeed packet: {}", e); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } + } else { + eprintln!("Airspeed packet skipped: need >=4 fields, have {}", fields.len()); + } + + // Send VerticalVelocitySync if we have at least 4 fields + if fields.len() >= 6 { + let vv_payload = vec![ + fields.get(4).unwrap().clone(), + fields.get(5).unwrap().clone(), + ]; + let vv_packet = build_imotions_packet("VerticalAirspeedSync", &vv_payload); + eprintln!("Sending to iMotions: {:?}", vv_packet); + + if let Err(e) = send_packet_and_debug(&mut stream, &vv_packet) { + eprintln!("Failed to send Vertical Velocity packet: {}", e); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } + } else { + eprintln!("VerticalVelocity packet skipped: need >=6 fields, have {}", fields.len()); + } + + // Send HeadingSync if we have at least 8 fields + if fields.len() >= 8 { + let heading_payload = vec![ + fields.get(6).unwrap().clone(), + fields.get(7).unwrap().clone(), + ]; + let heading_packet = build_imotions_packet("HeadingSync", &heading_payload); + eprintln!("Sending to iMotions: {:?}", heading_packet); + + if let Err(e) = send_packet_and_debug(&mut stream, &heading_packet) { + eprintln!("Failed to send Heading packet: {}", e); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } + } else { + eprintln!("Heading packet skipped: need >=8 fields, have {}", fields.len()); + } } } } From 62dd75b37cd4f405c151c6a75fdb3200570a9f3b Mon Sep 17 00:00:00 2001 From: jthomas39 Date: Tue, 2 Dec 2025 15:27:33 -0600 Subject: [PATCH 3/6] working relay --- relay/src/state.rs | 171 ++++++++++++++++++++------------------------- 1 file changed, 76 insertions(+), 95 deletions(-) diff --git a/relay/src/state.rs b/relay/src/state.rs index a8e3063..3f2488a 100644 --- a/relay/src/state.rs +++ b/relay/src/state.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::Result; use anyhow::{anyhow, bail}; use std::io::BufRead; use std::io::BufReader; @@ -6,39 +6,31 @@ use std::io::Write; use std::net::TcpStream; use std::thread::{JoinHandle, spawn}; use iced::time::Duration; - use crate::bichannel; use crate::bichannel::ParentBiChannel; use crate::message::FromIpcThreadMessage; use crate::message::FromTcpThreadMessage; use crate::message::ToIpcThreadMessage; use crate::message::ToTcpThreadMessage; - use interprocess::local_socket::{traits::Listener, GenericNamespaced, ListenerOptions, ToNsName}; use std::time::{Duration as StdDuration, Instant}; - // --- State definition and Default impl (replace existing block) --- #[allow(unused)] pub(crate) struct State { pub elapsed_time: Duration, pub event_log: Vec, - pub ipc_thread_handle: Option>>, pub tcp_thread_handle: Option>>, - pub tcp_connected: bool, pub tcp_addr_field: String, pub latest_baton_send: Option, pub active_baton_connection: bool, - // timestamp of last received baton packet (used by update logic) pub last_baton_instant: Option, - // simple metrics/UI helpers pub show_metrics: bool, pub packets_last_60s: usize, pub bps: f64, - // Optional GUI error message pub error_message: Option, // Is GUI pop-up card open @@ -48,49 +40,38 @@ pub(crate) struct State { pub airspeed_toggle: bool, pub vertical_airspeed_toggle: bool, pub heading_toggle: bool, - pub ipc_bichannel: Option>, pub tcp_bichannel: Option>, - pub last_send_timestamp: Option, } - impl Default for State { fn default() -> State { State { elapsed_time: Duration::ZERO, event_log: Vec::new(), - ipc_thread_handle: None, tcp_thread_handle: None, - tcp_connected: false, tcp_addr_field: String::new(), latest_baton_send: None, active_baton_connection: false, last_baton_instant: None, - show_metrics: false, packets_last_60s: 0, bps: 0.0, - error_message: None, card_open: false, altitude_toggle: true, airspeed_toggle: true, vertical_airspeed_toggle: true, heading_toggle: true, - ipc_bichannel: None, tcp_bichannel: None, - last_send_timestamp: None, } } } - // --- helper functions ------------------------------------------------------- - fn sanitize_field(s: &str) -> String { // remove CR/LF and replace any internal semicolons with commas, // trim whitespace @@ -100,7 +81,6 @@ fn sanitize_field(s: &str) -> String { .trim() .to_string() } - fn normalize_baton_payload(raw: &str) -> Vec { // trim whitespace, remove surrounding CR/LF let mut s = raw.trim().replace('\r', "").replace('\n', ""); @@ -118,7 +98,6 @@ fn normalize_baton_payload(raw: &str) -> Vec { .filter(|f| !f.is_empty()) .collect() } - fn build_imotions_packet(event_name: &str, fields: &[String]) -> String { // Header used in previous code: "E;1;PilotDataSync;;;;;{Event};{fields...}\r\n" let mut packet = String::from("E;1;PilotDataSync;;;;;"); @@ -130,14 +109,12 @@ fn build_imotions_packet(event_name: &str, fields: &[String]) -> String { packet.push_str("\r\n"); packet } - // Add this helper near your other helpers fn send_packet_and_debug(stream: &mut std::net::TcpStream, packet: &str) -> Result<()> { // Print readable and hex views for debugging eprintln!("TX packet (len={}): {:?}", packet.len(), packet); let hex: String = packet.as_bytes().iter().map(|b| format!("{:02X} ", b)).collect(); eprintln!("TX hex: {}", hex.trim_end()); - // Write then flush -- report any error stream.write_all(packet.as_bytes()) .map_err(|e| anyhow::anyhow!("write_all failed: {}", e))?; @@ -145,9 +122,7 @@ fn send_packet_and_debug(stream: &mut std::net::TcpStream, packet: &str) -> Resu .map_err(|e| anyhow::anyhow!("flush failed: {}", e))?; Ok(()) } - // --- State impl ------------------------------------------------------------- - impl State { // Simple metric helpers used by update/view code that expect them. pub fn refresh_metrics_now(&mut self) { @@ -158,27 +133,22 @@ impl State { self.packets_last_60s = self.packets_last_60s.saturating_sub(0); } } - pub fn on_tcp_packet_sent(&mut self, bytes: usize) { // Update simple counters and log self.packets_last_60s = self.packets_last_60s.saturating_add(1); self.bps = bytes as f64; self.log_event(format!("Sent packet ({} bytes)", bytes)); } - pub fn ipc_connect(&mut self) -> Result<()> { if self.ipc_thread_handle.is_some() { bail!("IPC thread already exists.") } - let (ipc_bichannel, mut child_bichannel) = bichannel::create_bichannels::(); let ipc_thread_handle = spawn(move || { let printname = "baton.sock"; let name = printname.to_ns_name::().unwrap(); - let opts = ListenerOptions::new().name(name); - let listener = match opts.create_sync() { Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => { eprintln![ @@ -192,11 +162,8 @@ impl State { listener .set_nonblocking(interprocess::local_socket::ListenerNonblockingMode::Both) .expect("Error setting non-blocking mode on listener"); - println!("Server running at {printname}"); - let mut buffer = String::with_capacity(128); - while !child_bichannel.is_killswitch_engaged() { let conn = listener.accept(); let conn = match (child_bichannel.is_killswitch_engaged(), conn) { @@ -213,36 +180,29 @@ impl State { continue; } }; - let mut conn = BufReader::new(conn); child_bichannel.set_is_conn_to_endpoint(true)?; - match conn.read_line(&mut buffer) { Ok(_) => (), Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, _ => panic!(), } - let write_res = conn .get_mut() .write_all(b"Hello, from the relay prototype (Rust)!\n"); - match write_res { Ok(_) => (), Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, _ => panic!(), } - print!("Client answered: {buffer}"); buffer.clear(); - // Continuously receive data from plugin while !child_bichannel.is_killswitch_engaged() { // check for any new messages from parent and act accordingly for message in child_bichannel.received_messages() { match message {} } - // read from connection input match conn.read_line(&mut buffer) { Ok(s) if s == 0 || buffer.len() == 0 => { @@ -252,7 +212,6 @@ impl State { Ok(_s) => { let _ = buffer.pop(); // remove trailing newline println!("Got: {buffer} ({_s} bytes read)"); - if buffer.starts_with("SHUTDOWN") { let _ = child_bichannel .send_to_parent(FromIpcThreadMessage::BatonShutdown); @@ -262,7 +221,6 @@ impl State { FromIpcThreadMessage::BatonData(buffer.clone()), ); } - buffer.clear(); } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, @@ -270,35 +228,27 @@ impl State { } } } - Ok(()) }); - self.ipc_bichannel = Some(ipc_bichannel); self.ipc_thread_handle = Some(ipc_thread_handle); - Ok(()) } - pub fn ipc_disconnect(&mut self) -> Result<()> { if self.ipc_thread_handle.is_none() { bail!("IPC thread does not exist.") } - let Some((bichannel, handle)) = self.ipc_bichannel.take().zip(self.ipc_thread_handle.take()) else { bail!("IPC thread does not exist.") }; - bichannel.killswitch_engage()?; let res = handle .join() .map_err(|e| anyhow!("Join handle err: {e:?}"))?; - Ok(res?) } - pub fn _is_ipc_connected(&self) -> bool { if let Some(status) = self .ipc_bichannel @@ -310,15 +260,12 @@ impl State { false } } - pub fn tcp_connect(&mut self, address: String) -> Result<()> { if self.tcp_thread_handle.is_some() { bail!("TCP thread already exists.") } - let (tcp_bichannel, mut child_bichannel) = bichannel::create_bichannels(); self.tcp_bichannel = Some(tcp_bichannel); - let tcp_thread_handle = spawn(move || { let mut stream = match TcpStream::connect(address) { Ok(stream) => { @@ -326,27 +273,25 @@ impl State { let _ = child_bichannel.set_is_conn_to_endpoint(true); stream } - Err(e) => { println!("Connection failed: {}", e); bail!("Failed to connect to TCP"); } }; - while !child_bichannel.is_killswitch_engaged() { for message in child_bichannel.received_messages() { match message { ToTcpThreadMessage::Send(data) => { // Normalize baton payload let fields = normalize_baton_payload(&data); - - // --- STRICT MAPPING FOR iMOTIONS EVENTS --- - // The relay will attempt to send as many iMotions events - // as the incoming baton payload supports. Expected ordering: - // [Alt_FM, Alt_Pilot, Airspeed_FM, Airspeed_Pilot, - // Vertical_FM, Vertical_Pilot, Heading_FM, Heading_Pilot] + // --- Flexible mapping for iMOTIONS events --- + // The relay accepts two common payload shapes: + // 1) Paired fields for each sample (FM,Pilot) in sequence: + // [Alt_FM, Alt_Pilot, Air_FM, Air_Pilot, Vert_FM, Vert_Pilot, Head_FM, Head_Pilot] + // 2) Single pilot-only values in order: + // [Altitude, Airspeed, Heading, VerticalVelocity] // - // For each pair present, send the corresponding iMotions packet. + // Emit whatever events we can from the incoming payload. if fields.len() < 2 { eprintln!( "Dropping packet: not enough fields (need >=2) but baton sent {}: {:?}", @@ -354,23 +299,74 @@ impl State { ); continue; } - + // If payload is exactly 4 fields, assume pilot-only order + if fields.len() == 4 { + // plugin order: Altitude, Airspeed, Heading, VerticalVelocity + // For iMotions we need (FlightModel, Pilot) pairs. Use the pilot value for both slots. + let alt = fields.get(0).unwrap().clone(); + let air = fields.get(1).unwrap().clone(); + let head = fields.get(2).unwrap().clone(); + let vv = fields.get(3).unwrap().clone(); + // Altitude + let altitude_packet = build_imotions_packet("AltitudeSync", &[alt.clone(), alt.clone()]); + eprintln!("Sending to iMotions: {:?}", altitude_packet); + if let Err(e) = send_packet_and_debug(&mut stream, &altitude_packet) { + eprintln!("Failed to send Altitude packet: {}", e); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } + // Airspeed + let airspeed_packet = build_imotions_packet("AirspeedSync", &[air.clone(), air.clone()]); + eprintln!("Sending to iMotions: {:?}", airspeed_packet); + if let Err(e) = send_packet_and_debug(&mut stream, &airspeed_packet) { + eprintln!("Failed to send Airspeed packet: {}", e); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } + // Vertical velocity + let vv_packet = build_imotions_packet("VerticalVelocitySync", &[vv.clone(), vv.clone()]); + eprintln!("Sending to iMotions: {:?}", vv_packet); + if let Err(e) = send_packet_and_debug(&mut stream, &vv_packet) { + eprintln!("Failed to send Vertical Velocity packet: {}", e); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } + // Heading + let heading_packet = build_imotions_packet("HeadingSync", &[head.clone(), head.clone()]); + eprintln!("Sending to iMotions: {:?}", heading_packet); + if let Err(e) = send_packet_and_debug(&mut stream, &heading_packet) { + eprintln!("Failed to send Heading packet: {}", e); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } + // done with this message + continue; + } + // Otherwise attempt the paired-fields mapping (previous behavior) // Send AltitudeSync if we have at least 2 fields - let altitude_payload = vec![ - fields.get(0).unwrap().clone(), - fields.get(1).unwrap().clone(), - ]; - let altitude_packet = build_imotions_packet("AltitudeSync", &altitude_payload); - eprintln!("Sending to iMotions: {:?}", altitude_packet); - - if let Err(e) = send_packet_and_debug(&mut stream, &altitude_packet) { - eprintln!("Failed to send Altitude packet: {}", e); - let _ = child_bichannel.set_is_conn_to_endpoint(false); - return Err(e); - } else { - let _ = child_bichannel.set_is_conn_to_endpoint(true); + if fields.len() >= 2 { + let altitude_payload = vec![ + fields.get(0).unwrap().clone(), + fields.get(1).unwrap().clone(), + ]; + let altitude_packet = build_imotions_packet("AltitudeSync", &altitude_payload); + eprintln!("Sending to iMotions: {:?}", altitude_packet); + if let Err(e) = send_packet_and_debug(&mut stream, &altitude_packet) { + eprintln!("Failed to send Altitude packet: {}", e); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } } - // Send AirspeedSync if we have at least 4 fields if fields.len() >= 4 { let airspeed_payload = vec![ @@ -379,7 +375,6 @@ impl State { ]; let airspeed_packet = build_imotions_packet("AirspeedSync", &airspeed_payload); eprintln!("Sending to iMotions: {:?}", airspeed_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &airspeed_packet) { eprintln!("Failed to send Airspeed packet: {}", e); let _ = child_bichannel.set_is_conn_to_endpoint(false); @@ -390,16 +385,14 @@ impl State { } else { eprintln!("Airspeed packet skipped: need >=4 fields, have {}", fields.len()); } - - // Send VerticalVelocitySync if we have at least 4 fields + // Send VerticalVelocitySync if we have at least 6 fields if fields.len() >= 6 { let vv_payload = vec![ fields.get(4).unwrap().clone(), fields.get(5).unwrap().clone(), ]; - let vv_packet = build_imotions_packet("VerticalAirspeedSync", &vv_payload); + let vv_packet = build_imotions_packet("VerticalVelocitySync", &vv_payload); eprintln!("Sending to iMotions: {:?}", vv_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &vv_packet) { eprintln!("Failed to send Vertical Velocity packet: {}", e); let _ = child_bichannel.set_is_conn_to_endpoint(false); @@ -410,7 +403,6 @@ impl State { } else { eprintln!("VerticalVelocity packet skipped: need >=6 fields, have {}", fields.len()); } - // Send HeadingSync if we have at least 8 fields if fields.len() >= 8 { let heading_payload = vec![ @@ -419,7 +411,6 @@ impl State { ]; let heading_packet = build_imotions_packet("HeadingSync", &heading_payload); eprintln!("Sending to iMotions: {:?}", heading_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &heading_packet) { eprintln!("Failed to send Heading packet: {}", e); let _ = child_bichannel.set_is_conn_to_endpoint(false); @@ -433,37 +424,28 @@ impl State { } } } - std::thread::sleep(std::time::Duration::from_millis(1)); } - Ok(()) }); - self.tcp_thread_handle = Some(tcp_thread_handle); - Ok(()) } - pub fn tcp_disconnect(&mut self) -> Result<()> { if self.tcp_thread_handle.is_none() { bail!("TCP thread does not exist.") } - let Some((bichannel, handle)) = self.tcp_bichannel.take().zip(self.tcp_thread_handle.take()) else { bail!("TCP thread does not exist.") }; - bichannel.killswitch_engage()?; let res = handle .join() .map_err(|e| anyhow!("Join handle err: {e:?}"))?; - Ok(res?) } - pub fn _is_tcp_connected(&self) -> bool { if let Some(status) = self .tcp_bichannel @@ -475,7 +457,6 @@ impl State { false } } - pub fn log_event(&mut self, event: String) { self.event_log.push(event); } From 48aaa00363bc942dc499749dfbe0907873e6d073 Mon Sep 17 00:00:00 2001 From: jthomas39 Date: Mon, 16 Feb 2026 16:35:47 -0600 Subject: [PATCH 4/6] A little bit of code cleanup, definetely still more that needs to be done. So far I've cleaned up the view.rs, update.rs, and state.rs files. Still haven't cleaned any of the code in the xplane_plugin yet --- relay/src/view.rs | 181 +++++++++++++++++++++------------------------- 1 file changed, 81 insertions(+), 100 deletions(-) diff --git a/relay/src/view.rs b/relay/src/view.rs index 3f8974d..8438e3a 100644 --- a/relay/src/view.rs +++ b/relay/src/view.rs @@ -1,84 +1,58 @@ -//! This module defines the UI View layout using ICED. +//! UI view layout using iced. //! -//! UI elements should be defined as **separate functions** and added to the UI elements vector -//! instead of being written inline in the `view` function. -//! -//! This improves modularity, testing, and code readability. -//! -//! Examples: -//! ```fn spawn_error_message(state: &State) -> Option { /* ... */ } ``` -//! ``` if let Some(error_element) = spawn_error_message(state) { -//! elements.push(error_element.into()); -//! } ``` -//! -//! ``` fn ipc_disconnect_button(_state: &State) -> UIElement {/* ... */} ``` -//! ``` elements.push(tcp_disconnect_button(state)); ``` +//! Each UI element is produced by a focused function. The `view` function composes those +//! elements to keep the module modular and easy to maintain. use std::net::ToSocketAddrs; -use iced::{ - widget::{button, column, container, row, text, text_input, toggler}, - Element, Length, -}; +use iced::widget::{button, column, container, row, text, text_input, toggler}; +use iced::{Element, Length}; use iced_aw::{helpers::card, style}; use crate::{Message, State}; type UIElement<'a> = Element<'a, Message>; -// TODO: Fix the close button on the UI card. It displays a chinese character meaning "plowed earth"????? +const DEFAULT_TCP_PLACEHOLDER: &str = "127.0.0.1:9999"; + pub(crate) fn view(state: &State) -> UIElement { let mut elements: Vec = Vec::new(); - // OPTIONAL Error message - if let Some(error_element) = spawn_error_message(state) { - elements.push(error_element.into()); + if let Some(err) = spawn_error_message(state) { + elements.push(err); } - // Elapsed Time Text elements.push(elapsed_time_element(state)); - - // Baton Latest Send Text elements.push(baton_data_element(state)); elements.push(baton_connect_status_element(state)); - // Send Packet button (Only if baton is running) - Jacob - if let Some(send_btn) = send_packet_button(state) { - elements.push(send_btn); - } + // Send Packet button (enabled/disabled variant) + elements.push(send_packet_button(state)); - // TCP Connection Status elements elements.push(tcp_connect_status_element(state)); - elements.push(check_tcp_status_button(state)); + elements.push(check_tcp_status_button()); - // IPC Connect/Disconnect Buttons - elements.push(ipc_connect_button(state)); - elements.push(ipc_disconnect_button(state)); + elements.push(ipc_connect_button()); + elements.push(ipc_disconnect_button()); - // TCP Connect/Disconnect buttons elements.push(tcp_connect_button(state)); - elements.push(tcp_disconnect_button(state)); + elements.push(tcp_disconnect_button()); - // XML popup - elements.push(xml_downloader_popup(state)); + elements.push(xml_download_popup(state)); - // Create and return the GUI column from that vector column(elements).into() } +/// Show an inline error banner when the application state carries an error message. fn spawn_error_message(state: &State) -> Option { - if let Some(error) = &state.error_message { - Some( - container(text(format!("⚠️ {}", error))) - .padding(10) - .width(Length::Fill) - .style(container::rounded_box) - .center_x(Length::Fill) - .into(), - ) - } else { - None - } + state.error_message.as_ref().map(|err| { + container(text(format!("⚠️ {}", err))) + .padding(10) + .width(Length::Fill) + .style(container::rounded_box) + .center_x(Length::Fill) + .into() + }) } fn elapsed_time_element(state: &State) -> UIElement { @@ -86,89 +60,103 @@ fn elapsed_time_element(state: &State) -> UIElement { } fn baton_connect_status_element(state: &State) -> UIElement { - let connection_status = if state.active_baton_connection { - ":) Baton Connected!".to_string() + let status = if state.active_baton_connection { + "Baton: connected" } else { - ":( No Baton Connection".to_string() + "Baton: disconnected" }; - text(connection_status).into() + text(status).into() } fn baton_data_element(state: &State) -> UIElement { - // need to update view function with float parsing? perhaps? idk - let baton_data = match &state.latest_baton_send { + let content = match &state.latest_baton_send { Some(data) => format!("[BATON]: {}", data), None => "No data from baton.".into(), }; - text(baton_data).into() + text(content).into() } fn tcp_connect_status_element(state: &State) -> UIElement { - text(format!("TCP Connection Status: {}", state.tcp_connected)).into() + text(format!("TCP Connection: {}", state.tcp_connected)).into() } -fn check_tcp_status_button(_state: &State) -> UIElement { +fn check_tcp_status_button() -> UIElement { button("Check TCP Connection Status") .on_press(Message::ConnectionMessage) .into() } -fn ipc_connect_button(_state: &State) -> UIElement { +fn ipc_connect_button() -> UIElement { button("Connect IPC").on_press(Message::ConnectIpc).into() } -fn ipc_disconnect_button(_state: &State) -> UIElement { +fn ipc_disconnect_button() -> UIElement { button("Disconnect IPC") .on_press(Message::DisconnectIpc) .into() } -fn tcp_connect_button(state: &State) -> UIElement { - if state.tcp_addr_field.to_socket_addrs().is_ok() { - row![ - button("Connect TCP").on_press(Message::ConnectTcp), - text_input("127.0.0.1:9999", &state.tcp_addr_field) - .on_input(|addr| Message::TcpAddrFieldUpdate(addr)), - text("Address input is valid") - ] - .spacing(5) +/// TCP address input wired to state updates. +fn tcp_addr_input(state: &State) -> UIElement { + text_input(DEFAULT_TCP_PLACEHOLDER, &state.tcp_addr_field) + .on_input(|addr| Message::TcpAddrFieldUpdate(addr)) .into() +} + +/// Validate current TCP address input by attempting to resolve socket addresses. +fn tcp_addr_valid(state: &State) -> bool { + state.tcp_addr_field.to_socket_addrs().is_ok() +} + +/// TCP connect row: Connect button, address input, and validation hint. +fn tcp_connect_button(state: &State) -> UIElement { + let valid = tcp_addr_valid(state); + let connect_btn = if valid { + button("Connect TCP").on_press(Message::ConnectTcp) } else { - row![ - button("Connect TCP"), - text_input("127.0.0.1:9999", &state.tcp_addr_field) - .on_input(|addr| Message::TcpAddrFieldUpdate(addr)), - text("Address input is invalid") - ] - .spacing(5) - .into() - } + // Keep the button visible but inert when the address is invalid. + button("Connect TCP") + }; + + let hint_text = if valid { + "Address input is valid" + } else { + "Address input is invalid" + }; + + row![ + connect_btn, + tcp_addr_input(state), + text(hint_text) + ] + .spacing(5) + .into() } -fn tcp_disconnect_button(_state: &State) -> UIElement { +fn tcp_disconnect_button() -> UIElement { button("Disconnect TCP") .on_press(Message::DisconnectTcp) .into() } -fn xml_downloader_popup(state: &State) -> UIElement { +/// XML download card or opener button depending on state.card_open. +fn xml_download_popup(state: &State) -> UIElement { if state.card_open { container( card( - // FIXME: reword these toggles to actually be snappy wording - text(format!("Download the XML File!")), + text("Download the XML File!"), column![ toggler(state.altitude_toggle) - .label("Altitude Toggle!") + .label("Altitude") .on_toggle(Message::AltitudeToggle), toggler(state.airspeed_toggle) - .label("Airspeed Toggle") + .label("Airspeed") .on_toggle(Message::AirspeedToggle), toggler(state.vertical_airspeed_toggle) - .label("Vertical Airspeed Toggle") + .label("Vertical Airspeed") .on_toggle(Message::VerticalAirspeedToggle), toggler(state.heading_toggle) - .label("Heading Toggle") + .label("Heading") .on_toggle(Message::HeadingToggle), button("Generate XML File").on_press(Message::CreateXMLFile), ], @@ -184,23 +172,16 @@ fn xml_downloader_popup(state: &State) -> UIElement { } } - -fn send_packet_button(state: &State) -> Option { +/// Send packet button — enabled when Baton connection active, otherwise inert but visible. +fn send_packet_button(state: &State) -> UIElement { if state.active_baton_connection { - Some( - button("Send Packet") - .on_press(Message::SendPacket) - .into(), - ) + button("Send Packet").on_press(Message::SendPacket).into() } else { - Some( - button("Send Packet (No Baton Connection)") - .into(), - ) + button("Send Packet (No Baton Connection)").into() } } -// small helper to format bps +/// Format bytes-per-second into a human-friendly string. fn human_bps(bps: f64) -> String { if bps <= 0.0 { return "0 B/s".into(); From 745bd27c832626818248f8c1086ce8bbad8eda77 Mon Sep 17 00:00:00 2001 From: jthomas39 Date: Mon, 16 Feb 2026 16:35:59 -0600 Subject: [PATCH 5/6] A little bit of code cleanup, definetely still more that needs to be done. So far I've cleaned up the view.rs, update.rs, and state.rs files. Still haven't cleaned any of the code in the xplane_plugin yet --- relay/src/state.rs | 421 ++++++++++++++++++-------------------------- relay/src/update.rs | 211 +++++++++------------- relay/src/view.rs | 92 ++++++---- 3 files changed, 305 insertions(+), 419 deletions(-) diff --git a/relay/src/state.rs b/relay/src/state.rs index 3f2488a..05a0e5f 100644 --- a/relay/src/state.rs +++ b/relay/src/state.rs @@ -1,20 +1,20 @@ -use anyhow::Result; -use anyhow::{anyhow, bail}; -use std::io::BufRead; -use std::io::BufReader; -use std::io::Write; -use std::net::TcpStream; -use std::thread::{JoinHandle, spawn}; +use anyhow::{anyhow, bail, Result}; use iced::time::Duration; -use crate::bichannel; -use crate::bichannel::ParentBiChannel; -use crate::message::FromIpcThreadMessage; -use crate::message::FromTcpThreadMessage; -use crate::message::ToIpcThreadMessage; -use crate::message::ToTcpThreadMessage; use interprocess::local_socket::{traits::Listener, GenericNamespaced, ListenerOptions, ToNsName}; +use std::io::{BufRead, BufReader, Write}; +use std::net::TcpStream; +use std::thread::{spawn, JoinHandle}; use std::time::{Duration as StdDuration, Instant}; -// --- State definition and Default impl (replace existing block) --- + +use crate::bichannel; +use crate::bichannel::ParentBiChannel; +use crate::message::{FromIpcThreadMessage, FromTcpThreadMessage, ToIpcThreadMessage, ToTcpThreadMessage}; + +/// Simple, consistent log helper used inside this module and spawned threads. +fn relay_log(msg: &str) { + eprintln!("[relay] {}", msg); +} + #[allow(unused)] pub(crate) struct State { pub elapsed_time: Duration, @@ -25,17 +25,17 @@ pub(crate) struct State { pub tcp_addr_field: String, pub latest_baton_send: Option, pub active_baton_connection: bool, - // timestamp of last received baton packet (used by update logic) + /// timestamp of last received baton packet (used by update logic) pub last_baton_instant: Option, - // simple metrics/UI helpers + /// simple metrics/UI helpers pub show_metrics: bool, pub packets_last_60s: usize, pub bps: f64, - // Optional GUI error message + /// Optional GUI error message pub error_message: Option, - // Is GUI pop-up card open + /// Is GUI pop-up card open pub card_open: bool, - // GUI Toggle state elements + /// GUI Toggle state elements pub altitude_toggle: bool, pub airspeed_toggle: bool, pub vertical_airspeed_toggle: bool, @@ -44,6 +44,7 @@ pub(crate) struct State { pub tcp_bichannel: Option>, pub last_send_timestamp: Option, } + impl Default for State { fn default() -> State { State { @@ -71,35 +72,32 @@ impl Default for State { } } } -// --- helper functions ------------------------------------------------------- + +// --- helpers ---------------------------------------------------------------- + fn sanitize_field(s: &str) -> String { - // remove CR/LF and replace any internal semicolons with commas, - // trim whitespace s.replace('\r', "") .replace('\n', "") .replace(';', ",") .trim() .to_string() } + fn normalize_baton_payload(raw: &str) -> Vec { - // trim whitespace, remove surrounding CR/LF let mut s = raw.trim().replace('\r', "").replace('\n', ""); - // remove leading semicolons that create empty first fields while s.starts_with(';') { s.remove(0); } - // also remove trailing semicolons (avoid empty trailing field) while s.ends_with(';') { s.pop(); } - // split on semicolon and sanitize each field s.split(';') .map(|f| sanitize_field(f)) .filter(|f| !f.is_empty()) .collect() } + fn build_imotions_packet(event_name: &str, fields: &[String]) -> String { - // Header used in previous code: "E;1;PilotDataSync;;;;;{Event};{fields...}\r\n" let mut packet = String::from("E;1;PilotDataSync;;;;;"); packet.push_str(event_name); if !fields.is_empty() { @@ -109,131 +107,133 @@ fn build_imotions_packet(event_name: &str, fields: &[String]) -> String { packet.push_str("\r\n"); packet } -// Add this helper near your other helpers -fn send_packet_and_debug(stream: &mut std::net::TcpStream, packet: &str) -> Result<()> { - // Print readable and hex views for debugging - eprintln!("TX packet (len={}): {:?}", packet.len(), packet); - let hex: String = packet.as_bytes().iter().map(|b| format!("{:02X} ", b)).collect(); - eprintln!("TX hex: {}", hex.trim_end()); - // Write then flush -- report any error - stream.write_all(packet.as_bytes()) - .map_err(|e| anyhow::anyhow!("write_all failed: {}", e))?; - stream.flush() - .map_err(|e| anyhow::anyhow!("flush failed: {}", e))?; + +fn send_packet(stream: &mut TcpStream, packet: &str) -> Result<()> { + stream + .write_all(packet.as_bytes()) + .map_err(|e| anyhow!("write_all failed: {}", e))?; + stream + .flush() + .map_err(|e| anyhow!("flush failed: {}", e))?; Ok(()) } + // --- State impl ------------------------------------------------------------- + impl State { - // Simple metric helpers used by update/view code that expect them. pub fn refresh_metrics_now(&mut self) { - // placeholder: in future compute accurate rates from history - // Here we keep current values; could implement sliding window later. if self.packets_last_60s > 0 { - // naive decay to avoid stale large counts (noop for now) self.packets_last_60s = self.packets_last_60s.saturating_sub(0); } } + pub fn on_tcp_packet_sent(&mut self, bytes: usize) { - // Update simple counters and log self.packets_last_60s = self.packets_last_60s.saturating_add(1); self.bps = bytes as f64; self.log_event(format!("Sent packet ({} bytes)", bytes)); } + + pub fn is_ipc_connected(&self) -> bool { + self.ipc_bichannel + .as_ref() + .and_then(|b| b.is_conn_to_endpoint().ok()) + .unwrap_or(false) + } + + pub fn is_tcp_connected(&self) -> bool { + self.tcp_bichannel + .as_ref() + .and_then(|b| b.is_conn_to_endpoint().ok()) + .unwrap_or(false) + } + pub fn ipc_connect(&mut self) -> Result<()> { if self.ipc_thread_handle.is_some() { - bail!("IPC thread already exists.") + bail!("IPC thread already exists."); } + let (ipc_bichannel, mut child_bichannel) = bichannel::create_bichannels::(); - let ipc_thread_handle = spawn(move || { + + let handle = spawn(move || { let printname = "baton.sock"; - let name = printname.to_ns_name::().unwrap(); + let name = printname.to_ns_name::().map_err(|e| { + relay_log(&format!("Failed to convert name to NsName: {}", e)); + anyhow!("Name conversion failed") + })?; let opts = ListenerOptions::new().name(name); - let listener = match opts.create_sync() { - Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => { - eprintln![ - "Error: could not start server because the socket file is occupied. Please check if - {printname} is in use by another process and try again." - ]; - return Ok(()); - } - x => x.unwrap(), - }; + let listener = opts.create_sync().map_err(|e| { + relay_log(&format!("Failed to create IPC listener: {}", e)); + anyhow!("Listener create failed") + })?; listener .set_nonblocking(interprocess::local_socket::ListenerNonblockingMode::Both) - .expect("Error setting non-blocking mode on listener"); - println!("Server running at {printname}"); + .map_err(|e| anyhow!("set_nonblocking failed: {}", e))?; + relay_log("IPC server running"); + let mut buffer = String::with_capacity(128); while !child_bichannel.is_killswitch_engaged() { - let conn = listener.accept(); - let conn = match (child_bichannel.is_killswitch_engaged(), conn) { - (true, _) => return Ok(()), - (_, Ok(c)) => { - println!("success"); - c + match listener.accept() { + Ok(conn) => { + relay_log("IPC incoming connection accepted"); + let mut conn = BufReader::new(conn); + let _ = child_bichannel.set_is_conn_to_endpoint(true); + + loop { + if child_bichannel.is_killswitch_engaged() { + break; + } + + // Check parent messages (none expected for now) + for _msg in child_bichannel.received_messages() { + // intentionally no-op; reserved for future commands + } + + match conn.read_line(&mut buffer) { + Ok(0) => { + buffer.clear(); + break; + } + Ok(_) => { + let _ = buffer.pop(); // remove trailing newline if present + if buffer.starts_with("SHUTDOWN") { + let _ = child_bichannel.send_to_parent(FromIpcThreadMessage::BatonShutdown); + break; + } else { + let _ = child_bichannel.send_to_parent(FromIpcThreadMessage::BatonData(buffer.clone())); + } + buffer.clear(); + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + Err(e) => { + relay_log(&format!("IPC connection read error: {}", e)); + break; + } + } + } + + let _ = child_bichannel.set_is_conn_to_endpoint(false); } - (_, Err(e)) if e.kind() == std::io::ErrorKind::WouldBlock => { + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // no incoming connection right now + std::thread::sleep(StdDuration::from_millis(5)); continue; } - (_, Err(e)) => { - eprintln!("Incoming connection failed: {e}"); + Err(e) => { + relay_log(&format!("IPC accept failed: {}", e)); + std::thread::sleep(StdDuration::from_millis(50)); continue; } - }; - let mut conn = BufReader::new(conn); - child_bichannel.set_is_conn_to_endpoint(true)?; - match conn.read_line(&mut buffer) { - Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, - _ => panic!(), - } - let write_res = conn - .get_mut() - .write_all(b"Hello, from the relay prototype (Rust)!\n"); - match write_res { - Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, - _ => panic!(), - } - print!("Client answered: {buffer}"); - buffer.clear(); - // Continuously receive data from plugin - while !child_bichannel.is_killswitch_engaged() { - // check for any new messages from parent and act accordingly - for message in child_bichannel.received_messages() { - match message {} - } - // read from connection input - match conn.read_line(&mut buffer) { - Ok(s) if s == 0 || buffer.len() == 0 => { - buffer.clear(); - continue; - } - Ok(_s) => { - let _ = buffer.pop(); // remove trailing newline - println!("Got: {buffer} ({_s} bytes read)"); - if buffer.starts_with("SHUTDOWN") { - let _ = child_bichannel - .send_to_parent(FromIpcThreadMessage::BatonShutdown); - return Ok(()); - } else { - let _ = child_bichannel.send_to_parent( - FromIpcThreadMessage::BatonData(buffer.clone()), - ); - } - buffer.clear(); - } - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, - Err(e) => panic!("Got err {e}"), - } } } Ok(()) }); + self.ipc_bichannel = Some(ipc_bichannel); - self.ipc_thread_handle = Some(ipc_thread_handle); + self.ipc_thread_handle = Some(handle); Ok(()) } + pub fn ipc_disconnect(&mut self) -> Result<()> { if self.ipc_thread_handle.is_none() { bail!("IPC thread does not exist.") @@ -249,188 +249,119 @@ impl State { .map_err(|e| anyhow!("Join handle err: {e:?}"))?; Ok(res?) } - pub fn _is_ipc_connected(&self) -> bool { - if let Some(status) = self - .ipc_bichannel - .as_ref() - .and_then(|bichannel| bichannel.is_conn_to_endpoint().ok()) - { - status - } else { - false - } - } + pub fn tcp_connect(&mut self, address: String) -> Result<()> { if self.tcp_thread_handle.is_some() { bail!("TCP thread already exists.") } let (tcp_bichannel, mut child_bichannel) = bichannel::create_bichannels(); self.tcp_bichannel = Some(tcp_bichannel); - let tcp_thread_handle = spawn(move || { - let mut stream = match TcpStream::connect(address) { - Ok(stream) => { - println!("Successfully connected."); - let _ = child_bichannel.set_is_conn_to_endpoint(true); - stream - } - Err(e) => { - println!("Connection failed: {}", e); - bail!("Failed to connect to TCP"); - } - }; + + let handle = spawn(move || { + let mut stream = TcpStream::connect(address).map_err(|e| { + relay_log(&format!("TCP connect failed: {}", e)); + anyhow!("Failed to connect to TCP") + })?; + relay_log("TCP connected"); + let _ = child_bichannel.set_is_conn_to_endpoint(true); + while !child_bichannel.is_killswitch_engaged() { for message in child_bichannel.received_messages() { match message { ToTcpThreadMessage::Send(data) => { - // Normalize baton payload let fields = normalize_baton_payload(&data); - // --- Flexible mapping for iMOTIONS events --- - // The relay accepts two common payload shapes: - // 1) Paired fields for each sample (FM,Pilot) in sequence: - // [Alt_FM, Alt_Pilot, Air_FM, Air_Pilot, Vert_FM, Vert_Pilot, Head_FM, Head_Pilot] - // 2) Single pilot-only values in order: - // [Altitude, Airspeed, Heading, VerticalVelocity] - // - // Emit whatever events we can from the incoming payload. + if fields.len() < 2 { - eprintln!( - "Dropping packet: not enough fields (need >=2) but baton sent {}: {:?}", - fields.len(), fields - ); + relay_log(&format!("Dropping packet: not enough fields (need >=2) but baton sent {}: {:?}", fields.len(), fields)); continue; } - // If payload is exactly 4 fields, assume pilot-only order + + // Pilot-only 4-field payload handling if fields.len() == 4 { - // plugin order: Altitude, Airspeed, Heading, VerticalVelocity - // For iMotions we need (FlightModel, Pilot) pairs. Use the pilot value for both slots. - let alt = fields.get(0).unwrap().clone(); - let air = fields.get(1).unwrap().clone(); - let head = fields.get(2).unwrap().clone(); - let vv = fields.get(3).unwrap().clone(); - // Altitude - let altitude_packet = build_imotions_packet("AltitudeSync", &[alt.clone(), alt.clone()]); - eprintln!("Sending to iMotions: {:?}", altitude_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &altitude_packet) { - eprintln!("Failed to send Altitude packet: {}", e); - let _ = child_bichannel.set_is_conn_to_endpoint(false); - return Err(e); - } else { - let _ = child_bichannel.set_is_conn_to_endpoint(true); - } - // Airspeed - let airspeed_packet = build_imotions_packet("AirspeedSync", &[air.clone(), air.clone()]); - eprintln!("Sending to iMotions: {:?}", airspeed_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &airspeed_packet) { - eprintln!("Failed to send Airspeed packet: {}", e); - let _ = child_bichannel.set_is_conn_to_endpoint(false); - return Err(e); - } else { - let _ = child_bichannel.set_is_conn_to_endpoint(true); - } - // Vertical velocity - let vv_packet = build_imotions_packet("VerticalVelocitySync", &[vv.clone(), vv.clone()]); - eprintln!("Sending to iMotions: {:?}", vv_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &vv_packet) { - eprintln!("Failed to send Vertical Velocity packet: {}", e); - let _ = child_bichannel.set_is_conn_to_endpoint(false); - return Err(e); - } else { - let _ = child_bichannel.set_is_conn_to_endpoint(true); - } - // Heading - let heading_packet = build_imotions_packet("HeadingSync", &[head.clone(), head.clone()]); - eprintln!("Sending to iMotions: {:?}", heading_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &heading_packet) { - eprintln!("Failed to send Heading packet: {}", e); - let _ = child_bichannel.set_is_conn_to_endpoint(false); - return Err(e); - } else { - let _ = child_bichannel.set_is_conn_to_endpoint(true); + let alt = fields[0].clone(); + let air = fields[1].clone(); + let head = fields[2].clone(); + let vv = fields[3].clone(); + + let packets = [ + build_imotions_packet("AltitudeSync", &[alt.clone(), alt.clone()]), + build_imotions_packet("AirspeedSync", &[air.clone(), air.clone()]), + build_imotions_packet("VerticalVelocitySync", &[vv.clone(), vv.clone()]), + build_imotions_packet("HeadingSync", &[head.clone(), head.clone()]), + ]; + + for pkt in &packets { + if let Err(e) = send_packet(&mut stream, pkt) { + relay_log(&format!("TCP send failed: {}", e)); + let _ = child_bichannel.set_is_conn_to_endpoint(false); + return Err(e); + } else { + let _ = child_bichannel.set_is_conn_to_endpoint(true); + } } - // done with this message continue; } - // Otherwise attempt the paired-fields mapping (previous behavior) - // Send AltitudeSync if we have at least 2 fields + + // Paired-fields mapping (legacy behavior) if fields.len() >= 2 { - let altitude_payload = vec![ - fields.get(0).unwrap().clone(), - fields.get(1).unwrap().clone(), - ]; + let altitude_payload = vec![fields[0].clone(), fields[1].clone()]; let altitude_packet = build_imotions_packet("AltitudeSync", &altitude_payload); - eprintln!("Sending to iMotions: {:?}", altitude_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &altitude_packet) { - eprintln!("Failed to send Altitude packet: {}", e); + if let Err(e) = send_packet(&mut stream, &altitude_packet) { + relay_log(&format!("Failed to send Altitude packet: {}", e)); let _ = child_bichannel.set_is_conn_to_endpoint(false); return Err(e); } else { let _ = child_bichannel.set_is_conn_to_endpoint(true); } } - // Send AirspeedSync if we have at least 4 fields + if fields.len() >= 4 { - let airspeed_payload = vec![ - fields.get(2).unwrap().clone(), - fields.get(3).unwrap().clone(), - ]; + let airspeed_payload = vec![fields[2].clone(), fields[3].clone()]; let airspeed_packet = build_imotions_packet("AirspeedSync", &airspeed_payload); - eprintln!("Sending to iMotions: {:?}", airspeed_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &airspeed_packet) { - eprintln!("Failed to send Airspeed packet: {}", e); + if let Err(e) = send_packet(&mut stream, &airspeed_packet) { + relay_log(&format!("Failed to send Airspeed packet: {}", e)); let _ = child_bichannel.set_is_conn_to_endpoint(false); return Err(e); } else { let _ = child_bichannel.set_is_conn_to_endpoint(true); } - } else { - eprintln!("Airspeed packet skipped: need >=4 fields, have {}", fields.len()); } - // Send VerticalVelocitySync if we have at least 6 fields + if fields.len() >= 6 { - let vv_payload = vec![ - fields.get(4).unwrap().clone(), - fields.get(5).unwrap().clone(), - ]; + let vv_payload = vec![fields[4].clone(), fields[5].clone()]; let vv_packet = build_imotions_packet("VerticalVelocitySync", &vv_payload); - eprintln!("Sending to iMotions: {:?}", vv_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &vv_packet) { - eprintln!("Failed to send Vertical Velocity packet: {}", e); + if let Err(e) = send_packet(&mut stream, &vv_packet) { + relay_log(&format!("Failed to send Vertical Velocity packet: {}", e)); let _ = child_bichannel.set_is_conn_to_endpoint(false); return Err(e); } else { let _ = child_bichannel.set_is_conn_to_endpoint(true); } - } else { - eprintln!("VerticalVelocity packet skipped: need >=6 fields, have {}", fields.len()); } - // Send HeadingSync if we have at least 8 fields + if fields.len() >= 8 { - let heading_payload = vec![ - fields.get(6).unwrap().clone(), - fields.get(7).unwrap().clone(), - ]; + let heading_payload = vec![fields[6].clone(), fields[7].clone()]; let heading_packet = build_imotions_packet("HeadingSync", &heading_payload); - eprintln!("Sending to iMotions: {:?}", heading_packet); - if let Err(e) = send_packet_and_debug(&mut stream, &heading_packet) { - eprintln!("Failed to send Heading packet: {}", e); + if let Err(e) = send_packet(&mut stream, &heading_packet) { + relay_log(&format!("Failed to send Heading packet: {}", e)); let _ = child_bichannel.set_is_conn_to_endpoint(false); return Err(e); } else { let _ = child_bichannel.set_is_conn_to_endpoint(true); } - } else { - eprintln!("Heading packet skipped: need >=8 fields, have {}", fields.len()); } } } } - std::thread::sleep(std::time::Duration::from_millis(1)); + std::thread::sleep(StdDuration::from_millis(1)); } Ok(()) }); - self.tcp_thread_handle = Some(tcp_thread_handle); + + self.tcp_thread_handle = Some(handle); Ok(()) } + pub fn tcp_disconnect(&mut self) -> Result<()> { if self.tcp_thread_handle.is_none() { bail!("TCP thread does not exist.") @@ -446,17 +377,7 @@ impl State { .map_err(|e| anyhow!("Join handle err: {e:?}"))?; Ok(res?) } - pub fn _is_tcp_connected(&self) -> bool { - if let Some(status) = self - .tcp_bichannel - .as_ref() - .and_then(|bichannel| bichannel.is_conn_to_endpoint().ok()) - { - status - } else { - false - } - } + pub fn log_event(&mut self, event: String) { self.event_log.push(event); } diff --git a/relay/src/update.rs b/relay/src/update.rs index 1f22da1..ba5372b 100644 --- a/relay/src/update.rs +++ b/relay/src/update.rs @@ -1,49 +1,38 @@ use iced::{time::Duration, Task}; use std::fs::File; -use std::io::prelude::*; -use std::time::{Instant, Duration as StdDuration}; +use std::io::Write; +use std::time::{Duration as StdDuration, Instant}; -use crate::{message::ToTcpThreadMessage, message::FromTcpThreadMessage, FromIpcThreadMessage, Message, State}; +use crate::{ + message::FromTcpThreadMessage, message::FromIpcThreadMessage, message::ToTcpThreadMessage, Message, State, +}; pub(crate) fn update(state: &mut State, message: Message) -> Task { use Message as M; - // threshold window considered "recent" (avoid short false-negatives) const LAST_SEEN_WINDOW: StdDuration = StdDuration::from_secs(2); - #[allow(unreachable_patterns)] match message { M::Update => { state.elapsed_time += Duration::from_millis(10); - // 1) compute active baton connection: true if IPC reports connected OR we saw a packet recently - let ipc_conn_flag = state - .ipc_bichannel - .as_ref() - .and_then(|b| b.is_conn_to_endpoint().ok()) - .unwrap_or(false); - + // compute active baton connection using centralized helpers + let ipc_conn_flag = state.is_ipc_connected(); let recent_packet = state .last_baton_instant .map(|t| t.elapsed() <= LAST_SEEN_WINDOW) .unwrap_or(false); - state.active_baton_connection = ipc_conn_flag || recent_packet; - // check for messages from IPC thread + // process IPC messages if let Some(ipc_bichannel) = &state.ipc_bichannel { - for message in ipc_bichannel.received_messages() { - match message { + for msg in ipc_bichannel.received_messages() { + match msg { FromIpcThreadMessage::BatonData(data) => { - // forward to TCP thread (clone for the outgoing buffer) - state.tcp_bichannel.as_mut().map(|tcp_bichannel| { - tcp_bichannel.send_to_child(ToTcpThreadMessage::Send(data.clone())) - }); - - // log each individual baton packet so UI shows it - state.log_event(format!("Baton packet: {data}")); - - // store latest data and mark active connection + timestamp + if let Some(tcp_bi) = state.tcp_bichannel.as_mut() { + let _ = tcp_bi.send_to_child(ToTcpThreadMessage::Send(data.clone())); + } + state.log_event(format!("Baton packet: {}", data)); state.latest_baton_send = Some(data); state.last_baton_instant = Some(Instant::now()); state.active_baton_connection = true; @@ -52,28 +41,28 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { let _ = state.tcp_disconnect(); state.active_baton_connection = false; } - _ => (), + _ => {} } } } - // check for messages from TCP thread + // process TCP messages if let Some(tcp_bichannel) = &state.tcp_bichannel { - for message in tcp_bichannel.received_messages() { - match message { + for msg in tcp_bichannel.received_messages() { + match msg { FromTcpThreadMessage::Connected => { state.log_event("TCP connected to iMotions".into()); state.tcp_connected = true; } FromTcpThreadMessage::Disconnected(reason) => { - state.log_event(format!("TCP disconnected: {reason}")); + state.log_event(format!("TCP disconnected: {}", reason)); state.tcp_connected = false; } FromTcpThreadMessage::Sent(bytes) => { state.on_tcp_packet_sent(bytes); } FromTcpThreadMessage::SendError(err) => { - state.log_event(format!("TCP send error: {err}")); + state.log_event(format!("TCP send error: {}", err)); } } } @@ -82,68 +71,49 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { Task::none() } M::WindowCloseRequest(id) => { - // pre-shutdown operations go here if let Some(ref bichannel) = state.ipc_bichannel { let _ = bichannel.killswitch_engage(); } - if let Some(ref bichannel) = state.tcp_bichannel { let _ = bichannel.killswitch_engage(); } - // delete socket file - let socket_file_path = if cfg!(target_os = "macos") { - "/tmp/baton.sock" - } else { - // TODO: add branch for Windows; mac branch is just for testing/building - panic!( - "No implementation available for given operating system: {}", - std::env::consts::OS - ) - }; - std::fs::remove_file(socket_file_path).unwrap(); + // remove unix socket file on macos test/dev path only + if cfg!(target_os = "macos") { + let _ = std::fs::remove_file("/tmp/baton.sock"); + } - // necessary to actually shut down the window, otherwise the close button will appear to not work iced::window::close(id) } M::ConnectionMessage => { - if let Some(status) = state - .tcp_bichannel - .as_ref() - .and_then(|bichannel| bichannel.is_conn_to_endpoint().ok()) - { - state.tcp_connected = status - } else { - state.tcp_connected = false - } + state.tcp_connected = state.is_tcp_connected(); Task::none() } M::ConnectIpc => { if let Err(e) = state.ipc_connect() { - state.log_event(format!("Error: {e:?}")); - }; + state.log_event(format!("IPC connect failed: {}", e)); + } Task::none() } M::DisconnectIpc => { if let Err(e) = state.ipc_disconnect() { - state.log_event(format!("Error: {e:?}")); - }; + state.log_event(format!("IPC disconnect failed: {}", e)); + } Task::none() } M::ConnectTcp => { let address = state.tcp_addr_field.clone(); if let Err(e) = state.tcp_connect(address) { - state.log_event(format!("Error: {e:?}")); - }; + state.log_event(format!("TCP connect failed: {}", e)); + } Task::none() } M::DisconnectTcp => { if let Err(e) = state.tcp_disconnect() { - state.log_event(format!("Error: {e:?}")); - }; + state.log_event(format!("TCP disconnect failed: {}", e)); + } Task::none() } - // Toggle messages for GUI XML generator M::AltitudeToggle(value) => { state.altitude_toggle = value; Task::none() @@ -161,7 +131,6 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { Task::none() } M::CreateXMLFile => create_xml_file(state), - // Card Open/Close messages for GUI pop-up-card window M::CardOpen => { state.card_open = true; Task::none() @@ -171,7 +140,6 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { Task::none() } M::TcpAddrFieldUpdate(addr) => { - // Update the TCP address text input in the GUI let is_chars_valid = addr.chars().all(|c| c.is_numeric() || c == '.' || c == ':'); let dot_count = addr.chars().filter(|&c| c == '.').count(); let colon_count = addr.chars().filter(|&c| c == ':').count(); @@ -190,89 +158,68 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { } } -// Creates a default XML file when a button is clicked in the GUI fn create_xml_file(state: &mut State) -> Task { - // Get the user's downloads directory - let mut downloads_path = - dirs::download_dir().expect("Retrieving the user's Downloads file directory."); + let mut downloads_path = match dirs::download_dir() { + Some(p) => p, + None => { + let msg = "Could not determine Downloads directory".to_string(); + state.error_message = Some(msg.clone()); + state.log_event(msg); + return Task::none(); + } + }; downloads_path.push("iMotions.xml"); - // Create file in downloads directory. If alr there, will overwrite the existing file. - let mut file = File::create(&downloads_path).expect("Creating XML File."); - - // Check if all dataref toggles are false. If so, return error message - if !state.altitude_toggle - && !state.airspeed_toggle - && !state.vertical_airspeed_toggle - && !state.heading_toggle - { + // Validate toggles + if !state.altitude_toggle && !state.airspeed_toggle && !state.vertical_airspeed_toggle && !state.heading_toggle { state.error_message = Some("Please select at least one dataref toggle".into()); return Task::none(); } - state.error_message = None; // Clear previous error + state.error_message = None; - // NOTE: This XML formatting was found in the PilotDataSync Slack. Double check this is the correct formatting. - let mut contents = String::from( - "\n", - ); + let mut contents = String::from("\n"); if state.altitude_toggle { - let mut altitude_str = - String::from("\t\n"); - - altitude_str.push_str( - "\t\t\n", - ); - altitude_str.push_str( - "\t\t\n", - ); - altitude_str.push_str("\t\n"); - - contents.push_str(&altitude_str); + contents.push_str("\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\n"); } if state.airspeed_toggle { - let mut airspeed_str = - String::from("\t\n"); - - airspeed_str.push_str( - "\t\t\n", - ); - airspeed_str.push_str( - "\t\t\n", - ); - airspeed_str.push_str("\t\n"); - - contents.push_str(&airspeed_str); + contents.push_str("\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\n"); } if state.vertical_airspeed_toggle { - let mut vertical_airspeed_str = String::from( - "\t\n", - ); - - vertical_airspeed_str.push_str("\t\t\n"); - vertical_airspeed_str.push_str("\t\t\n"); - vertical_airspeed_str.push_str("\t\n"); - - contents.push_str(&vertical_airspeed_str); + contents.push_str("\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\n"); } if state.heading_toggle { - let mut heading_str = - String::from("\t\n"); - - heading_str.push_str( - "\t\t\n", - ); - heading_str.push_str( - "\t\t\n", - ); - heading_str.push_str("\t\n"); - - contents.push_str(&heading_str); + contents.push_str("\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\t\n"); + contents.push_str("\t\n"); } contents.push_str(""); - // Write XML file - file.write_all(contents.as_bytes()) - .expect("Writing to XML file"); + match File::create(&downloads_path) { + Ok(mut file) => { + if let Err(e) = file.write_all(contents.as_bytes()) { + let msg = format!("Writing XML file failed: {}", e); + state.error_message = Some(msg.clone()); + state.log_event(msg); + } else { + state.log_event(format!("XML file written to {}", downloads_path.display())); + } + } + Err(e) => { + let msg = format!("Creating XML file failed: {}", e); + state.error_message = Some(msg.clone()); + state.log_event(msg); + } + } - Task::none() // Return type that we need for the Update logic + Task::none() } diff --git a/relay/src/view.rs b/relay/src/view.rs index 8438e3a..6cb7f45 100644 --- a/relay/src/view.rs +++ b/relay/src/view.rs @@ -1,7 +1,7 @@ -//! UI view layout using iced. +//! UI view definitions using iced. //! -//! Each UI element is produced by a focused function. The `view` function composes those -//! elements to keep the module modular and easy to maintain. +//! Each UI element is produced by a small focused function to improve readability, +//! testability and maintainability. The `view` function composes those elements. use std::net::ToSocketAddrs; @@ -15,59 +15,74 @@ type UIElement<'a> = Element<'a, Message>; const DEFAULT_TCP_PLACEHOLDER: &str = "127.0.0.1:9999"; +/// Compose the UI by collecting small, single-responsibility elements. pub(crate) fn view(state: &State) -> UIElement { let mut elements: Vec = Vec::new(); + // Optional error banner if let Some(err) = spawn_error_message(state) { elements.push(err); } + // Informational text elements.push(elapsed_time_element(state)); elements.push(baton_data_element(state)); elements.push(baton_connect_status_element(state)); - // Send Packet button (enabled/disabled variant) - elements.push(send_packet_button(state)); + // Action buttons + if let Some(btn) = send_packet_button(state) { + elements.push(btn); + } + // TCP controls and status elements.push(tcp_connect_status_element(state)); elements.push(check_tcp_status_button()); + // IPC controls elements.push(ipc_connect_button()); elements.push(ipc_disconnect_button()); + // TCP connect/disconnect row elements.push(tcp_connect_button(state)); elements.push(tcp_disconnect_button()); + // XML download / card elements.push(xml_download_popup(state)); column(elements).into() } -/// Show an inline error banner when the application state carries an error message. +/// Error banner shown only when `state.error_message` is Some(...) fn spawn_error_message(state: &State) -> Option { - state.error_message.as_ref().map(|err| { - container(text(format!("⚠️ {}", err))) - .padding(10) - .width(Length::Fill) - .style(container::rounded_box) - .center_x(Length::Fill) - .into() - }) -} - + state + .error_message + .as_ref() + .map(|err| { + container(text(format!("⚠️ {}", err))) + .padding(10) + .width(Length::Fill) + .style(container::rounded_box) + .center_x(Length::Fill) + .into() + }) +} + +/// Elapsed time display fn elapsed_time_element(state: &State) -> UIElement { text(format!("Elapsed time: {:?}", state.elapsed_time)).into() } +/// Baton connection status display fn baton_connect_status_element(state: &State) -> UIElement { - let status = if state.active_baton_connection { - "Baton: connected" + let connection_status = if state.active_baton_connection { + ":) Baton Connected!".to_string() } else { - "Baton: disconnected" + ":( No Baton Connection".to_string() }; - text(status).into() + text(connection_status).into() } +/// Last baton payload (or placeholder) fn baton_data_element(state: &State) -> UIElement { let content = match &state.latest_baton_send { Some(data) => format!("[BATON]: {}", data), @@ -76,16 +91,19 @@ fn baton_data_element(state: &State) -> UIElement { text(content).into() } +/// TCP connection boolean status fn tcp_connect_status_element(state: &State) -> UIElement { - text(format!("TCP Connection: {}", state.tcp_connected)).into() + text(format!("TCP Connection Status: {}", state.tcp_connected)).into() } +/// Simple helper: a button which triggers the app to verify the TCP connection state. fn check_tcp_status_button() -> UIElement { button("Check TCP Connection Status") .on_press(Message::ConnectionMessage) .into() } +/// IPC connect / disconnect buttons fn ipc_connect_button() -> UIElement { button("Connect IPC").on_press(Message::ConnectIpc).into() } @@ -96,38 +114,38 @@ fn ipc_disconnect_button() -> UIElement { .into() } -/// TCP address input wired to state updates. +/// Build the TCP address input widget wired to `Message::TcpAddrFieldUpdate`. fn tcp_addr_input(state: &State) -> UIElement { text_input(DEFAULT_TCP_PLACEHOLDER, &state.tcp_addr_field) .on_input(|addr| Message::TcpAddrFieldUpdate(addr)) .into() } -/// Validate current TCP address input by attempting to resolve socket addresses. +/// Return true when the current TCP address input parses to socket addresses. fn tcp_addr_valid(state: &State) -> bool { state.tcp_addr_field.to_socket_addrs().is_ok() } -/// TCP connect row: Connect button, address input, and validation hint. +/// TCP connect row: button, address input, and validation hint. fn tcp_connect_button(state: &State) -> UIElement { let valid = tcp_addr_valid(state); - let connect_btn = if valid { - button("Connect TCP").on_press(Message::ConnectTcp) + let hint = if valid { + "Address input is valid" } else { - // Keep the button visible but inert when the address is invalid. - button("Connect TCP") + "Address input is invalid" }; - let hint_text = if valid { - "Address input is valid" + let connect_btn = if valid { + button("Connect TCP").on_press(Message::ConnectTcp) } else { - "Address input is invalid" + // keep the button, but disable the event when invalid by not wiring on_press + button("Connect TCP") }; row![ connect_btn, tcp_addr_input(state), - text(hint_text) + text(hint) ] .spacing(5) .into() @@ -139,7 +157,7 @@ fn tcp_disconnect_button() -> UIElement { .into() } -/// XML download card or opener button depending on state.card_open. +/// XML download card or opener button depending on `state.card_open` fn xml_download_popup(state: &State) -> UIElement { if state.card_open { container( @@ -172,12 +190,12 @@ fn xml_download_popup(state: &State) -> UIElement { } } -/// Send packet button — enabled when Baton connection active, otherwise inert but visible. -fn send_packet_button(state: &State) -> UIElement { +/// Send packet button: enabled variant wires the message, disabled variant is inert. +fn send_packet_button(state: &State) -> Option { if state.active_baton_connection { - button("Send Packet").on_press(Message::SendPacket).into() + Some(button("Send Packet").on_press(Message::SendPacket).into()) } else { - button("Send Packet (No Baton Connection)").into() + Some(button("Send Packet (No Baton Connection)").into()) } } From 7fbb9a40241463a05049c438b50c5cbbcd447810 Mon Sep 17 00:00:00 2001 From: jthomas39 Date: Thu, 19 Feb 2026 16:35:27 -0600 Subject: [PATCH 6/6] I've made more changes to the code in the relay to make it more readable, let me know if there are any other changes I need to make to it, I looked at the plugin and it didn't look like it needed any changes for readability. But let me know what you think. --- relay/src/main.rs | 34 ++-------------------------------- relay/src/update.rs | 2 -- relay/src/view.rs | 13 ++++--------- 3 files changed, 6 insertions(+), 43 deletions(-) diff --git a/relay/src/main.rs b/relay/src/main.rs index 583d926..7471567 100644 --- a/relay/src/main.rs +++ b/relay/src/main.rs @@ -5,12 +5,9 @@ mod state; mod update; mod view; -// mod channel; -// use channel::ChannelMessage; - use self::{ // ipc::ipc_connection_loop, - message::{FromIpcThreadMessage, Message}, + message::Message, state::State, update::update, view::view, @@ -22,30 +19,6 @@ use iced::{ }; fn main() -> iced::Result { - // Communication channels between the main_gui_thread and the ipc_connection_thread - // tx_kill = transmit FROM main_gui_thread TO ipc_thread - // named txx_kill because the only thing it does rn is send a kill message to the thread. Can be renamed - //tcp connection - - // Connect to the server - - /* let (tx_to_parent_thread, rx_from_tcp_thread) = std::sync::mpsc::channel::(); - let tcp_connection = thread::spawn(move || match TcpStream::connect("127.0.0.1:7878") { - Ok(mut stream) => { - println!("Successfully connected."); - let message = ChannelMessage::Connect; - tx_to_parent_thread.send(message); - } - - Err(e) => { - println!("Connection failed: {}", e); - } - }); */ - - // let (tx_to_ipc_thread, rx_kill) = std::sync::mpsc::channel(); - // let (tx_to_parent_thread, rx_from_parent_thread) = std::sync::mpsc::channel(); - // let _ = tx.send(()); // temp - iced::application("RELAY", update, view) .window_size((450.0, 300.0)) .exit_on_close_request(false) @@ -61,8 +34,6 @@ fn main() -> iced::Result { "Error connecting to IPC during GUI initialization: {e:?}" )); }; - // state.tcp_connect().expect("TCP connection failure"); // may not need to panic, recoverable error - (state, Task::none()) }) } @@ -70,11 +41,10 @@ fn main() -> iced::Result { fn subscribe(_state: &State) -> iced::Subscription { use Message as M; - // Subscription for displaying elapsed time -- temporary + // Subscription for displaying elapsed time let time_sub = every(Duration::from_millis(10)).map(|_| M::Update); // Subscription to send a message when the window close button (big red X) is clicked. - // Needed to execute cleanup operations before actually shutting down, such as saving etc let window_close = iced::window::close_requests().map(|id| M::WindowCloseRequest(id)); // combine and return all subscriptions as one subscription to satisfy the return type diff --git a/relay/src/update.rs b/relay/src/update.rs index ba5372b..5228326 100644 --- a/relay/src/update.rs +++ b/relay/src/update.rs @@ -41,7 +41,6 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { let _ = state.tcp_disconnect(); state.active_baton_connection = false; } - _ => {} } } } @@ -154,7 +153,6 @@ pub(crate) fn update(state: &mut State, message: Message) -> Task { state.last_send_timestamp = Some(format!("{}", duration.as_secs())); Task::none() } - _ => Task::none(), } } diff --git a/relay/src/view.rs b/relay/src/view.rs index 6cb7f45..9a77381 100644 --- a/relay/src/view.rs +++ b/relay/src/view.rs @@ -52,7 +52,6 @@ pub(crate) fn view(state: &State) -> UIElement { column(elements).into() } -/// Error banner shown only when `state.error_message` is Some(...) fn spawn_error_message(state: &State) -> Option { state .error_message @@ -67,12 +66,10 @@ fn spawn_error_message(state: &State) -> Option { }) } -/// Elapsed time display fn elapsed_time_element(state: &State) -> UIElement { text(format!("Elapsed time: {:?}", state.elapsed_time)).into() } -/// Baton connection status display fn baton_connect_status_element(state: &State) -> UIElement { let connection_status = if state.active_baton_connection { ":) Baton Connected!".to_string() @@ -82,7 +79,6 @@ fn baton_connect_status_element(state: &State) -> UIElement { text(connection_status).into() } -/// Last baton payload (or placeholder) fn baton_data_element(state: &State) -> UIElement { let content = match &state.latest_baton_send { Some(data) => format!("[BATON]: {}", data), @@ -91,24 +87,23 @@ fn baton_data_element(state: &State) -> UIElement { text(content).into() } -/// TCP connection boolean status fn tcp_connect_status_element(state: &State) -> UIElement { text(format!("TCP Connection Status: {}", state.tcp_connected)).into() } /// Simple helper: a button which triggers the app to verify the TCP connection state. -fn check_tcp_status_button() -> UIElement { +fn check_tcp_status_button() -> UIElement<'static> { button("Check TCP Connection Status") .on_press(Message::ConnectionMessage) .into() } /// IPC connect / disconnect buttons -fn ipc_connect_button() -> UIElement { +fn ipc_connect_button() -> UIElement<'static> { button("Connect IPC").on_press(Message::ConnectIpc).into() } -fn ipc_disconnect_button() -> UIElement { +fn ipc_disconnect_button() -> UIElement<'static> { button("Disconnect IPC") .on_press(Message::DisconnectIpc) .into() @@ -151,7 +146,7 @@ fn tcp_connect_button(state: &State) -> UIElement { .into() } -fn tcp_disconnect_button() -> UIElement { +fn tcp_disconnect_button() -> UIElement<'static> { button("Disconnect TCP") .on_press(Message::DisconnectTcp) .into()