Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ session-events.jsonl.*
*.swp
*~

# Local tooling (not for the team)
.codegraph/
.cursor/

# OS
.DS_Store
Thumbs.db
Expand All @@ -62,3 +66,6 @@ builder-debug.yml

# playwright artifacts
frontend/test-results/

# built daemon binary copied into the frontend bundle dir
frontend/daemon/
2 changes: 1 addition & 1 deletion backend/internal/adapters/agent/aider/aider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestContextCancellation(t *testing.T) {
func TestResolveAiderBinaryFallback(t *testing.T) {
// When the binary is not on PATH or any well-known location, the resolver
// MUST surface ports.ErrAgentBinaryNotFound rather than a silent string
// fallback that lets a missing CLI launch into an empty zellij pane.
// fallback that lets a missing CLI launch into an empty tmux pane.
bin, err := ResolveAiderBinary(context.Background())
if err != nil {
if !errors.Is(err, ports.ErrAgentBinaryNotFound) {
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/adapters/agent/auggie/auggie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestSessionInfoNoOp(t *testing.T) {
func TestResolveAuggieBinaryFallback(t *testing.T) {
// When the binary is not on PATH or any well-known location, the resolver
// MUST surface ports.ErrAgentBinaryNotFound rather than a silent string
// fallback that lets a missing CLI launch into an empty zellij pane.
// fallback that lets a missing CLI launch into an empty tmux pane.
bin, err := ResolveAuggieBinary(context.Background())
if err != nil {
if !errors.Is(err, ports.ErrAgentBinaryNotFound) {
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/adapters/agent/claudecode/claudecode.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//
// Claude Code starts an interactive session by default (no -p/--print), which
// is exactly what AO wants: a live agent the user can attach to in the
// browser terminal or via `zellij attach`. The initial task prompt is passed
// browser terminal or via `tmux attach`. The initial task prompt is passed
// as the positional argument; the orchestrator system prompt (if any) is
// appended to Claude's default system prompt so its built-in coding
// instructions are preserved.
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/adapters/agent/devin/devin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestGetAgentHooksCtxCancelled(t *testing.T) {
func TestResolveDevinBinaryFallback(t *testing.T) {
// When the binary is not on PATH or any well-known location, the resolver
// MUST surface ports.ErrAgentBinaryNotFound rather than a silent string
// fallback that lets a missing CLI launch into an empty zellij pane.
// fallback that lets a missing CLI launch into an empty tmux pane.
bin, err := ResolveDevinBinary(context.Background())
if err != nil {
if !errors.Is(err, ports.ErrAgentBinaryNotFound) {
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/adapters/agent/goose/goose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func TestSessionInfoFalseWhenNoHookMetadata(t *testing.T) {
func TestResolveGooseBinaryFallback(t *testing.T) {
// When the binary is not on PATH or any well-known location, the resolver
// MUST surface ports.ErrAgentBinaryNotFound rather than a silent string
// fallback that lets a missing CLI launch into an empty zellij pane.
// fallback that lets a missing CLI launch into an empty tmux pane.
bin, err := ResolveGooseBinary(context.Background())
if err != nil {
if !errors.Is(err, ports.ErrAgentBinaryNotFound) {
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/adapters/agent/kilocode/kilocode.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func kilocodePermissionConfig(mode ports.PermissionMode) map[string]string {
// kilocode. A bare `KILO_CONFIG_CONTENT=...` argv element would not work: the
// runtime shell-quotes every element, and a quoted token is run as a command
// rather than read as an assignment — hence the explicit `env` wrapper.
// POSIX-only, which matches the zellij runtime.
// POSIX-only, which matches the tmux runtime.
func kilocodePermissionEnvPrefix(mode ports.PermissionMode) []string {
config := kilocodePermissionConfig(mode)
if len(config) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/adapters/agent/kiro/kiro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func TestDeriveActivityState(t *testing.T) {
func TestResolveKiroBinaryFallback(t *testing.T) {
// When the binary is not on PATH or any well-known location, the resolver
// MUST surface ports.ErrAgentBinaryNotFound rather than a silent string
// fallback that lets a missing CLI launch into an empty zellij pane.
// fallback that lets a missing CLI launch into an empty tmux pane.
bin, err := ResolveKiroBinary(context.Background())
if err != nil {
if !errors.Is(err, ports.ErrAgentBinaryNotFound) {
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/adapters/agent/qwen/qwen.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (p *Plugin) SessionInfo(ctx context.Context, session ports.SessionRef) (por
// searching PATH then a handful of well-known install locations (Homebrew, npm
// global). Returns ports.ErrAgentBinaryNotFound when none of those find the
// binary — better than the previous silent `"qwen"` fallback, which let an
// empty zellij pane masquerade as a live session.
// empty tmux pane masquerade as a live session.
func ResolveQwenBinary(ctx context.Context) (string, error) {
if err := ctx.Err(); err != nil {
return "", err
Expand Down
124 changes: 124 additions & 0 deletions backend/internal/adapters/runtime/conpty/attach.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// attach.go - conpty Attach: a loopback Stream over the B3 pty-host. Unlike
// tmux, conpty does not spawn an attach CLI; it dials the session's
// loopback host and speaks the B1 framing protocol directly. The host replays
// the scrollback Snapshot as the first MsgTerminalData on connect, so a fresh
// Read naturally yields the repaint first.
package conpty

import (
"context"
"encoding/json"
"fmt"
"io"
"sync"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

var _ ports.Attacher = (*Runtime)(nil)

// Attach opens a fresh attach Stream for the session by dialing its loopback
// pty-host. rows/cols size the host's PTY from birth when known (a MsgResize is
// sent right after connect). ctx cancellation closes the Stream.
func (r *Runtime) Attach(ctx context.Context, handle ports.RuntimeHandle, rows, cols uint16) (ports.Stream, error) {
sess := r.resolve(handle.ID)
if sess == nil {
return nil, fmt.Errorf("conpty: session %q not found", handle.ID)
}
conn, err := dialHost(sess.addr, dialTimeout)
if err != nil {
return nil, fmt.Errorf("conpty: dial host for %q: %w", handle.ID, err)
}

pr, pw := io.Pipe()
s := &loopbackStream{conn: conn, pr: pr, pw: pw}

// Pump host frames: MsgTerminalData payloads go into the pipe that Read
// drains. The first such frame is the scrollback snapshot, so the replay
// arrives before any live output.
go s.pump()

// ctx cancellation must terminate the stream (mirrors the unix/windows
// spawn paths closing the PTY on ctx.Done).
go func() {
<-ctx.Done()
_ = s.Close()
}()

if rows > 0 && cols > 0 {
if err := s.Resize(rows, cols); err != nil {
_ = s.Close()
return nil, err
}
}
return s, nil
}

// loopbackStream is a ports.Stream backed by a single loopback connection to the
// pty-host. The pump goroutine reframes host output into an io.Pipe so Read
// presents a plain byte stream; Write/Resize encode client frames onto the conn.
type loopbackStream struct {
conn io.ReadWriteCloser
pr *io.PipeReader
pw *io.PipeWriter

closeOnce sync.Once
}

// pump reads framed host messages and writes MsgTerminalData payloads into the
// pipe. It closes the pipe when the connection ends so Read returns EOF.
func (s *loopbackStream) pump() {
parser := NewMessageParser(func(msgType byte, payload []byte) {
if msgType == MsgTerminalData {
// Write blocks until Read drains, preserving back-pressure and order.
_, _ = s.pw.Write(payload)
}
})
buf := make([]byte, 4096)
for {
n, err := s.conn.Read(buf)
if n > 0 {
parser.Feed(buf[:n])
}
if err != nil {
_ = s.pw.CloseWithError(err)
return
}
}
}

func (s *loopbackStream) Read(p []byte) (int, error) { return s.pr.Read(p) }

func (s *loopbackStream) Write(p []byte) (int, error) {
frame, err := EncodeMessage(MsgTerminalInput, p)
if err != nil {
return 0, err
}
if _, err := s.conn.Write(frame); err != nil {
return 0, err
}
return len(p), nil
}

func (s *loopbackStream) Resize(rows, cols uint16) error {
payload, _ := json.Marshal(ResizePayload{Cols: int(cols), Rows: int(rows)})
frame, err := EncodeMessage(MsgResize, payload) // small JSON payload, never overflows uint32
if err != nil {
return err
}
_, err = s.conn.Write(frame)
return err
}

// Close closes the conn and the pipe. Idempotent. Closing the conn unblocks
// pump's Read, which then closes the pipe-writer too; closing both here makes
// Close safe to call directly (e.g. on ctx cancel) without waiting for pump.
func (s *loopbackStream) Close() error {
var err error
s.closeOnce.Do(func() {
err = s.conn.Close()
_ = s.pw.Close()
_ = s.pr.Close()
})
return err
}
151 changes: 151 additions & 0 deletions backend/internal/adapters/runtime/conpty/attach_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package conpty

import (
"bytes"
"context"
"io"
"testing"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

// runtimeForFixture wires a conpty Runtime to a running serveFixture by stuffing
// the fixture's loopback addr into the session map under the given id, so Attach
// resolves it without a real Windows spawn.
func runtimeForFixture(id string, f *serveFixture) *Runtime {
r := New(Options{})
r.mu.Lock()
r.sessions[id] = &hostSession{addr: f.addr, pid: f.pty.PID()}
r.mu.Unlock()
return r
}

func readUntil(t *testing.T, s io.Reader, want string, timeout time.Duration) string {
t.Helper()
type res struct {
out string
}
done := make(chan res, 1)
go func() {
var buf []byte
tmp := make([]byte, 4096)
for {
n, err := s.Read(tmp)
if n > 0 {
buf = append(buf, tmp[:n]...)
if bytes.Contains(buf, []byte(want)) {
done <- res{string(buf)}
return
}
}
if err != nil {
done <- res{string(buf)}
return
}
}
}()
select {
case r := <-done:
return r.out
case <-time.After(timeout):
t.Fatalf("timed out reading for %q", want)
return ""
}
}

// TestAttachReplaysScrollback: the host sends the ring snapshot as the first
// MsgTerminalData on connect, so a fresh Read on the Stream yields the replay.
func TestAttachReplaysScrollback(t *testing.T) {
f := startServe(t, 300)
defer f.cancel()
f.ring.Append([]byte("scrollback-line\n"))

r := runtimeForFixture("sess", f)
s, err := r.Attach(context.Background(), nameHandle("sess"), 0, 0)
if err != nil {
t.Fatalf("Attach: %v", err)
}
defer s.Close()

out := readUntil(t, s, "scrollback-line", 2*time.Second)
if !bytes.Contains([]byte(out), []byte("scrollback-line")) {
t.Fatalf("scrollback not replayed on Read; got %q", out)
}
}

// TestAttachWriteReachesPTY: Write on the Stream sends MsgTerminalInput, which
// the host forwards to the fakePTY's input.
func TestAttachWriteReachesPTY(t *testing.T) {
f := startServe(t, 301)
defer f.cancel()

r := runtimeForFixture("sess", f)
s, err := r.Attach(context.Background(), nameHandle("sess"), 0, 0)
if err != nil {
t.Fatalf("Attach: %v", err)
}
defer s.Close()

keystrokes := []byte("ls -la\r")
if _, err := s.Write(keystrokes); err != nil {
t.Fatalf("Write: %v", err)
}
buf := make([]byte, len(keystrokes))
if _, err := io.ReadFull(f.pty.inR, buf); err != nil {
t.Fatalf("read pty input: %v", err)
}
if string(buf) != string(keystrokes) {
t.Fatalf("pty input = %q, want %q", buf, keystrokes)
}
}

// TestAttachResizeReachesPTY: an initial size on Attach plus a later Resize both
// reach the fakePTY.Resize via MsgResize frames.
func TestAttachResizeReachesPTY(t *testing.T) {
f := startServe(t, 302)
defer f.cancel()

r := runtimeForFixture("sess", f)
// Attach with a birth size: the implementation sends an initial MsgResize.
s, err := r.Attach(context.Background(), nameHandle("sess"), 40, 132)
if err != nil {
t.Fatalf("Attach: %v", err)
}
defer s.Close()

if err := s.Resize(50, 160); err != nil {
t.Fatalf("Resize: %v", err)
}

// Poll for both resizes (birth + explicit) to arrive on the fakePTY.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
f.pty.resizeMu.Lock()
n := len(f.pty.resizes)
var last ResizePayload
if n > 0 {
last = f.pty.resizes[n-1]
}
f.pty.resizeMu.Unlock()
if n >= 2 && last.Cols == 160 && last.Rows == 50 {
return
}
time.Sleep(10 * time.Millisecond)
}
f.pty.resizeMu.Lock()
defer f.pty.resizeMu.Unlock()
t.Fatalf("resizes did not reach pty as expected: %+v", f.pty.resizes)
}

// TestAttachUnknownSession: Attach to a session with no resolvable addr errors.
func TestAttachUnknownSession(t *testing.T) {
r := New(Options{})
if _, err := r.Attach(context.Background(), nameHandle("nope"), 0, 0); err == nil {
t.Fatal("expected error attaching to unknown session")
}
}

func nameHandle(id string) ports.RuntimeHandle {
return ports.RuntimeHandle{ID: id}
}
Loading