From 7be28b0b04b842f85531656b619679f185795cfb Mon Sep 17 00:00:00 2001 From: Sebastian Lyng Johansen Date: Fri, 3 Apr 2026 12:46:05 +0200 Subject: [PATCH 1/2] feat: implement attach functionality --- Cargo.lock | 218 +++++++++++++++++++++++++++++++++----------- Cargo.toml | 7 +- src/config.rs | 31 +++++-- src/libc.rs | 79 ++++++++++++++-- src/main.rs | 101 +++++++++++++------- src/project.rs | 120 +++++++++++++++++------- src/pty.rs | 137 ++++++++++++++++++++++++++++ src/pty_client.rs | 168 ++++++++++++++++++++++++++++++++++ src/pty_server.rs | 154 +++++++++++++++++++++++++++++++ tests/attach.rs | 198 ++++++++++++++++++++++++++++++++++++++++ tests/common/mod.rs | 8 +- tests/logs.rs | 59 ++++++------ 12 files changed, 1110 insertions(+), 170 deletions(-) create mode 100644 src/pty.rs create mode 100644 src/pty_client.rs create mode 100644 src/pty_server.rs create mode 100644 tests/attach.rs diff --git a/Cargo.lock b/Cargo.lock index 74d24cc..536b639 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -102,6 +102,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "avt" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa0f99f7bcce0e99d842c94947f8d0ab5f6f3abc08424e1a4b58a8a7ae30f7c7" +dependencies = [ + "rgb", + "unicode-width", +] + [[package]] name = "bitflags" version = "2.6.0" @@ -119,6 +129,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bytemuck" +version = "1.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" + [[package]] name = "cfg-if" version = "1.0.0" @@ -282,9 +298,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.3" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" [[package]] name = "heck" @@ -310,9 +326,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "45a8a2b9cb3e0b0c1803dbb0758ffac5de2f425b23c28f518faabd9d805342ff" dependencies = [ "equivalent", "hashbrown", @@ -335,9 +351,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "libc" -version = "0.2.171" +version = "0.2.184" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" +checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" [[package]] name = "linux-raw-sys" @@ -383,13 +399,23 @@ dependencies = [ [[package]] name = "objc2-core-foundation" -version = "0.3.0" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daeaf60f25471d26948a1c2f840e3f7d86f4109e3af4e8e4b5cd70c39690d925" +checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" dependencies = [ "bitflags", ] +[[package]] +name = "objc2-io-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15" +dependencies = [ + "libc", + "objc2-core-foundation", +] + [[package]] name = "once_cell" version = "1.20.0" @@ -428,9 +454,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.81" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" dependencies = [ "unicode-ident", ] @@ -473,6 +499,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "rgb" +version = "0.8.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b34b781b31e5d73e9fbc8689c70551fd1ade9a19e3e28cfec8580a79290cc4" +dependencies = [ + "bytemuck", +] + [[package]] name = "rustix" version = "0.38.34" @@ -503,18 +538,28 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.198" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.198" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -534,11 +579,11 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "0.6.5" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" +checksum = "6662b5879511e06e8999a8a235d848113e942c9124f211511b16466ee2995f26" dependencies = [ - "serde", + "serde_core", ] [[package]] @@ -549,9 +594,9 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "syn" -version = "2.0.59" +version = "2.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" dependencies = [ "proc-macro2", "quote", @@ -560,14 +605,15 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.34.2" +version = "0.38.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4b93974b3d3aeaa036504b8eefd4c039dced109171c1ae973f1dc63b2c7e4b2" +checksum = "92ab6a2f8bfe508deb3c6406578252e491d299cbbf3bc0529ecc3313aee4a52f" dependencies = [ "libc", "memchr", "ntapi", "objc2-core-foundation", + "objc2-io-kit", "windows", ] @@ -592,44 +638,55 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "toml" -version = "0.8.12" +version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3" +checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee" dependencies = [ - "serde", + "indexmap", + "serde_core", "serde_spanned", "toml_datetime", - "toml_edit", + "toml_parser", + "toml_writer", + "winnow", ] [[package]] name = "toml_datetime" -version = "0.6.5" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +checksum = "3165f65f62e28e0115a00b2ebdd37eb6f3b641855f9d636d3cd4103767159ad7" dependencies = [ - "serde", + "serde_core", ] [[package]] -name = "toml_edit" -version = "0.22.12" +name = "toml_parser" +version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3328d4f68a705b2a4498da1d580585d39a6510f98318a2cec3018a7ec61ddef" +checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" dependencies = [ - "indexmap", - "serde", - "serde_spanned", - "toml_datetime", "winnow", ] +[[package]] +name = "toml_writer" +version = "1.1.1+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "756daf9b1013ebe47a8776667b466417e2d4c5679d441c26230efd9ef78692db" + [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + [[package]] name = "utf8parse" version = "0.2.1" @@ -703,31 +760,54 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows" -version = "0.57.0" +version = "0.62.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" dependencies = [ "windows-core", - "windows-targets", ] [[package]] name = "windows-core" -version = "0.57.0" +version = "0.62.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" dependencies = [ "windows-implement", "windows-interface", + "windows-link", "windows-result", - "windows-targets", + "windows-strings", +] + +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", ] [[package]] name = "windows-implement" -version = "0.57.0" +version = "0.60.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", @@ -736,22 +816,47 @@ dependencies = [ [[package]] name = "windows-interface" -version = "0.57.0" +version = "0.59.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", "syn", ] +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core", + "windows-link", +] + [[package]] name = "windows-result" -version = "0.1.2" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" dependencies = [ - "windows-targets", + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", ] [[package]] @@ -779,6 +884,15 @@ dependencies = [ "windows_x86_64_msvc", ] +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.5" @@ -829,12 +943,9 @@ checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winnow" -version = "0.6.6" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c976aaaa0e1f90dbb21e9587cdaf1d9679a1cde8875c0d6bd83ab96a208352" -dependencies = [ - "memchr", -] +checksum = "09dac053f1cd375980747450bfc7250c264eaae0583872e845c0c7cd578872b5" [[package]] name = "worker" @@ -843,6 +954,7 @@ dependencies = [ "anyhow", "assert_cmd", "assert_fs", + "avt", "clap", "itertools", "libc", diff --git a/Cargo.toml b/Cargo.toml index a2eb30c..be1ed6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "worker" version = "0.1.0" -edition = "2021" +edition = "2024" [[bin]] name = "mock" @@ -15,9 +15,10 @@ clap = { version = "4.0.27", features = ["derive", "string"] } libc = "0.2.153" serde = { version = "1.0.198", features = ["derive"] } serde_json = "1.0.116" -sysinfo = "0.34.2" -toml = "0.8.12" +sysinfo = "0.38.4" +toml = "1.1.2" itertools = "0.14.0" +avt = "0.17.0" [dev-dependencies] assert_cmd = "2.0" diff --git a/src/config.rs b/src/config.rs index 9e1dd11..925cd41 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,12 +1,18 @@ -use std::{fs::File, path::PathBuf, str::FromStr}; +use std::{ + collections::hash_map::DefaultHasher, + fs::File, + hash::{Hash, Hasher}, + path::PathBuf, + str::FromStr, +}; -use anyhow::{anyhow, Context}; +use anyhow::{Context, anyhow}; use itertools::{Either, Itertools}; use serde::Deserialize; use crate::{ - project::{Project, RunningProject, WorkerProject}, ActionArg, ActionArgRunning, + project::{Project, RunningProject, WorkerProject}, }; const CONFIG_FILE: &str = ".worker.toml"; @@ -93,7 +99,7 @@ impl FromStr for ActionArgRunning { pub struct WorkerConfig { pub projects: Vec, state_dir: PathBuf, - log_dir: PathBuf, + sock_dir: PathBuf, } impl WorkerConfig { @@ -102,10 +108,10 @@ impl WorkerConfig { let config_string = std::fs::read_to_string(base_dir.join(CONFIG_FILE))?; let state_dir = base_dir.join(".worker/state"); - let log_dir = base_dir.join(".worker/log"); + let sock_dir = base_dir.join(".worker/sock"); std::fs::create_dir_all(&state_dir)?; - std::fs::create_dir_all(&log_dir)?; + std::fs::create_dir_all(&sock_dir)?; // Deserialize the TOML string into the Config struct let config: Config = toml::from_str(&config_string)?; @@ -113,12 +119,19 @@ impl WorkerConfig { Ok(Self { projects: config.project, state_dir, - log_dir, + sock_dir, }) } - pub fn log_file(&self, project: &T) -> PathBuf { - self.log_dir.join(project.name()) + pub fn sock_file(&self, project: &T) -> PathBuf { + // Unix socket paths are limited to ~104 chars on macOS. + // Use a hash of the full sock_dir + project name to create a short + // but unique path under /tmp. + let full_path = self.sock_dir.join(project.name()); + let mut hasher = DefaultHasher::new(); + full_path.hash(&mut hasher); + let hash = hasher.finish(); + PathBuf::from(format!("/tmp/worker-{:016x}.sock", hash)) } pub fn get_state(&self, name: &str) -> Result, anyhow::Error> { diff --git a/src/libc.rs b/src/libc.rs index 6109b04..90a4b3b 100644 --- a/src/libc.rs +++ b/src/libc.rs @@ -1,5 +1,6 @@ +use std::os::fd::{AsRawFd, RawFd}; + use serde::{Deserialize, Serialize}; -use sysinfo::System; pub enum Fork { Parent(libc::pid_t), @@ -33,6 +34,12 @@ pub fn waitpid(pid: i32) -> Result { } } +/// Returns true if the child has exited. +pub fn has_child_exited(pid: i32) -> bool { + let mut status: i32 = 0; + unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG) > 0 } +} + pub fn stop_pg(sid: i32, signal: &Signal) -> Result<(), i32> { match unsafe { libc::killpg(sid, signal.to_owned() as i32) } { 0 => Ok(()), @@ -40,13 +47,69 @@ pub fn stop_pg(sid: i32, signal: &Signal) -> Result<(), i32> { } } -pub fn has_processes_running(sid: libc::pid_t) -> bool { - let mut sys = System::new(); - sys.refresh_all(); - sys.processes().iter().any(|(_, p)| { - p.session_id() - .is_some_and(|session_id| session_id.as_u32() == sid as u32) - }) +pub fn dup2(src: i32, dst: i32) -> i32 { + unsafe { libc::dup2(src, dst) } +} + +pub fn signal(signum: i32, handler: usize) -> usize { + unsafe { libc::signal(signum, handler) } +} + +pub fn set_nonblocking(fd: RawFd) { + unsafe { + let flags = libc::fcntl(fd, libc::F_GETFL); + libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK); + } +} + +#[derive(Default)] +pub struct PollSet { + fds: Vec, +} + +pub enum PollResult { + Ready, + Timeout, + Interrupted, + Error, +} + +impl PollSet { + pub fn add(&mut self, fd: &impl AsRawFd) -> usize { + let idx = self.fds.len(); + self.fds.push(libc::pollfd { + fd: fd.as_raw_fd(), + events: libc::POLLIN, + revents: 0, + }); + idx + } + + pub fn wait(&mut self, timeout_ms: i32) -> PollResult { + let ret = unsafe { + libc::poll( + self.fds.as_mut_ptr(), + self.fds.len() as libc::nfds_t, + timeout_ms, + ) + }; + match ret { + _ if ret > 0 => PollResult::Ready, + 0 => PollResult::Timeout, + _ if std::io::Error::last_os_error().raw_os_error() == Some(libc::EINTR) => { + PollResult::Interrupted + } + _ => PollResult::Error, + } + } + + pub fn is_readable(&self, index: usize) -> bool { + self.fds[index].revents & libc::POLLIN != 0 + } + + pub fn is_hungup(&self, index: usize) -> bool { + self.fds[index].revents & libc::POLLHUP != 0 && self.fds[index].revents & libc::POLLIN == 0 + } } #[derive(Deserialize, Clone, Debug, Serialize, Hash, PartialEq, Eq)] diff --git a/src/main.rs b/src/main.rs index f939ada..e17e9c0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,28 @@ use std::time::{Duration, Instant}; -use clap::{command, ArgGroup, Parser}; +use clap::{ArgGroup, Parser, command}; use config::WorkerConfig; use itertools::Itertools; use project::Project; -use crate::project::RunningProject; +use crate::{ + project::RunningProject, + pty::Message, + pty_client::{DetachReason, PtyClient}, +}; pub mod config; pub mod libc; pub mod project; - -fn start(config: &WorkerConfig, projects: Vec) -> Result<(), anyhow::Error> { +pub mod pty; +pub mod pty_client; +pub mod pty_server; + +fn start( + config: &WorkerConfig, + projects: Vec, + verbose: bool, +) -> Result<(), anyhow::Error> { let (running, not_running) = config.partition_projects(projects)?; for project in running { @@ -19,7 +30,22 @@ fn start(config: &WorkerConfig, projects: Vec) -> Result<(), anyhow::Er } for project in not_running { + if verbose { + eprintln!("Starting: {}", project.name); + eprintln!(" Command: {:?}", project.command); + eprintln!(" Cwd: {}", project.cwd); + eprintln!(" Socket: {}", config.sock_file(&project).display()); + if let Some(ref envs) = project.envs { + eprintln!(" Envs: {:?}", envs); + } + } + project.start(config)?; + + if verbose { + let client = PtyClient::connect(&config.sock_file(&project))?; + client.event_loop(true); + } } Ok(()) @@ -50,7 +76,11 @@ fn stop(config: &WorkerConfig, projects: Vec) -> Result<(), anyh fn restart(config: &WorkerConfig, projects: Vec) -> Result<(), anyhow::Error> { stop(config, projects.clone())?; - start(config, projects.into_iter().map(|p| p.into()).collect())?; + start( + config, + projects.into_iter().map(|p| p.into()).collect(), + false, + )?; Ok(()) } @@ -63,6 +93,24 @@ fn run(config: &WorkerConfig, project: Project) -> Result<(), anyhow::Error> { Ok(()) } +fn attach(config: &WorkerConfig, args: AttachArgs) -> Result<(), anyhow::Error> { + let mut client = PtyClient::connect(&config.sock_file(&args.project))?; + client.send(Message::DumpScreen); + + let reason = client.event_loop(false); + + let msg = match reason { + DetachReason::UserDetach => "Detached from process (Ctrl+D)", + DetachReason::ProcessExited => "Process exited", + DetachReason::ConnectionLost => "Connection lost", + }; + // Print on stdout (same fd as PTY output) so they don't interleave. + // Clear the current line in case output ended mid-line. + println!("\x1b[2K\r{}", msg); + + Ok(()) +} + fn status(config: &WorkerConfig, args: StatusArgs) -> Result<(), anyhow::Error> { for project in config.running()? { if args.quiet { @@ -88,26 +136,9 @@ fn list(config: &WorkerConfig, args: ListArgs) -> Result<(), anyhow::Error> { } fn logs(config: &WorkerConfig, args: LogsArgs) -> Result<(), anyhow::Error> { - let mut cmd = std::process::Command::new("tail"); - - if args.follow { - cmd.arg("-f"); - } - - let mut child = cmd - .args(["-n", &args.number.to_string()]) - .arg(config.log_file(&args.project)) - .spawn()?; - - if args.follow { - while args.project.is_running() { - std::thread::sleep(Duration::from_secs(2)); - } - child.kill()?; - } else { - child.wait()?; - } - + let mut client = PtyClient::connect(&config.sock_file(&args.project))?; + client.send(Message::DumpScreen); + client.event_loop(true); Ok(()) } @@ -163,6 +194,10 @@ struct StartArgs { conflicts_with = "projects" )] name: Option, + + /// Print debug info and tail the log after starting + #[arg(short, long)] + verbose: bool, } #[derive(Debug, Parser)] @@ -200,11 +235,6 @@ struct RunArgs { #[derive(Debug, Parser)] struct LogsArgs { project: RunningProject, - #[arg(short, long)] - follow: bool, - - #[arg(short, long = "lines", default_value = "50")] - number: i32, } #[derive(Debug, Parser)] @@ -219,6 +249,11 @@ struct ListArgs { quiet: bool, } +#[derive(Debug, Parser)] +struct AttachArgs { + project: RunningProject, +} + #[derive(Parser, Debug)] enum SubCommands { /// Start the specified project(s). E.g. `worker start foo bar` @@ -235,6 +270,8 @@ enum SubCommands { List(ListArgs), /// Print out logs for the specified project. Logs(LogsArgs), + /// Attach to a running process (Ctrl+D to detach) + Attach(AttachArgs), } #[derive(Parser, Debug)] @@ -261,13 +298,14 @@ fn main() -> Result<(), anyhow::Error> { match cli.subcommand { SubCommands::Start(args) => { + let verbose = args.verbose; let projects = match (args.projects, args.name, args.cmd) { (Some(projects), None, None) => unique(projects), (None, Some(name), Some(command)) => vec![Project::from_cmd(name, command)], _ => unreachable!("Only one of project or command should be specified"), }; - start(&config, projects)? + start(&config, projects, verbose)? } SubCommands::Stop(args) => stop( &config, @@ -303,6 +341,7 @@ fn main() -> Result<(), anyhow::Error> { SubCommands::Status(args) => status(&config, args)?, SubCommands::List(args) => list(&config, args)?, SubCommands::Logs(args) => logs(&config, args)?, + SubCommands::Attach(args) => attach(&config, args)?, } Ok(()) diff --git a/src/project.rs b/src/project.rs index 38216a6..4c12a72 100644 --- a/src/project.rs +++ b/src/project.rs @@ -1,21 +1,21 @@ use std::{ collections::HashMap, - fs::OpenOptions, hash::Hash, - os::{ - fd::{FromRawFd, IntoRawFd}, - unix::process::CommandExt, - }, + io::{Read, Write}, + os::{fd::AsRawFd, unix::process::CommandExt}, process::Stdio, str::FromStr, }; -use anyhow::{anyhow, Context}; +use anyhow::{Context, anyhow}; use serde::{Deserialize, Serialize}; +use sysinfo::System; use crate::{ config::WorkerConfig, - libc::{fork, has_processes_running, setsid, stop_pg, waitpid, Fork, Signal}, + libc::{Fork, Signal, dup2, fork, setsid, signal, stop_pg, waitpid}, + pty::{self}, + pty_server::PtyServer, }; /// Project deserialized from config file @@ -65,35 +65,88 @@ impl Project { pub fn start(&self, config: &WorkerConfig) -> Result<(), anyhow::Error> { self.start_dependencies(config)?; + let sock_path = config.sock_file(self); + // Capture terminal size before forking — after fork, stdio is /dev/null + let (rows, cols) = pty::get_terminal_size(); + match fork().expect("Couldn't fork") { Fork::Parent(p) => { waitpid(p).unwrap(); } Fork::Child => { - let sid = setsid().expect("Couldn't setsid"); - config.store_state(sid, self)?; + // Pipe so shepherd can signal readiness before intermediate exits + let (mut pipe_r, mut pipe_w) = std::io::pipe().expect("Failed to create pipe"); - match fork().expect("Couldn't fork inner") { - Fork::Parent(_) => std::process::exit(0), + match fork().expect("Couldn't fork shepherd") { + Fork::Parent(_) => { + // Intermediate: wait for shepherd to be ready, then exit + drop(pipe_w); + let _ = pipe_r.read(&mut [0u8; 1]); + drop(pipe_r); + std::process::exit(0); + } Fork::Child => { - // Create a raw filedescriptor to use to merge stdout and stderr - let fd = OpenOptions::new() + // Shepherd process + drop(pipe_r); + // Redirect inherited stdio to /dev/null so we don't + // hold open any pipes from the parent (e.g. test harness). + if let Ok(devnull) = std::fs::OpenOptions::new() + .read(true) .write(true) - .truncate(true) - .create(true) - .open(config.log_file(self))? - .into_raw_fd(); - - let _ = std::process::Command::new(&self.command[0]) - .args(&self.command[1..]) - .envs(self.envs.clone().unwrap_or_default()) - .current_dir(&self.cwd) - .stdout(unsafe { Stdio::from_raw_fd(fd) }) - .stderr(unsafe { Stdio::from_raw_fd(fd) }) - .stdin(Stdio::null()) - .exec(); + .open("/dev/null") + { + dup2(devnull.as_raw_fd(), libc::STDIN_FILENO); + dup2(devnull.as_raw_fd(), libc::STDOUT_FILENO); + dup2(devnull.as_raw_fd(), libc::STDERR_FILENO); + } + + let sid = setsid().expect("Couldn't setsid"); + let mut pty = pty::Pty::open(rows, cols).expect("Failed to open PTY"); + + // Make the PTY slave the controlling terminal of this session + unsafe { + libc::ioctl( + pty.slave.as_raw_fd(), + libc::TIOCSCTTY as libc::c_ulong, + 0i32, + ); + } + + config + .store_state(sid, self) + .expect("Failed to store state"); + + match fork().expect("Couldn't fork command") { + Fork::Parent(child_pid) => { + drop(pty.slave); + // Ignore signals generated by the PTY line discipline + signal(libc::SIGINT, libc::SIG_IGN); + signal(libc::SIGQUIT, libc::SIG_IGN); + signal(libc::SIGTSTP, libc::SIG_IGN); + let server = PtyServer::bind(&sock_path)?; + // Signal readiness to intermediate + let _ = pipe_w.write_all(&[1u8]); + drop(pipe_w); + server.event_loop(&mut pty.master, child_pid); + std::process::exit(0); + } + Fork::Child => { + drop(pty.master); + drop(pipe_w); + dup2(pty.slave.as_raw_fd(), libc::STDIN_FILENO); + dup2(pty.slave.as_raw_fd(), libc::STDOUT_FILENO); + dup2(pty.slave.as_raw_fd(), libc::STDERR_FILENO); + drop(pty.slave); + + let _ = std::process::Command::new(&self.command[0]) + .args(&self.command[1..]) + .envs(self.envs.clone().unwrap_or_default()) + .current_dir(&self.cwd) + .exec(); + } + } } - }; + } } }; @@ -135,10 +188,6 @@ impl RunningProject { let signal = self.stop_signal.as_ref().unwrap_or(&Signal::SIGINT); stop_pg(self.pid, signal).map_err(|e| anyhow!("Error trying to stop project: {e}")) } - - pub fn is_running(&self) -> bool { - has_processes_running(self.pid) - } } impl From for Project { @@ -226,7 +275,14 @@ impl FromStr for RunningProject { .get_state(s)? .ok_or_else(|| anyhow!("Project {} is not running or does not exist", s))?; - if !has_processes_running(project.pid) { + let mut sys = System::new(); + sys.refresh_all(); + let has_processes_running = sys.processes().iter().any(|(_, p)| { + p.session_id() + .is_some_and(|session_id| session_id.as_u32() == project.pid as u32) + }); + + if !has_processes_running { return Err(anyhow!("{} is not running", project)); } diff --git a/src/pty.rs b/src/pty.rs new file mode 100644 index 0000000..e6103cf --- /dev/null +++ b/src/pty.rs @@ -0,0 +1,137 @@ +use std::{ + fs::File, + os::fd::{AsRawFd, FromRawFd}, +}; + +pub struct Pty { + pub master: File, + pub slave: File, +} + +impl Pty { + pub fn open(rows: u16, cols: u16) -> Result { + let master = unsafe { + let fd = libc::posix_openpt(libc::O_RDWR | libc::O_NOCTTY); + anyhow::ensure!(fd >= 0, "posix_openpt failed"); + File::from_raw_fd(fd) + }; + + unsafe { + anyhow::ensure!(libc::grantpt(master.as_raw_fd()) == 0, "grantpt failed"); + anyhow::ensure!(libc::unlockpt(master.as_raw_fd()) == 0, "unlockpt failed"); + let slave_name = libc::ptsname(master.as_raw_fd()); + anyhow::ensure!(!slave_name.is_null(), "ptsname failed"); + let fd = libc::open(slave_name, libc::O_RDWR); + anyhow::ensure!(fd >= 0, "failed to open slave pty"); + let slave = File::from_raw_fd(fd); + + set_window_size(&master, rows, cols); + + Ok(Pty { master, slave }) + } + } +} + +pub fn set_window_size(fd: &File, rows: u16, cols: u16) { + let ws = libc::winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ, &ws) }; +} + +pub fn get_terminal_size() -> (u16, u16) { + unsafe { + let mut ws: libc::winsize = std::mem::zeroed(); + if libc::ioctl(libc::STDOUT_FILENO, libc::TIOCGWINSZ, &mut ws) == 0 { + (ws.ws_row, ws.ws_col) + } else { + (24, 80) + } + } +} + +/// Protocol messages between PtyClient and PtyServer. +pub enum Message { + /// Resize the PTY. + Resize { rows: u16, cols: u16 }, + /// Request the current screen state with escape sequences and colors. + DumpScreen, + /// Request the current screen state as plain text. + DumpText, + /// Detach from the session. + Detach, + /// Forward input to the PTY. + Input(Vec), +} + +impl Message { + const RESIZE: u8 = 0x01; + const DUMP_SCREEN: u8 = 0x02; + const DUMP_TEXT: u8 = 0x03; + const CTRL_D: u8 = 0x04; + + pub fn encode(&self) -> Vec { + match self { + Message::Resize { rows, cols } => { + let mut buf = vec![Self::RESIZE]; + buf.extend_from_slice(&rows.to_be_bytes()); + buf.extend_from_slice(&cols.to_be_bytes()); + buf + } + Message::DumpScreen => vec![Self::DUMP_SCREEN], + Message::DumpText => vec![Self::DUMP_TEXT], + Message::Detach => vec![Self::CTRL_D], + Message::Input(data) => data.clone(), + } + } + + pub fn decode(buf: &[u8]) -> Self { + if buf.contains(&Self::CTRL_D) { + return Message::Detach; + } + if buf.len() == 5 && buf[0] == Self::RESIZE { + let rows = u16::from_be_bytes([buf[1], buf[2]]); + let cols = u16::from_be_bytes([buf[3], buf[4]]); + return Message::Resize { rows, cols }; + } + if buf.len() == 1 && buf[0] == Self::DUMP_SCREEN { + return Message::DumpScreen; + } + if buf.len() == 1 && buf[0] == Self::DUMP_TEXT { + return Message::DumpText; + } + Message::Input(buf.to_vec()) + } +} + +/// Enter raw mode if stdin is a terminal. Returns None if stdin is not a terminal. +pub fn enter_raw_mode() -> Option { + unsafe { + if libc::isatty(libc::STDIN_FILENO) == 0 { + return None; + } + let mut original: libc::termios = std::mem::zeroed(); + if libc::tcgetattr(libc::STDIN_FILENO, &mut original) != 0 { + return None; + } + let mut raw = original; + libc::cfmakeraw(&mut raw); + if libc::tcsetattr(libc::STDIN_FILENO, libc::TCSANOW, &raw) != 0 { + return None; + } + Some(original) + } +} + +pub fn restore_terminal(original: &libc::termios) { + unsafe { + libc::tcsetattr(libc::STDIN_FILENO, libc::TCSANOW, original); + } + // Reset terminal state that escape sequences (not termios) control, + // e.g. cursor visibility, alternate screen buffer + let _ = std::io::Write::write_all(&mut std::io::stdout(), b"\x1b[?25h\x1b[?1049l"); + let _ = std::io::Write::flush(&mut std::io::stdout()); +} diff --git a/src/pty_client.rs b/src/pty_client.rs new file mode 100644 index 0000000..feb925c --- /dev/null +++ b/src/pty_client.rs @@ -0,0 +1,168 @@ +use std::io::{Read, Write}; +use std::os::fd::{AsRawFd, RawFd}; +use std::os::unix::net::UnixStream; +use std::path::Path; + +use crate::libc::{PollResult, PollSet}; +use crate::pty::{self, Message, get_terminal_size}; + +static SIGWINCH_PIPE: std::sync::OnceLock = std::sync::OnceLock::new(); + +extern "C" fn sigwinch_handler(_: libc::c_int) { + if let Some(&fd) = SIGWINCH_PIPE.get() { + unsafe { libc::write(fd, [1u8].as_ptr() as *const libc::c_void, 1) }; + } +} + +const CTRL_C: u8 = 0x03; +const CTRL_D: u8 = 0x04; + +pub enum DetachReason { + UserDetach, + ProcessExited, + ConnectionLost, +} + +pub enum ClientEvent<'a> { + /// stdin has data ready to read. + Input(&'a [u8]), + /// PTY output received from the shepherd. + Output(&'a [u8]), + /// The shepherd disconnected (process exited). + Disconnected, +} + +pub struct PtyClient { + stream: UnixStream, + sigwinch_r: std::io::PipeReader, +} + +impl PtyClient { + pub fn connect(path: &Path) -> std::io::Result { + let stream = UnixStream::connect(path)?; + let (rows, cols) = get_terminal_size(); + let mut client = PtyClient::try_from(stream)?; + client.send(Message::Resize { rows, cols }); + client.stream.set_nonblocking(true)?; + Ok(client) + } + + pub fn event_loop(mut self, readonly: bool) -> DetachReason { + let original_termios = crate::pty::enter_raw_mode(); + + let mut buf = [0u8; 4096]; + let reason = loop { + match self.next_event(&mut buf) { + ClientEvent::Input(data) => { + if data.contains(&CTRL_D) || (readonly && data.contains(&CTRL_C)) { + break DetachReason::UserDetach; + } + if !readonly && self.write_all(data).is_err() { + break DetachReason::ConnectionLost; + } + } + ClientEvent::Output(data) => { + if std::io::stdout().write_all(data).is_err() { + break DetachReason::ConnectionLost; + } + let _ = std::io::stdout().flush(); + } + ClientEvent::Disconnected => break DetachReason::ProcessExited, + } + }; + + if let Some(ref original) = original_termios { + pty::restore_terminal(original); + } + + reason + } + + pub fn send(&mut self, msg: Message) { + let _ = self.stream.write_all(&msg.encode()); + } + + /// Poll for the next event. + /// Handles resize signals internally. Returns when stdin has data, + /// the stream has output, or the connection drops. + fn next_event<'a>(&mut self, buf: &'a mut [u8]) -> ClientEvent<'a> { + let stdin = std::io::stdin(); + + loop { + let mut poll_set = PollSet::default(); + let stdin_idx = poll_set.add(&stdin); + let stream_idx = poll_set.add(&self.stream); + let sigwinch_idx = poll_set.add(&self.sigwinch_r); + + match poll_set.wait(1000) { + PollResult::Interrupted | PollResult::Timeout => continue, + PollResult::Error => return ClientEvent::Disconnected, + PollResult::Ready => {} + } + + // Handle resize internally + if poll_set.is_readable(sigwinch_idx) { + let _ = self.sigwinch_r.read(&mut [0u8; 16]); + let (rows, cols) = get_terminal_size(); + self.send(Message::Resize { rows, cols }); + continue; + } + + // Check stream before stdin — ensures server-sent data + // (like log replay) is received before stdin is processed + if poll_set.is_readable(stream_idx) { + return match self.stream.read(buf) { + Ok(0) => ClientEvent::Disconnected, + Ok(n) => ClientEvent::Output(&buf[..n]), + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + Err(_) => ClientEvent::Disconnected, + }; + } + + if poll_set.is_hungup(stream_idx) { + return ClientEvent::Disconnected; + } + + if poll_set.is_readable(stdin_idx) { + return match stdin.lock().read(buf) { + Ok(n) if n > 0 => ClientEvent::Input(&buf[..n]), + _ => continue, + }; + } + } + } +} + +impl Read for PtyClient { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.stream.read(buf) + } +} + +impl Write for PtyClient { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.stream.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.stream.flush() + } +} + +impl AsRawFd for PtyClient { + fn as_raw_fd(&self) -> RawFd { + self.stream.as_raw_fd() + } +} + +impl TryFrom for PtyClient { + type Error = std::io::Error; + + fn try_from(stream: UnixStream) -> Result { + let (sigwinch_r, sigwinch_w) = std::io::pipe().expect("Failed to create SIGWINCH pipe"); + SIGWINCH_PIPE.set(sigwinch_w.as_raw_fd()).ok(); + std::mem::forget(sigwinch_w); + unsafe { libc::signal(libc::SIGWINCH, sigwinch_handler as usize) }; + Ok(Self { stream, sigwinch_r }) + } +} diff --git a/src/pty_server.rs b/src/pty_server.rs new file mode 100644 index 0000000..c44ee5c --- /dev/null +++ b/src/pty_server.rs @@ -0,0 +1,154 @@ +use std::fs::File; +use std::io::{Read, Write}; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::path::Path; + +use crate::libc::{PollResult, PollSet, has_child_exited}; +use crate::pty::{Message, get_terminal_size, set_window_size}; + +pub enum ServerEvent<'a> { + /// PTY master produced output. + Output(&'a [u8]), + /// Client sent input — write to PTY master. + ClientInput(Vec), + /// Client requested a resize — resize the PTY. + ClientResize(u16, u16), + /// PTY master closed or child process exited — shepherd should stop. + Done, +} + +pub struct PtyServer { + listener: UnixListener, + clients: Vec, + client_buf: [u8; 4096], + vt: avt::Vt, +} + +impl PtyServer { + pub fn bind(path: &Path) -> std::io::Result { + let _ = std::fs::remove_file(path); + let listener = UnixListener::bind(path)?; + listener.set_nonblocking(true)?; + let (rows, cols) = get_terminal_size(); + Ok(PtyServer { + listener, + clients: Vec::new(), + client_buf: [0u8; 4096], + vt: avt::Vt::builder() + .size(cols as usize, rows as usize) + .scrollback_limit(10000) + .build(), + }) + } + + pub fn event_loop(mut self, master: &mut File, child_pid: i32) { + let mut buf = [0u8; 4096]; + + loop { + match self.next_event(master, child_pid, &mut buf) { + ServerEvent::Output(data) => { + self.vt.feed_str(&String::from_utf8_lossy(data)); + self.clients.retain_mut(|c| c.write_all(data).is_ok()); + } + ServerEvent::ClientInput(data) => { + let _ = master.write_all(&data); + } + ServerEvent::ClientResize(rows, cols) => { + set_window_size(master, rows, cols); + self.vt.resize(cols as usize, rows as usize); + } + ServerEvent::Done => break, + } + } + } + + fn next_event<'a>( + &mut self, + master: &mut File, + child_pid: i32, + buf: &'a mut [u8], + ) -> ServerEvent<'a> { + loop { + if has_child_exited(child_pid) { + return ServerEvent::Done; + } + + let mut poll_set = PollSet::default(); + let master_idx = poll_set.add(master); + let listener_idx = poll_set.add(&self.listener); + let client_indices: Vec<_> = self.clients.iter().map(|c| poll_set.add(c)).collect(); + + match poll_set.wait(500) { + PollResult::Interrupted | PollResult::Timeout => continue, + PollResult::Error => return ServerEvent::Done, + PollResult::Ready => {} + } + + if poll_set.is_readable(master_idx) { + return match master.read(buf) { + Ok(0) | Err(_) => ServerEvent::Done, + Ok(n) => ServerEvent::Output(&buf[..n]), + }; + } + if poll_set.is_hungup(master_idx) { + return ServerEvent::Done; + } + if poll_set.is_readable(listener_idx) { + self.accept(master); + } + + // Check each client for data + for (i, &idx) in client_indices.iter().enumerate() { + if poll_set.is_readable(idx) || poll_set.is_hungup(idx) { + match Self::read_client_message(&mut self.clients[i], &mut self.client_buf) { + Message::Input(data) => return ServerEvent::ClientInput(data), + Message::Resize { rows, cols } => { + return ServerEvent::ClientResize(rows, cols); + } + Message::DumpScreen => { + let screen = self.vt.dump().replace('\u{9b}', "\x1b["); + let _ = self.clients[i].write_all(screen.as_bytes()); + } + Message::DumpText => { + for line in self.vt.text() { + if !line.is_empty() { + let _ = self.clients[i].write_all(line.as_bytes()); + let _ = self.clients[i].write_all(b"\r\n"); + } + } + } + Message::Detach => { + self.clients.remove(i); + break; + } + } + } + } + } + } + + fn accept(&mut self, master: &File) { + let Ok((mut stream, _)) = self.listener.accept() else { + return; + }; + + // Read the initial resize (blocking, before set_nonblocking) + let mut buf = [0u8; 5]; + if let Ok(n) = stream.read(&mut buf) { + if let Message::Resize { rows, cols } = Message::decode(&buf[..n]) { + set_window_size(master, rows, cols); + self.vt.resize(cols as usize, rows as usize); + } + } + + stream.set_nonblocking(true).ok(); + self.clients.push(stream); + } + + fn read_client_message(client: &mut UnixStream, buf: &mut [u8]) -> Message { + match client.read(buf) { + Ok(0) | Err(_) => Message::Detach, + Ok(n) => Message::decode(&buf[..n]), + } + } +} diff --git a/tests/attach.rs b/tests/attach.rs new file mode 100644 index 0000000..9abdbf1 --- /dev/null +++ b/tests/attach.rs @@ -0,0 +1,198 @@ +use std::time::Duration; + +use common::{WorkerTestConfig, WorkerTestProject}; +use uuid::Uuid; + +mod common; + +#[test] +fn test_attach_project_not_running() { + let worker = WorkerTestConfig::new(); + + let project_name = worker.project_name(&WorkerTestProject::One); + + let mut cmd = worker.attach(&[&project_name]); + cmd.assert().failure(); +} + +#[test] +fn test_attach_shows_previous_output() { + let worker = WorkerTestConfig::new(); + + let project_name = worker.project_name(&WorkerTestProject::One); + + let mut cmd = worker.start(&[&project_name]); + cmd.assert().success(); + + // Give the process time to produce output + std::thread::sleep(Duration::from_millis(500)); + + // Attach — server sends screen state on connect, then timeout + let mut cmd = worker.attach(&[&project_name]); + cmd.timeout(Duration::from_secs(2)); + let output = cmd.output().expect("Failed to run attach"); + let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + stdout.contains("Hello from mock!"), + "Expected previous output in attach stdout, got: {}", + stdout + ); +} + +#[test] +fn test_attach_detach_process_still_running() { + let worker = WorkerTestConfig::new(); + + let project_name = worker.project_name(&WorkerTestProject::One); + + let mut cmd = worker.start(&[&project_name]); + cmd.assert().success(); + + // Attach and immediately detach with Ctrl+D + let mut cmd = worker.attach(&[&project_name]); + let output = cmd + .write_stdin([0x04]) + .output() + .expect("Failed to run attach"); + let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + stdout.contains("Detached"), + "Expected detach message in stdout, got: {}", + stdout + ); + + // Verify the process is still running after detach + assert_eq!(worker.pids(&project_name).unwrap().len(), 1); +} + +#[test] +fn test_attach_command_success() { + let worker = WorkerTestConfig::new(); + + let uuid = Uuid::new_v4(); + let echo_cmd = format!("echo 'attach-test-{}' && sleep 5", uuid); + + let mut cmd = worker.start(&["-n", &uuid.to_string(), "-c", &echo_cmd]); + cmd.assert().success(); + + // Give the process time to produce output + std::thread::sleep(Duration::from_millis(500)); + + // Attach — server sends screen state, then timeout + let mut cmd = worker.attach(&[&uuid.to_string()]); + cmd.timeout(Duration::from_secs(2)); + let output = cmd.output().expect("Failed to run attach"); + let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + stdout.contains(&format!("attach-test-{}", uuid)), + "Expected command output in attach stdout, got: {}", + stdout + ); +} + +#[test] +fn test_attach_process_exits_while_attached() { + let worker = WorkerTestConfig::new(); + + let uuid = Uuid::new_v4(); + let echo_cmd = format!("echo 'short-lived-{}' && sleep 1", uuid); + + let mut cmd = worker.start(&["-n", &uuid.to_string(), "-c", &echo_cmd]); + cmd.assert().success(); + + // Attach with Ctrl+D queued — process will exit after ~1s + let mut cmd = worker.attach(&[&uuid.to_string()]); + cmd.timeout(Duration::from_secs(5)); + let output = cmd + .write_stdin([0x04]) + .output() + .expect("Failed to run attach"); + let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + stdout.contains("Process exited") || stdout.contains("Detached"), + "Expected exit or detach message, got: {}", + stdout + ); +} + +#[test] +fn test_attach_sends_terminal_size() { + let worker = WorkerTestConfig::new(); + + let uuid = Uuid::new_v4(); + // Process that prints its terminal size repeatedly (bounded to avoid leaking) + let cmd_str = format!( + "echo 'ready-{}'; for i in $(seq 1 30); do stty size; sleep 0.2; done", + uuid + ); + + let mut cmd = worker.start(&["-n", &uuid.to_string(), "-c", &cmd_str]); + cmd.assert().success(); + + // Give the process time to start + std::thread::sleep(Duration::from_millis(500)); + + // Attach — sends terminal size (24x80 in test env) in the handshake. + // Then detach. + let mut cmd = worker.attach(&[&uuid.to_string()]); + cmd.write_stdin([0x04]) + .output() + .expect("Failed to run attach"); + + // Wait a bit then attach again — the size output should be in the screen state + std::thread::sleep(Duration::from_millis(500)); + + let mut cmd = worker.attach(&[&uuid.to_string()]); + cmd.timeout(Duration::from_secs(2)); + let output = cmd.output().expect("Failed to run attach"); + let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + stdout.contains("24 80"), + "Expected terminal size '24 80' in output, got: {}", + stdout + ); + + worker.stop(&[&uuid.to_string()]).assert().success(); +} + +#[test] +fn test_attach_multiple_clients() { + let worker = WorkerTestConfig::new(); + + let project_name = worker.project_name(&WorkerTestProject::One); + + let mut cmd = worker.start(&[&project_name]); + cmd.assert().success(); + + // Give the process time to produce output + std::thread::sleep(Duration::from_millis(500)); + + // Attach first client — let it run briefly + let mut cmd1 = worker.attach(&[&project_name]); + cmd1.timeout(Duration::from_secs(2)); + let output1 = cmd1.output().expect("Failed to run first attach"); + let stdout1 = String::from_utf8_lossy(&output1.stdout); + + // Attach second client — let it run briefly + let mut cmd2 = worker.attach(&[&project_name]); + cmd2.timeout(Duration::from_secs(2)); + let output2 = cmd2.output().expect("Failed to run second attach"); + let stdout2 = String::from_utf8_lossy(&output2.stdout); + + // Both clients should have received output + assert!( + stdout1.contains("Hello from mock!"), + "First client should have received output, got: {}", + stdout1 + ); + assert!( + stdout2.contains("Hello from mock!"), + "Second client should have received output, got: {}", + stdout2 + ); +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index ad683e6..371f03f 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] use std::fs::DirEntry; -use assert_cmd::{cargo::cargo_bin, Command}; +use assert_cmd::{Command, cargo::cargo_bin}; use serde::Deserialize; use sysinfo::{Pid, System}; use tempfile::TempDir; @@ -39,7 +39,7 @@ pub enum WorkerTestGroup { } pub struct WorkerTestConfig { - dir: TempDir, + pub dir: TempDir, cmds: [String; 6], names: [Uuid; 6], groups: [Uuid; 2], @@ -164,6 +164,10 @@ impl WorkerTestConfig { self.run_cmd("status", None) } + pub fn attach(&self, project: &[&str]) -> Command { + self.run_cmd("attach", Some(project)) + } + // Depends on `new()`. Used for asserting that the projects have actually started pub fn group_projects(&self, group: &WorkerTestProject) -> &[WorkerTestProject; 2] { match group { diff --git a/tests/logs.rs b/tests/logs.rs index 0a901a0..122635e 100644 --- a/tests/logs.rs +++ b/tests/logs.rs @@ -1,4 +1,4 @@ -use std::time::{Duration, Instant}; +use std::time::Duration; use common::{WorkerTestConfig, WorkerTestProject}; use uuid::Uuid; @@ -24,21 +24,19 @@ fn test_logs_success() { let mut cmd = worker.start(&[&project_name]); cmd.assert().success(); - let timeout = Duration::new(1, 0); - let start = Instant::now(); - - // Try multiple times since it may not output immediately - while Instant::now().duration_since(start) < timeout { - let mut cmd = worker.logs(&[&project_name]); - cmd.assert().success(); - - let output = &cmd.output().unwrap().stdout; - let stdout = std::str::from_utf8(output).unwrap(); - if stdout.contains("Hello from mock!") { - return; - } - } - unreachable!("Couldn't find output in 1 second") + // Give the process time to produce output + std::thread::sleep(Duration::from_millis(500)); + + let mut cmd = worker.logs(&[&project_name]); + cmd.timeout(Duration::from_secs(2)); + let output = cmd.output().expect("Failed to run logs"); + let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + stdout.contains("Hello from mock!"), + "Expected output in logs, got: {}", + stdout + ); } #[test] @@ -51,20 +49,17 @@ fn test_logs_command_success() { let mut cmd = worker.start(&["-n", &uuid.to_string(), "-c", &echo_cmd]); cmd.assert().success(); - let timeout = Duration::new(1, 0); - let start = Instant::now(); - - // Try multiple times since it may not output immediately - while Instant::now().duration_since(start) < timeout { - let mut cmd = worker.logs(&[&uuid.to_string()]); - cmd.assert().success(); - - let output = &cmd.output().unwrap().stdout; - let stdout = std::str::from_utf8(output).unwrap(); - println!("stdout: {}", stdout); - if stdout.contains(&format!("Hello from {}!", uuid)) { - return; - } - } - unreachable!("Couldn't find output in 1 second") + // Give the process time to produce output + std::thread::sleep(Duration::from_millis(500)); + + let mut cmd = worker.logs(&[&uuid.to_string()]); + cmd.timeout(Duration::from_secs(2)); + let output = cmd.output().expect("Failed to run logs"); + let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + stdout.contains(&format!("Hello from {}!", uuid)), + "Expected output in logs, got: {}", + stdout + ); } From e442259f3ee03f16609f8e9d3559b14533f83971 Mon Sep 17 00:00:00 2001 From: Sebastian Lyng Johansen Date: Sat, 18 Apr 2026 12:17:39 +0200 Subject: [PATCH 2/2] fix: cursor position and size on attach/logs --- src/main.rs | 7 ++----- src/project.rs | 2 +- src/pty.rs | 12 +++++------- src/pty_server.rs | 25 +++++++++++++------------ 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/main.rs b/src/main.rs index e17e9c0..d4294fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,6 @@ use project::Project; use crate::{ project::RunningProject, - pty::Message, pty_client::{DetachReason, PtyClient}, }; @@ -94,8 +93,7 @@ fn run(config: &WorkerConfig, project: Project) -> Result<(), anyhow::Error> { } fn attach(config: &WorkerConfig, args: AttachArgs) -> Result<(), anyhow::Error> { - let mut client = PtyClient::connect(&config.sock_file(&args.project))?; - client.send(Message::DumpScreen); + let client = PtyClient::connect(&config.sock_file(&args.project))?; let reason = client.event_loop(false); @@ -136,8 +134,7 @@ fn list(config: &WorkerConfig, args: ListArgs) -> Result<(), anyhow::Error> { } fn logs(config: &WorkerConfig, args: LogsArgs) -> Result<(), anyhow::Error> { - let mut client = PtyClient::connect(&config.sock_file(&args.project))?; - client.send(Message::DumpScreen); + let client = PtyClient::connect(&config.sock_file(&args.project))?; client.event_loop(true); Ok(()) } diff --git a/src/project.rs b/src/project.rs index 4c12a72..baf52e7 100644 --- a/src/project.rs +++ b/src/project.rs @@ -123,7 +123,7 @@ impl Project { signal(libc::SIGINT, libc::SIG_IGN); signal(libc::SIGQUIT, libc::SIG_IGN); signal(libc::SIGTSTP, libc::SIG_IGN); - let server = PtyServer::bind(&sock_path)?; + let server = PtyServer::bind(&sock_path, rows, cols)?; // Signal readiness to intermediate let _ = pipe_w.write_all(&[1u8]); drop(pipe_w); diff --git a/src/pty.rs b/src/pty.rs index e6103cf..3d15ac4 100644 --- a/src/pty.rs +++ b/src/pty.rs @@ -57,8 +57,6 @@ pub fn get_terminal_size() -> (u16, u16) { pub enum Message { /// Resize the PTY. Resize { rows: u16, cols: u16 }, - /// Request the current screen state with escape sequences and colors. - DumpScreen, /// Request the current screen state as plain text. DumpText, /// Detach from the session. @@ -69,7 +67,6 @@ pub enum Message { impl Message { const RESIZE: u8 = 0x01; - const DUMP_SCREEN: u8 = 0x02; const DUMP_TEXT: u8 = 0x03; const CTRL_D: u8 = 0x04; @@ -81,7 +78,6 @@ impl Message { buf.extend_from_slice(&cols.to_be_bytes()); buf } - Message::DumpScreen => vec![Self::DUMP_SCREEN], Message::DumpText => vec![Self::DUMP_TEXT], Message::Detach => vec![Self::CTRL_D], Message::Input(data) => data.clone(), @@ -97,9 +93,6 @@ impl Message { let cols = u16::from_be_bytes([buf[3], buf[4]]); return Message::Resize { rows, cols }; } - if buf.len() == 1 && buf[0] == Self::DUMP_SCREEN { - return Message::DumpScreen; - } if buf.len() == 1 && buf[0] == Self::DUMP_TEXT { return Message::DumpText; } @@ -122,6 +115,11 @@ pub fn enter_raw_mode() -> Option { if libc::tcsetattr(libc::STDIN_FILENO, libc::TCSANOW, &raw) != 0 { return None; } + // Switch to alternate screen so avt.dump()'s absolute cursor + // positioning lands on a fresh (1,1)-based buffer instead of + // overlapping the shell history. + let _ = std::io::Write::write_all(&mut std::io::stdout(), b"\x1b[?1049h\x1b[H"); + let _ = std::io::Write::flush(&mut std::io::stdout()); Some(original) } } diff --git a/src/pty_server.rs b/src/pty_server.rs index c44ee5c..2b0cd77 100644 --- a/src/pty_server.rs +++ b/src/pty_server.rs @@ -4,7 +4,7 @@ use std::os::unix::net::{UnixListener, UnixStream}; use std::path::Path; use crate::libc::{PollResult, PollSet, has_child_exited}; -use crate::pty::{Message, get_terminal_size, set_window_size}; +use crate::pty::{Message, set_window_size}; pub enum ServerEvent<'a> { /// PTY master produced output. @@ -25,11 +25,10 @@ pub struct PtyServer { } impl PtyServer { - pub fn bind(path: &Path) -> std::io::Result { + pub fn bind(path: &Path, rows: u16, cols: u16) -> std::io::Result { let _ = std::fs::remove_file(path); let listener = UnixListener::bind(path)?; listener.set_nonblocking(true)?; - let (rows, cols) = get_terminal_size(); Ok(PtyServer { listener, clients: Vec::new(), @@ -105,10 +104,6 @@ impl PtyServer { Message::Resize { rows, cols } => { return ServerEvent::ClientResize(rows, cols); } - Message::DumpScreen => { - let screen = self.vt.dump().replace('\u{9b}', "\x1b["); - let _ = self.clients[i].write_all(screen.as_bytes()); - } Message::DumpText => { for line in self.vt.text() { if !line.is_empty() { @@ -134,13 +129,19 @@ impl PtyServer { // Read the initial resize (blocking, before set_nonblocking) let mut buf = [0u8; 5]; - if let Ok(n) = stream.read(&mut buf) { - if let Message::Resize { rows, cols } = Message::decode(&buf[..n]) { - set_window_size(master, rows, cols); - self.vt.resize(cols as usize, rows as usize); - } + if let Ok(n) = stream.read(&mut buf) + && let Message::Resize { rows, cols } = Message::decode(&buf[..n]) + { + set_window_size(master, rows, cols); + self.vt.resize(cols as usize, rows as usize); } + // Send the current screen state before joining the broadcast list. + // Otherwise live PTY output can reach the client before the dump, + // and the dump's cursor escapes smear that output. + let screen = self.vt.dump().replace('\u{9b}', "\x1b["); + let _ = stream.write_all(screen.as_bytes()); + stream.set_nonblocking(true).ok(); self.clients.push(stream); }