From 919d71cc4628ac104633a922ab848c07ae4c30b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Thu, 26 Mar 2026 03:10:00 -0300 Subject: [PATCH 1/2] Add PTY event loop and wire session::run MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Event loop (session/event_loop.rs): - stdin reader: raw bytes → KeyEvent → PopupState → forward/show/erase - PTY reader: blocking thread → channel → OscParser → strip OSC + passthrough - PTY writer: channel → blocking write to master fd - Terminal writer: blocking thread with stdout lock → passthrough + popup - SIGWINCH handler: resize PTY + update popup cols - Escape timeout: 10ms disambiguates standalone ESC from sequences - Accept: Ctrl-A + Ctrl-K + retype via PTY master (readline-compatible) Session entry (session/mod.rs): - Auto-starts daemon if not running - Writes integration script to temp file - Spawns shell via PTY with --rcfile - Enables raw mode with scopeguard restoration - Runs event loop via tokio runtime - Cleans up temp files on exit popup.rs: Made popup_lines field pub for event loop access Closes #45 Signed-off-by: André Ahlert --- src/session/event_loop.rs | 341 ++++++++++++++++++++++++++++++++++++++ src/session/mod.rs | 63 +++++-- src/session/popup.rs | 2 +- 3 files changed, 393 insertions(+), 13 deletions(-) create mode 100644 src/session/event_loop.rs diff --git a/src/session/event_loop.rs b/src/session/event_loop.rs new file mode 100644 index 0000000..39dd89d --- /dev/null +++ b/src/session/event_loop.rs @@ -0,0 +1,341 @@ +//! Session event loop: the core of the PTY wrapper. +//! +//! Runs three concurrent async tasks: +//! 1. **stdin reader**: raw bytes from real terminal → KeyEvent → PopupState +//! 2. **PTY reader**: bytes from PTY master → OscParser → forward to real terminal +//! 3. **SIGWINCH handler**: terminal resize → PTY window size update +//! +//! The event loop owns the real terminal output and writes popup ANSI +//! directly to it, separate from the PTY output stream. + +use super::keys::{self, KeyEvent}; +use super::osc::{OscEvent, OscParser}; +use super::popup::{PopupAction, PopupState}; +use crate::session::pty; +use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; +use std::process::Child; +use tokio::sync::mpsc; +use tracing::{debug, error, warn}; + +/// Actions the writer task should execute on the real terminal. +enum TerminalWrite { + /// Write bytes to the real terminal stdout (PTY output passthrough). + Passthrough(Vec), + /// Write popup ANSI string to the real terminal. + ShowPopup(String), + /// Erase popup (N item lines + 2 border lines). + ErasePopup(usize), +} + +/// Run the session event loop. This is the main entry point after PTY setup. +/// +/// Takes ownership of the PTY master fd and child process. +/// Runs until the child shell exits or stdin reaches EOF. +pub async fn run( + master: OwnedFd, + mut child: Child, + terminal_cols: u16, + _terminal_rows: u16, +) -> anyhow::Result<()> { + let master_fd = master.as_raw_fd(); + + // Channel for terminal writes (popup + PTY passthrough) + let (write_tx, mut write_rx) = mpsc::channel::(256); + + // Popup state + let popup = std::sync::Arc::new(tokio::sync::Mutex::new(PopupState::new(terminal_cols))); + + // Use blocking I/O for PTY master in a dedicated thread. + // tokio's AsyncFd requires the fd to be non-blocking, which can cause + // issues with PTY masters on some platforms. Blocking read in a thread is simpler. + let master_read_fd = master_fd; + let (pty_output_tx, mut pty_output_rx) = mpsc::channel::>(256); + let (pty_write_tx, mut pty_write_rx) = mpsc::channel::>(256); + + // PTY reader thread (blocking) + let pty_read_thread = std::thread::spawn(move || { + use std::io::Read; + let mut file = unsafe { std::fs::File::from_raw_fd(master_read_fd) }; + let mut buf = [0u8; 4096]; + loop { + match file.read(&mut buf) { + Ok(0) => break, + Ok(n) => { + if pty_output_tx.blocking_send(buf[..n].to_vec()).is_err() { + break; + } + } + Err(e) => { + // EIO is normal when child exits + if e.raw_os_error() == Some(5) { + break; + } + error!("PTY read error: {e}"); + break; + } + } + } + // Don't close the fd here (it's shared with the write side) + std::mem::forget(file); + debug!("PTY read thread exited"); + }); + + // PTY writer task (uses blocking write from a tokio task) + let pty_writer_task = tokio::spawn(async move { + use std::io::Write; + // SAFETY: master_fd is valid and shared between read thread and this task. + // The read thread only reads; this task only writes. No concurrent same-op. + let mut file = unsafe { std::fs::File::from_raw_fd(master_fd) }; + while let Some(bytes) = pty_write_rx.recv().await { + if file.write_all(&bytes).is_err() { + break; + } + let _ = file.flush(); + } + std::mem::forget(file); // don't double-close + debug!("PTY writer task exited"); + }); + + // Task 1: Read raw stdin and route key events + let stdin_popup = popup.clone(); + let stdin_write_tx = write_tx.clone(); + let stdin_pty_tx = pty_write_tx.clone(); + let stdin_task = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + let mut stdin = tokio::io::stdin(); + let mut buf = [0u8; 256]; + let mut escape_pending = false; + + loop { + let n = if escape_pending { + // Wait briefly for escape sequence continuation + match tokio::time::timeout( + std::time::Duration::from_millis(10), + stdin.read(&mut buf), + ) + .await + { + Ok(Ok(0)) => break, + Ok(Ok(n)) => n, + Ok(Err(_)) => break, + Err(_) => { + // Timeout: standalone Escape + let (events, pending) = keys::parse_bytes(&[], true); + escape_pending = pending; + for event in events { + handle_key_event(event, &stdin_popup, &stdin_write_tx, &stdin_pty_tx) + .await; + } + continue; + } + } + } else { + match stdin.read(&mut buf).await { + Ok(0) => break, // EOF + Ok(n) => n, + Err(_) => break, + } + }; + + let (events, pending) = keys::parse_bytes(&buf[..n], escape_pending); + escape_pending = pending; + + for event in events { + handle_key_event(event, &stdin_popup, &stdin_write_tx, &stdin_pty_tx).await; + } + } + debug!("stdin reader exited"); + }); + + // Task 2: Process PTY output from read thread, strip OSC, forward to terminal + let pty_popup = popup.clone(); + let pty_terminal_tx = write_tx.clone(); + let pty_task = tokio::spawn(async move { + let mut osc_parser = OscParser::new(); + let cwd = std::env::current_dir() + .unwrap_or_default() + .to_string_lossy() + .to_string(); + + while let Some(chunk) = pty_output_rx.recv().await { + let (passthrough, events) = osc_parser.feed(&chunk); + + if !passthrough.is_empty() { + let _ = pty_terminal_tx + .send(TerminalWrite::Passthrough(passthrough)) + .await; + } + + for event in events { + match event { + OscEvent::CommandLine { buffer, cursor } => { + let mut popup = pty_popup.lock().await; + let action = popup.on_command_line(buffer, cursor, &cwd).await; + dispatch_popup_action(action, &pty_terminal_tx).await; + } + OscEvent::PromptStart => { + let mut popup = pty_popup.lock().await; + if popup.visible { + let lines = popup.items.len().min(10); + popup.visible = false; + popup.items.clear(); + popup.popup_lines = 0; + let _ = pty_terminal_tx.send(TerminalWrite::ErasePopup(lines)).await; + } + } + OscEvent::PromptEnd => {} + } + } + } + debug!("PTY processor exited"); + }); + + // Task 3: Terminal writer (blocking thread, owns stdout) + let writer_thread = std::thread::spawn(move || { + use std::io::Write; + let mut stdout = std::io::stdout().lock(); + + while let Some(action) = write_rx.blocking_recv() { + match action { + TerminalWrite::Passthrough(bytes) => { + let _ = stdout.write_all(&bytes); + let _ = stdout.flush(); + } + TerminalWrite::ShowPopup(ansi) => { + let _ = stdout.write_all(b"\x1b[s"); + let _ = stdout.write_all(ansi.as_bytes()); + let _ = stdout.write_all(b"\x1b[u"); + let _ = stdout.flush(); + } + TerminalWrite::ErasePopup(lines) => { + use tabra::render::overlay; + let erase = overlay::erase_popup(lines); + let _ = stdout.write_all(erase.as_bytes()); + let _ = stdout.flush(); + } + } + } + debug!("terminal writer exited"); + }); + + // Task 4: SIGWINCH handler + let sigwinch_popup = popup.clone(); + let sigwinch_task = tokio::spawn(async move { + let mut signal = + match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::window_change()) { + Ok(s) => s, + Err(e) => { + warn!("failed to register SIGWINCH handler: {e}"); + return; + } + }; + + loop { + signal.recv().await; + if let Ok((rows, cols)) = pty::get_window_size() { + // Resize PTY + let ws = nix::libc::winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + unsafe { + nix::libc::ioctl(master_fd, nix::libc::TIOCSWINSZ, &ws); + } + // Update popup cols + sigwinch_popup.lock().await.terminal_cols = cols; + } + } + }); + + // Wait for PTY processor to exit (child shell exited) + let _ = pty_task.await; + + // Clean up + stdin_task.abort(); + sigwinch_task.abort(); + pty_writer_task.abort(); + drop(write_tx); // close writer channel + let _ = writer_thread.join(); + let _ = pty_read_thread.join(); + + // Wait for child process + let _ = child.wait(); + + // The master fd was shared between read thread and writer task via from_raw_fd. + // Both use mem::forget to avoid double-close. The OwnedFd master will close + // on drop at the end of this function, which is the single close point. + // But we already forgot it in the threads, so we need to forget it here too + // to avoid the OwnedFd destructor closing an already-managed fd. + std::mem::forget(master); + + Ok(()) +} + +/// Handle a key event from stdin: route to popup or forward to PTY. +async fn handle_key_event( + event: KeyEvent, + popup: &std::sync::Arc>, + write_tx: &mpsc::Sender, + pty_tx: &mpsc::Sender>, +) { + let action = popup.lock().await.on_key(&event); + match action { + PopupAction::ForwardKey(bytes) => { + let _ = pty_tx.send(bytes).await; + } + PopupAction::Show(ansi) => { + let _ = write_tx.send(TerminalWrite::ShowPopup(ansi)).await; + } + PopupAction::Erase { lines } => { + let _ = write_tx.send(TerminalWrite::ErasePopup(lines)).await; + } + PopupAction::Accept { + token_start, + insert_text, + } => { + // Erase popup first + let lines = popup.lock().await.items.len().min(10); + let _ = write_tx.send(TerminalWrite::ErasePopup(lines)).await; + + // Insert text into the shell by sending Ctrl-A (go to start), + // Ctrl-K (kill to end), then the full replacement line. + // This works because readline processes these control chars. + let popup_guard = popup.lock().await; + let before = &popup_guard.last_buffer[..token_start]; + let after_cursor = if popup_guard.last_cursor < popup_guard.last_buffer.len() { + &popup_guard.last_buffer[popup_guard.last_cursor..] + } else { + "" + }; + let new_line = format!("{before}{insert_text} {after_cursor}"); + drop(popup_guard); + + // Ctrl-A (beginning of line) + Ctrl-K (kill line) + retype + let mut inject = vec![0x01u8]; // Ctrl-A + inject.push(0x0b); // Ctrl-K + inject.extend_from_slice(new_line.as_bytes()); + let _ = pty_tx.send(inject).await; + } + PopupAction::EraseAndForward { lines, bytes } => { + let _ = write_tx.send(TerminalWrite::ErasePopup(lines)).await; + let _ = pty_tx.send(bytes).await; + } + PopupAction::Nothing => {} + } +} + +/// Dispatch a PopupAction from the PTY reader (OSC command line event). +async fn dispatch_popup_action(action: PopupAction, write_tx: &mpsc::Sender) { + match action { + PopupAction::Show(ansi) => { + let _ = write_tx.send(TerminalWrite::ShowPopup(ansi)).await; + } + PopupAction::Erase { lines } => { + let _ = write_tx.send(TerminalWrite::ErasePopup(lines)).await; + } + PopupAction::Nothing => {} + _ => {} // Other actions not expected from on_command_line + } +} diff --git a/src/session/mod.rs b/src/session/mod.rs index 8d99e46..52238a6 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -12,14 +12,16 @@ //! tabra daemon (via Unix socket IPC) //! ``` +pub mod event_loop; pub mod integration; pub mod keys; pub mod osc; pub mod popup; pub mod pty; -use anyhow::Result; +use anyhow::{Context, Result}; use tabra::shell::ShellType; +use tracing::info; /// Entry point for `tabra session`. pub fn run(shell: Option) -> Result<()> { @@ -30,28 +32,65 @@ pub fn run(shell: Option) -> Result<()> { None => std::env::var("SHELL").unwrap_or_else(|_| "bash".to_string()), }; - tracing::info!("starting session with shell: {}", shell_path); + info!("starting session with shell: {}", shell_path); - // TODO: Phase 5 (event loop) will implement the full session here. - // For now, verify the foundation compiles and the PTY can be opened. + // Ensure daemon is running + if tabra::ipc::client::request_status().is_err() { + info!("daemon not running, starting..."); + std::process::Command::new("tabra") + .arg("daemon") + .stdin(std::process::Stdio::null()) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn() + .context("failed to start tabra daemon")?; + + // Wait for daemon to be ready + for _ in 0..10 { + std::thread::sleep(std::time::Duration::from_millis(100)); + if tabra::ipc::client::request_status().is_ok() { + break; + } + } + } + + // Write integration script to a temp file for bash --rcfile + let integration_script = match shell_path.as_str() { + s if s.contains("bash") => integration::bash_integration(), + s if s.contains("zsh") => integration::zsh_integration(), + s if s.contains("fish") => integration::fish_integration(), + _ => String::new(), + }; + + let tmp_dir = std::env::temp_dir().join(format!("tabra-session-{}", std::process::id())); + std::fs::create_dir_all(&tmp_dir)?; + let script_path = tmp_dir.join("integration.sh"); + std::fs::write(&script_path, &integration_script)?; + let script_path_str = script_path.to_string_lossy().to_string(); + + // Open PTY and set window size let pty_pair = pty::PtyPair::open()?; let (rows, cols) = pty::get_window_size()?; pty_pair.set_window_size(rows, cols)?; - tracing::info!("PTY opened, terminal size: {}x{}", cols, rows); + info!("PTY opened, terminal size: {}x{}", cols, rows); + + // Spawn shell inside PTY + let (master, child) = pty_pair.spawn_shell(&shell_path, &script_path_str)?; + info!("shell spawned: {}", shell_path); - // Enable raw mode with scopeguard restoration. - // Guard restores terminal on drop, including on panic. - // Phase 5 will run the event loop between enable and the implicit drop at end of scope. + // Enable raw mode with scopeguard restoration let original_termios = pty::enable_raw_mode()?; let _raw_guard = scopeguard::guard(original_termios, |t| pty::restore_mode(&t)); - // TODO: Phase 5 event loop goes here (between raw mode enable and guard drop) + // Run the async event loop + let rt = tokio::runtime::Runtime::new().context("failed to create tokio runtime")?; + rt.block_on(event_loop::run(master, child, cols, rows))?; - // Explicitly drop guard to restore terminal before printing + // Guard drops here, restoring terminal drop(_raw_guard); - eprintln!("tabra session: PTY wrapper mode is under development."); - eprintln!("For now, use: eval \"$(tabra init bash)\""); + // Cleanup temp files + let _ = std::fs::remove_dir_all(&tmp_dir); Ok(()) } diff --git a/src/session/popup.rs b/src/session/popup.rs index 457cd3c..febe178 100644 --- a/src/session/popup.rs +++ b/src/session/popup.rs @@ -40,7 +40,7 @@ pub struct PopupState { pub last_cursor: usize, pub terminal_cols: u16, pub theme: Theme, - popup_lines: usize, + pub popup_lines: usize, } impl PopupState { From 696331bd11a9ab4e5f3f49cb484ef9009aacbca9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Thu, 26 Mar 2026 03:13:26 -0300 Subject: [PATCH 2/2] Fix shutdown: await aborted tasks, remove fd leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Await stdin_task, sigwinch_task, pty_writer_task after abort() to ensure they've stopped before master fd is closed - Remove mem::forget(master): OwnedFd drop is the intended single close point (both threads already mem::forget their File copies) - Prevents use-after-close on master fd during SIGWINCH or mid-write Signed-off-by: André Ahlert --- src/session/event_loop.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/session/event_loop.rs b/src/session/event_loop.rs index 39dd89d..d5e5e21 100644 --- a/src/session/event_loop.rs +++ b/src/session/event_loop.rs @@ -252,11 +252,18 @@ pub async fn run( // Wait for PTY processor to exit (child shell exited) let _ = pty_task.await; - // Clean up + // Clean up: abort tasks and await them to ensure they've stopped + // before closing the master fd. stdin_task.abort(); + let _ = stdin_task.await; + sigwinch_task.abort(); + let _ = sigwinch_task.await; + pty_writer_task.abort(); - drop(write_tx); // close writer channel + let _ = pty_writer_task.await; + + drop(write_tx); // close writer channel so writer thread exits let _ = writer_thread.join(); let _ = pty_read_thread.join(); @@ -264,11 +271,9 @@ pub async fn run( let _ = child.wait(); // The master fd was shared between read thread and writer task via from_raw_fd. - // Both use mem::forget to avoid double-close. The OwnedFd master will close - // on drop at the end of this function, which is the single close point. - // But we already forgot it in the threads, so we need to forget it here too - // to avoid the OwnedFd destructor closing an already-managed fd. - std::mem::forget(master); + // Both threads use mem::forget on their File to avoid closing the fd. + // The OwnedFd `master` is the sole owner and its drop here is the single close. + // (Do NOT mem::forget master: it must close so the PTY is properly cleaned up.) Ok(()) }