Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 143 additions & 38 deletions crates/phux-client/src/attach/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use std::io;
use std::path::Path;

use bytes::BytesMut;
use bytes::{Buf, BytesMut};
use phux_protocol::wire::frame::{FrameKind, MAX_FRAME_LEN};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
Expand All @@ -38,11 +38,13 @@ pub struct Connection {
#[derive(Debug)]
pub struct FrameReader {
inner: OwnedReadHalf,
/// Reusable assembly buffer; we reset it per frame to avoid a fresh
/// allocation each read.
framed: BytesMut,
/// Reusable scratch for the body bytes. Cleared before each read.
body: BytesMut,
/// Streaming receive buffer. The socket is read in chunks (not one
/// `read_exact` per frame) so a single syscall can surface several
/// queued frames at once; [`Self::recv`] and [`Self::try_recv`] decode
/// complete frames out of the front and retain any partial tail for the
/// next read. This buffering is what lets the attach loop coalesce a
/// back-to-back output burst into one paint (phux-jhv8).
buf: BytesMut,
}

/// Write half — encodes one [`FrameKind`] per call.
Expand All @@ -67,8 +69,7 @@ impl Connection {
Ok(Self {
reader: FrameReader {
inner: read,
framed: BytesMut::with_capacity(4096),
body: BytesMut::with_capacity(4096),
buf: BytesMut::with_capacity(8192),
},
writer: FrameWriter {
inner: write,
Expand All @@ -89,8 +90,7 @@ impl Connection {
Self {
reader: FrameReader {
inner: read,
framed: BytesMut::with_capacity(4096),
body: BytesMut::with_capacity(4096),
buf: BytesMut::with_capacity(8192),
},
writer: FrameWriter {
inner: write,
Expand Down Expand Up @@ -119,6 +119,17 @@ impl Connection {
pub async fn recv(&mut self) -> Result<FrameKind, AttachError> {
self.reader.recv().await
}

/// Pull a frame that is *already available* without awaiting the socket.
///
/// Returns `Ok(Some(frame))` when a complete frame can be decoded from
/// data already buffered (or readable without blocking), `Ok(None)` when
/// the next frame is not yet fully here. Lets the attach loop drain a
/// back-to-back burst after the first `recv` so the whole run coalesces
/// into a single paint (phux-jhv8).
pub fn try_recv(&mut self) -> Result<Option<FrameKind>, AttachError> {
self.reader.try_recv()
}
}

impl FrameWriter {
Expand All @@ -141,42 +152,80 @@ impl FrameReader {
/// Read one complete frame off the wire.
///
/// Returns [`AttachError::Disconnected`] on a clean EOF — the SPEC §5
/// length prefix is the only legal cut point.
/// length prefix is the only legal cut point. Drains a complete frame
/// from the receive buffer when one is already buffered; otherwise reads more
/// bytes (awaiting the socket) until a full frame lands.
pub async fn recv(&mut self) -> Result<FrameKind, AttachError> {
let mut header = [0u8; LENGTH_PREFIX];
match self.inner.read_exact(&mut header).await {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
loop {
if let Some(frame) = decode_buffered(&mut self.buf)? {
return Ok(frame);
}
// No complete frame buffered — pull more bytes. A read of zero is
// a clean EOF; mid-frame that is a truncated stream, but the only
// SPEC §5 cut point is a frame boundary, which `decode_buffered`
// already returned above.
let n = self
.inner
.read_buf(&mut self.buf)
.await
.map_err(AttachError::Io)?;
if n == 0 {
return Err(AttachError::Disconnected);
}
Err(err) => return Err(AttachError::Io(err)),
}
}

let body_len = u32::from_be_bytes(header);
if !(1..=MAX_FRAME_LEN).contains(&body_len) {
return Err(AttachError::Protocol(format!(
"server sent frame with out-of-range length {body_len}",
)));
/// Non-blocking sibling of [`Self::recv`]: decode a frame only if one is
/// already buffered or becomes readable without blocking.
///
/// Returns `Ok(None)` when the next frame is not yet fully available.
/// Used to drain a burst after the first `recv` so the attach loop paints
/// the run once instead of per frame (phux-jhv8).
pub fn try_recv(&mut self) -> Result<Option<FrameKind>, AttachError> {
// A frame may already be sitting in the buffer behind the one `recv`
// just returned; hand it over before touching the socket.
if let Some(frame) = decode_buffered(&mut self.buf)? {
return Ok(Some(frame));
}
let body_len_usize = body_len as usize;

self.body.clear();
self.body.resize(body_len_usize, 0);
self.inner
.read_exact(&mut self.body)
.await
.map_err(AttachError::Io)?;

// Reassemble length-prefix + body so the decoder sees a full frame.
self.framed.clear();
self.framed.extend_from_slice(&header);
self.framed.extend_from_slice(&self.body);
// Top up from the socket without blocking. `WouldBlock` just means
// nothing more is queued right now.
match self.inner.try_read_buf(&mut self.buf) {
Ok(0) => return Err(AttachError::Disconnected),
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(None),
Err(err) => return Err(AttachError::Io(err)),
}
decode_buffered(&mut self.buf)
}
}

let (frame, _rest) = FrameKind::decode(&self.framed).map_err(|err| {
AttachError::Protocol(format!("server sent undecodable frame: {err:?}"))
})?;
Ok(frame)
/// Decode and consume one complete frame from the front of `buf`.
///
/// Returns `Ok(None)` when fewer than a full frame's bytes are buffered (the
/// length prefix is missing, or the body has not all arrived). The decoded
/// frame's bytes are dropped from the front; any trailing partial frame stays
/// for the next read.
fn decode_buffered(buf: &mut BytesMut) -> Result<Option<FrameKind>, AttachError> {
if buf.len() < LENGTH_PREFIX {
return Ok(None);
}
let mut header = [0u8; LENGTH_PREFIX];
header.copy_from_slice(&buf[..LENGTH_PREFIX]);
let body_len = u32::from_be_bytes(header);
if !(1..=MAX_FRAME_LEN).contains(&body_len) {
return Err(AttachError::Protocol(format!(
"server sent frame with out-of-range length {body_len}",
)));
}
let frame_len = LENGTH_PREFIX + body_len as usize;
if buf.len() < frame_len {
// Body still in flight — wait for more bytes.
return Ok(None);
}
let (frame, _rest) = FrameKind::decode(&buf[..frame_len])
.map_err(|err| AttachError::Protocol(format!("server sent undecodable frame: {err:?}")))?;
buf.advance(frame_len);
Ok(Some(frame))
}

#[cfg(test)]
Expand Down Expand Up @@ -214,4 +263,60 @@ mod tests {
assert_eq!(decoded, frame);
assert!(rest.is_empty());
}

fn framed(seq: u64) -> BytesMut {
// A small, cheap-to-build frame with a distinguishing field so the
// burst-decode test can assert ordering.
let frame = FrameKind::FrameAck {
terminal_id: phux_protocol::ids::TerminalId::Local { id: 1 },
seq,
};
let mut buf = BytesMut::new();
frame.encode(&mut buf);
buf
}

#[test]
fn decode_buffered_drains_back_to_back_frames_in_order() {
// The coalescing path (phux-jhv8) relies on a single socket read
// surfacing several queued frames: decode_buffered must peel them off
// the front one at a time, in order, leaving nothing behind.
let mut buf = BytesMut::new();
for seq in 1..=3 {
buf.extend_from_slice(&framed(seq));
}
let mut seqs = Vec::new();
while let Some(FrameKind::FrameAck { seq, .. }) = decode_buffered(&mut buf).expect("decode")
{
seqs.push(seq);
}
assert_eq!(seqs, vec![1, 2, 3]);
assert!(buf.is_empty(), "fully consumed buffer");
}

#[test]
fn decode_buffered_holds_partial_frame() {
// A frame split across reads must not decode early: the prefix says
// more bytes are coming, so decode_buffered returns None and retains
// the partial bytes until the rest arrives.
let whole = framed(7);
let cut = whole.len() - 2;
let mut buf = BytesMut::from(&whole[..cut]);
assert!(
decode_buffered(&mut buf).expect("partial").is_none(),
"incomplete frame yields None"
);
assert_eq!(buf.len(), cut, "partial bytes retained");
// Deliver the tail; now it decodes and the buffer drains.
buf.extend_from_slice(&whole[cut..]);
let frame = decode_buffered(&mut buf).expect("complete");
assert!(matches!(frame, Some(FrameKind::FrameAck { seq: 7, .. })));
assert!(buf.is_empty());
}

#[test]
fn decode_buffered_empty_is_none() {
let mut buf = BytesMut::new();
assert!(decode_buffered(&mut buf).expect("empty").is_none());
}
}
118 changes: 117 additions & 1 deletion crates/phux-client/src/attach/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,45 @@ impl PaneSlot {
/// of pressing Escape stays snappy. xterm uses ~50ms by default; we match.
const ESC_FLUSH_IDLE: Duration = Duration::from_millis(50);

/// phux-jhv8: upper bound on how many already-queued frames one `recv`
/// wake-up drains before painting. A back-to-back output burst (nvim
/// startup) is a few dozen frames; the cap only guards against a server
/// that streams without pause starving the stdin/signal `select!` arms.
const FRAME_COALESCE_CAP: usize = 1024;

/// The terminal a frame would repaint under normal handling, if any — the
/// `vt_write` + render pair a coalesced burst can defer to a later same-pane
/// frame (phux-jhv8). Output and snapshot frames carry pane content; every
/// other frame (layout, lifecycle, control) paints through its own path or
/// not at all, so it never defers (returns `None`).
const fn frame_paint_target(frame: &FrameKind) -> Option<&TerminalId> {
match frame {
FrameKind::TerminalOutput { terminal_id, .. }
| FrameKind::TerminalSnapshot { terminal_id, .. } => Some(terminal_id),
_ => None,
}
}

/// Per-frame paint-deferral mask for a coalesced burst (phux-jhv8).
///
/// `targets[i]` is the pane frame `i` would repaint (`None` for control
/// frames). The result is `true` at `i` iff some later frame repaints the
/// *same* pane — meaning frame `i`'s paint is redundant and can be skipped
/// (its `vt_write` still applies). Each pane's LAST frame is therefore never
/// deferred, so every touched pane settles exactly once and none is left
/// stale; control frames (`None`) never defer.
fn coalesce_defer_flags(targets: &[Option<TerminalId>]) -> Vec<bool> {
(0..targets.len())
.map(|i| {
targets[i].as_ref().is_some_and(|pane| {
targets[i + 1..]
.iter()
.any(|later| later.as_ref() == Some(pane))
})
})
.collect()
}

/// Errors the attach loop can surface to its caller.
///
/// Most variants wrap a richer underlying cause; the driver is careful to
Expand Down Expand Up @@ -635,6 +674,8 @@ async fn main_loop<W: super::RenderSink>(
&mut pending_splits,
&mut pending_windows,
overlays.is_active(),
// Single replayed frame — no burst to coalesce, paint it.
false,
)?;
if outcome.exit {
return Ok(LoopExit::Detached);
Expand Down Expand Up @@ -701,7 +742,36 @@ async fn main_loop<W: super::RenderSink>(
// network delivers so the user sees output promptly.
frame = conn.recv() => {
match frame {
Ok(f) => {
Ok(first) => {
// phux-jhv8: drain every frame already queued so a
// back-to-back output burst (nvim startup, a
// full-screen redraw) applies all its vt_writes and
// paints ONCE — on the final frame — instead of a
// render + blocking flush per frame. The non-blocking
// try_recv stops the moment the socket would block, so
// a lone frame keeps the old one-frame-one-paint path.
let mut batch = vec![first];
while batch.len() < FRAME_COALESCE_CAP {
match conn.try_recv() {
Ok(Some(more)) => batch.push(more),
// Socket drained, or a clean EOF the next
// `recv()` will surface as Disconnected.
Ok(None) | Err(AttachError::Disconnected) => break,
Err(err) => return Err(err),
}
}
// Per-pane last-wins: a frame defers its paint iff a
// LATER frame in the burst repaints the same pane, so
// every touched pane (focused or not) settles exactly
// once on its final frame. No pane is left stale, and
// the hot single-pane case collapses to one paint.
let paint_targets: Vec<Option<TerminalId>> = batch
.iter()
.map(|f| frame_paint_target(f).cloned())
.collect();
let defer_flags = coalesce_defer_flags(&paint_targets);
for (frame_idx, f) in batch.into_iter().enumerate() {
let defer_paint = defer_flags[frame_idx];
// phux-tnh: snapshot the current per-leaf rects
// BEFORE the frame may fold (close) or split the
// layout, so a TerminalClosed/Spawned can diff
Expand Down Expand Up @@ -732,6 +802,7 @@ async fn main_loop<W: super::RenderSink>(
&mut pending_splits,
&mut pending_windows,
overlays.is_active(),
defer_paint,
)?;
if outcome.exit {
return Ok(LoopExit::Detached);
Expand Down Expand Up @@ -832,6 +903,7 @@ async fn main_loop<W: super::RenderSink>(
// MetadataValue can't trample state.
layout_get_request_id = None;
}
}
}
Err(AttachError::Disconnected) if detach_pending => {
// Server closed the socket without a `DETACHED`
Expand Down Expand Up @@ -1605,6 +1677,50 @@ mod tests {
assert!(msg.contains("attach loop io error"));
}

#[test]
fn coalesce_defers_every_pane_frame_but_its_last() {
// phux-jhv8: in a coalesced burst, every output frame for a pane
// defers EXCEPT that pane's final frame, which settles the screen.
let p = |id| Some(TerminalId::Local { id });
// Single-pane burst: only the last frame paints.
assert_eq!(
coalesce_defer_flags(&[p(2), p(2), p(2)]),
vec![true, true, false]
);
// A lone frame never defers (preserves the one-frame-one-paint path).
assert_eq!(coalesce_defer_flags(&[p(2)]), vec![false]);
}

#[test]
fn coalesce_keys_deferral_per_pane_not_globally() {
// Two panes interleaved: each pane's LAST frame paints, so neither is
// left stale even when the burst ends on the other pane's output.
let p = |id| Some(TerminalId::Local { id });
// A(defer, later A) B(defer, later B) A(last A) B(last B)
assert_eq!(
coalesce_defer_flags(&[p(1), p(2), p(1), p(2)]),
vec![true, true, false, false]
);
// Burst ending on a non-focused pane B must still paint A's last frame.
assert_eq!(
coalesce_defer_flags(&[p(1), p(1), p(2)]),
vec![true, false, false]
);
}

#[test]
fn coalesce_control_frames_never_defer() {
// `None` (a non-painting control frame) never defers, and never
// counts as a later same-pane paint for the frames before it.
let p = |id| Some(TerminalId::Local { id });
assert_eq!(
coalesce_defer_flags(&[p(1), None, p(1)]),
vec![true, false, false]
);
assert_eq!(coalesce_defer_flags(&[None, None]), vec![false, false]);
assert_eq!(coalesce_defer_flags(&[]), Vec::<bool>::new());
}

#[test]
fn attach_error_disconnected_is_distinct_from_io() {
let a = AttachError::Disconnected;
Expand Down
Loading
Loading