Persistent coding-agent processes with streaming I/O (replace per-round cold spawns) #83

@jeremyplichta

Summary

Today, tt agent-loop spawns a fresh coding-CLI process per round (e.g. cat prompt.md | claude --print). Each round pays a full cold-start cost: process startup, system-prompt re-ingestion, MCP tool re-discovery, and a cold LLM prompt cache.

This issue proposes switching the agent loop to one long-lived CLI subprocess per agent, with the Rust worker bridging Redis ⇄ the CLI's stdin/stdout over the CLI's streaming-JSON protocol. Messages arriving in Redis are written to the CLI's stdin; assistant events and completion markers are parsed from stdout and mapped onto the existing event stream (#53).

This preserves the round / turn abstraction but eliminates cold-start-per-round latency, preserves prompt cache across turns, and lets urgent messages interrupt in near-real-time instead of waiting for the next loop cycle.
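
The core mechanic, one child process that stays alive while the worker writes to its stdin and reads from its stdout, can be sketched with the standard library alone. This is a minimal illustration, not the proposed implementation: `PersistentChild` and its methods are hypothetical names, it uses plain newline-delimited lines rather than any CLI's real JSON schema, and it is synchronous where the real worker would be async.

```rust
use std::io::{BufRead, BufReader, Write};
use std::process::{Child, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};

/// A long-lived child with piped stdin/stdout, exchanging one line per message.
/// (Hypothetical type for illustration only.)
pub struct PersistentChild {
    child: Child,
    stdin: Option<ChildStdin>,
    stdout: BufReader<ChildStdout>,
}

impl PersistentChild {
    /// Spawn `program` once; it then stays alive across many turns.
    pub fn spawn(program: &str, args: &[&str]) -> std::io::Result<Self> {
        let mut child = Command::new(program)
            .args(args)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()?;
        let stdin = child.stdin.take();
        let stdout = BufReader::new(child.stdout.take().expect("stdout was piped"));
        Ok(Self { child, stdin, stdout })
    }

    /// Write one message line to the child's stdin and flush it.
    pub fn send_line(&mut self, line: &str) -> std::io::Result<()> {
        let stdin = self.stdin.as_mut().expect("stdin already closed");
        stdin.write_all(line.as_bytes())?;
        stdin.write_all(b"\n")?;
        stdin.flush()
    }

    /// Read one line from the child's stdout; `None` means the child closed it.
    pub fn read_line(&mut self) -> std::io::Result<Option<String>> {
        let mut buf = String::new();
        let n = self.stdout.read_line(&mut buf)?;
        Ok(if n == 0 { None } else { Some(buf.trim_end().to_string()) })
    }

    /// Close stdin (EOF) and wait for the child to exit.
    pub fn close(mut self) -> std::io::Result<ExitStatus> {
        drop(self.stdin.take());
        self.child.wait()
    }
}
```

Pointing this at `cat` as a stand-in for a streaming CLI shows several "turns" passing through a single process: each `send_line` is echoed back by the same child, and `close` ends the session through EOF rather than a kill.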

Current Behavior

src/main.rs, Commands::AgentLoop:

loop {
    if round >= max_rounds { break; }
    // Drain Redis inbox, classify, build prompt Markdown
    std::fs::write(&prompt_file, &prompt)?;
    let shell_cmd = build_cli_command(&cli_name, &cli_cmd, &prompt_file);
    // Fresh `sh -c` process every round; stdout and stderr share one log file.
    // `File` is not `Copy`, so stdout gets its own handle via `try_clone`.
    let status = Command::new("sh").arg("-c").arg(&shell_cmd)
        .stdin(Stdio::null())
        .stdout(log_file.try_clone()?).stderr(log_file)
        .status();
    // ... requeue on failure, increment round, sleep 1s
}

Per round this pays:

  • ~2–5s of process + CLI cold start
  • Full reprocessing of system prompt, repo context, and MCP tool list
  • Cold LLM KV / prompt cache
  • Redis inbox is read only at the top of each round — urgent messages that arrive while the CLI is running wait until the next iteration
  • tt interrupt / tt kill are only honored at round boundaries

Proposed Behavior

Introduce a CodingAgent abstraction with a streaming I/O lifecycle:

trait CodingAgent: Send {
    async fn send(&mut self, msg: AgentInput) -> Result<()>;
    fn events(&mut self) -> impl Stream<Item = AgentEvent>;
    async fn shutdown(self, mode: ShutdownMode) -> Result<ExitStatus>;
}

enum AgentInput {
    UserMessage(String),       // batched inbox turn
    UrgentMessage(String),     // injected mid-turn if supported
    Cancel,                    // interrupt current turn
}

enum AgentEvent {
    TurnStarted,
    AssistantDelta(String),
    ToolCall { name: String, args: serde_json::Value },
    TurnCompleted { summary: Option<String> },
    AwaitingInput,
    SessionError(String),
    Exited(ExitStatus),
}
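
How the worker might choose between `UserMessage` and `UrgentMessage` can be sketched as a pure routing function. This is an assumption-laden illustration: `Priority`, `Routing`, `route`, and the `supports_injection` capability flag are hypothetical names, and the rule "queue anything that cannot be injected" is one plausible reading of "injected mid-turn if supported".

```rust
/// Mirrors the `AgentInput` variants proposed above.
#[derive(Debug, PartialEq)]
pub enum AgentInput {
    UserMessage(String),
    UrgentMessage(String),
    Cancel,
}

/// Hypothetical classification of a Redis inbox message.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Priority {
    Normal,
    Urgent,
}

/// Hypothetical decision for a newly arrived message.
#[derive(Debug, PartialEq)]
pub enum Routing {
    SendNow(AgentInput),
    QueueForNextTurn(String),
}

/// Decide what to do with one inbox message, given the agent's current state.
pub fn route(
    body: String,
    priority: Priority,
    turn_in_flight: bool,
    supports_injection: bool,
) -> Routing {
    match (turn_in_flight, priority) {
        // Idle agent: any message starts a new turn.
        (false, _) => Routing::SendNow(AgentInput::UserMessage(body)),
        // Busy agent, urgent message, backend can inject mid-turn.
        (true, Priority::Urgent) if supports_injection => {
            Routing::SendNow(AgentInput::UrgentMessage(body))
        }
        // Otherwise wait for the current turn to finish.
        (true, _) => Routing::QueueForNextTurn(body),
    }
}
```

Keeping the decision free of I/O makes the fallback for backends without injection support (such as a one-shot backend) easy to test in isolation.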

Two backends:

  1. OneShotAgent — wraps the current sh -c "cat prompt | cli --print" behavior. Default for CLIs without a streaming protocol. Preserves today's semantics exactly.
  2. StreamingAgent — keeps the CLI alive as a child process with stdin/stdout piped. Uses the CLI's JSON protocol where available:
    • Claude Code: --input-format stream-json --output-format stream-json
    • Codex: codex proto (JSON-line protocol)
    • Auggie: --output-format json (+ stdin continuous mode — to validate)
    • Aider / Gemini / Copilot / Cursor: stay on OneShotAgent for now
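
Backend selection from the list above could look roughly like the following. It is a sketch under assumptions: `Backend` and `select_backend` are hypothetical names, the CLI keys `"claude"` and `"codex"` are guesses at how `cli_name` is spelled, and the argument lists contain only the flags named in this issue (a real invocation may need more). Auggie stays on the one-shot path here because its streaming mode is still marked "to validate".

```rust
/// Hypothetical result of choosing a backend for one agent.
#[derive(Debug, PartialEq)]
pub enum Backend {
    /// Today's `sh -c "cat prompt | cli --print"` path.
    OneShot,
    /// Long-lived child speaking the CLI's streaming protocol.
    Streaming {
        program: &'static str,
        args: &'static [&'static str],
    },
}

/// Pick a backend from the CLI name and the `[agent].persistent` toggle.
pub fn select_backend(cli_name: &str, persistent: bool) -> Backend {
    if !persistent {
        // Default during rollout: behavior is unchanged.
        return Backend::OneShot;
    }
    match cli_name {
        "claude" => Backend::Streaming {
            program: "claude",
            args: &["--input-format", "stream-json", "--output-format", "stream-json"],
        },
        "codex" => Backend::Streaming {
            program: "codex",
            args: &["proto"],
        },
        // Auggie (unvalidated), Aider, Gemini, Copilot, Cursor, and unknown CLIs.
        _ => Backend::OneShot,
    }
}
```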

The agent loop becomes event-driven:

loop {
    tokio::select! {
        // Redis inbox has new messages
        inbox = channel.wait_for_messages(agent_id) => {
            let batch = build_turn_input(inbox);
            agent.send(AgentInput::UserMessage(batch)).await?;
        }
        // CLI emitted an event
        Some(event) = agent.events().next() => {
            // Check before `event` is moved into the mapper.
            let turn_done = matches!(event, AgentEvent::TurnCompleted { .. });
            map_event_to_redis_stream(event).await?;
            if turn_done {
                rounds_completed += 1;
            }
        }
        // Control signals (interrupt, kill, pause)
        Some(sig) = control.recv() => handle_signal(sig, &mut agent).await?,
        // Idle timeout → drain
        _ = idle_timer.tick(), if idle() => {
            agent.shutdown(ShutdownMode::Graceful).await?;
            break;
        }
    }
}
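
The shape of that loop can be exercised without tokio by funnelling every source into one channel. The following is a synchronous analogue for illustration only, not the proposed async code: `LoopInput`, `LoopExit`, `LoopReport`, and `run_loop` are hypothetical names, and the idle rule is simplified to "nothing arrived within `idle_timeout`" instead of the `idle()` guard above.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Everything the loop reacts to, merged into one stream (hypothetical).
#[derive(Debug)]
pub enum LoopInput {
    Inbox(String),      // batched Redis messages
    TurnCompleted,      // the CLI finished a turn
    OtherEvent(String), // any other CLI event
    Kill,               // control signal
}

#[derive(Debug, PartialEq)]
pub enum LoopExit {
    Killed,
    Idle,
    SourcesClosed,
}

#[derive(Debug, PartialEq)]
pub struct LoopReport {
    pub exit: LoopExit,
    pub rounds_completed: u32,
    pub forwarded: Vec<String>,
}

/// React to whichever input arrives first; stop on kill, idle, or closed sources.
pub fn run_loop(rx: Receiver<LoopInput>, idle_timeout: Duration) -> LoopReport {
    let mut rounds_completed = 0;
    let mut forwarded = Vec::new();
    let exit = loop {
        match rx.recv_timeout(idle_timeout) {
            // Stand-in for `agent.send(AgentInput::UserMessage(batch))`.
            Ok(LoopInput::Inbox(batch)) => forwarded.push(batch),
            Ok(LoopInput::TurnCompleted) => rounds_completed += 1,
            Ok(LoopInput::OtherEvent(_)) => {}
            Ok(LoopInput::Kill) => break LoopExit::Killed,
            Err(RecvTimeoutError::Timeout) => break LoopExit::Idle,
            Err(RecvTimeoutError::Disconnected) => break LoopExit::SourcesClosed,
        }
    };
    LoopReport { exit, rounds_completed, forwarded }
}
```

In this arrangement each producer (inbox poller, stdout reader, control listener) would own a clone of the sender, so the loop body never blocks on one source while another has work ready.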

Benefits

  1. Latency: eliminate 2–5s of per-round cold start (compounds with max_rounds)
  2. Token cost / quality: LLM keeps prompt cache warm; system prompt & repo context processed once
  3. Real-time inbox handling: urgent messages can be injected mid-turn rather than deferred
  4. Responsive control plane: tt interrupt / tt kill take effect immediately, not at round boundary
  5. Better idle/drain alignment: the Draining/Cold states added in #54 (Adopt RAR worker lifecycle state machine for agents) become backed by an actual process lifecycle
  6. Richer observability: typed stdout events flow into the #53 event stream (Add Redis Stream-based event log for real-time progress) instead of opaque per-round .log files
  7. Feeds A2A streaming (#62, Add A2A (Agent-to-Agent) protocol endpoint to townhall) and SSE (#59, Real-time event streaming via SSE and WebSocket for mobile clients) with per-turn granularity instead of per-round
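
To make the latency point concrete, the cumulative cold-start cost is simply the per-round cost times the number of rounds. The helper below is a hypothetical illustration; the 20-round figure in the usage note is an assumed example, not a measured workload.

```rust
/// Total cold-start overhead in seconds for `rounds` one-shot invocations,
/// given a (low, high) per-round estimate in seconds.
pub fn cold_start_overhead_secs(rounds: u64, per_round_secs: (u64, u64)) -> (u64, u64) {
    (rounds * per_round_secs.0, rounds * per_round_secs.1)
}
```

With the 2–5s estimate from above, `cold_start_overhead_secs(20, (2, 5))` gives `(40, 100)`: an agent that runs 20 rounds spends roughly 40 to 100 seconds on startup alone, which the persistent model would pay once.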

Costs / Risks

  1. Per-CLI adapter: each streaming CLI needs a protocol-specific adapter
  2. Session fragility: worker crash loses the session → mitigate with the CLI's --resume <session-id>, persisted on the agent hash
  3. Scale-to-zero interaction (#57, Scale-to-zero agent workers with queue-depth scaling signal): idle timeout must cleanly shut down the child (EOF stdin → wait → SIGTERM) so the container exits 0
  4. Parse bugs / hangs: waiting for a sentinel event can deadlock the worker → hard timeouts + protocol versioning
  5. Backpressure: high-volume stdout must be consumed eagerly to avoid filling the pipe
  6. rounds_completed semantics shift: today a "round" = one CLI invocation; in the new model a "round" = one assistant turn. Keep the counter and the per-turn log file ({name}_round_{N}.log) to preserve tt status --deep, docs, tests.
  7. Tests heavily exercise the one-shot path; both backends must be covered.
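
Risks 4 and 5 pull in the same direction: read stdout eagerly on a dedicated thread, and never wait for a completion marker without a deadline. A minimal sketch, assuming a line-oriented protocol; `spawn_drain`, `wait_for_sentinel`, `WaitError`, and the `TURN_DONE` marker used in the usage note are all hypothetical.

```rust
use std::io::{BufRead, BufReader, Read};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
pub enum WaitError {
    TimedOut,
    StreamClosed,
}

/// Drain `reader` on its own thread so the child never blocks on a full pipe.
/// The channel is unbounded, so pressure moves from the OS pipe into memory.
pub fn spawn_drain<R: Read + Send + 'static>(reader: R) -> Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for line in BufReader::new(reader).lines() {
            match line {
                // Stop if the consumer has gone away.
                Ok(l) => {
                    if tx.send(l).is_err() {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
    });
    rx
}

/// Wait for the first line matching `is_sentinel`, giving up at a hard deadline.
pub fn wait_for_sentinel(
    rx: &Receiver<String>,
    is_sentinel: impl Fn(&str) -> bool,
    timeout: Duration,
) -> Result<String, WaitError> {
    let deadline = Instant::now() + timeout;
    loop {
        // The deadline covers the whole wait, not each individual line.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(line) if is_sentinel(&line) => return Ok(line),
            Ok(_) => continue,
            Err(RecvTimeoutError::Timeout) => return Err(WaitError::TimedOut),
            Err(RecvTimeoutError::Disconnected) => return Err(WaitError::StreamClosed),
        }
    }
}
```

For example, `wait_for_sentinel(&rx, |l| l == "TURN_DONE", Duration::from_secs(30))` returns `Err(WaitError::TimedOut)` if the marker never arrives, which the worker could map to a `SessionError` instead of hanging.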

Phased Plan

Phase A — Adapter abstraction (no behavior change)

  • Add src/agent_runtime/mod.rs with CodingAgent trait, AgentInput, AgentEvent, ShutdownMode
  • Implement OneShotAgent that wraps today's build_cli_command + sh -c path
  • Refactor Commands::AgentLoop to drive a CodingAgent instead of calling Command::status() directly
  • All existing tests must pass unchanged
  • Feature flag: [agent].persistent = false (default)

Phase B — Streaming Claude adapter

  • Implement StreamingAgent for Claude Code using --input-format stream-json --output-format stream-json
  • Store the session ID on the agent hash for --resume on worker restart
  • Opt-in via [agent].persistent = true on a per-CLI basis
  • Tests: happy-path turn, urgent-interrupt injection, unexpected CLI exit, stdin-close graceful shutdown
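
Resuming after a worker restart comes down to appending `--resume <session-id>` when an ID was persisted. In the sketch below a `HashMap` stands in for the Redis agent hash, and the field name `session_id` and the function `claude_restart_args` are assumptions; only the flags themselves come from this issue, and a real invocation may need additional ones.

```rust
use std::collections::HashMap;

/// Build the Claude Code argument list, resuming a session when one is stored.
/// `agent_hash` is a stand-in for the agent's Redis hash.
pub fn claude_restart_args(agent_hash: &HashMap<String, String>) -> Vec<String> {
    let mut args: Vec<String> = [
        "--input-format",
        "stream-json",
        "--output-format",
        "stream-json",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect();

    // "session_id" is a hypothetical field name.
    if let Some(id) = agent_hash.get("session_id") {
        args.push("--resume".to_string());
        args.push(id.clone());
    }
    args
}
```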

Phase C — Event-driven agent loop

  • Replace the max_rounds-bounded round loop with a tokio::select! over inbox / stdout / control / idle
  • Define "round" = "one TurnCompleted event"; keep the rounds_completed counter and --max-rounds cap
  • Preserve .tt/logs/{name}_round_{N}.log by capturing per-turn stdout/stderr into per-turn files
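
The redefined counter and the per-turn log naming could be kept together in one small type. This is a sketch with assumptions stated up front: `RoundTracker` is a hypothetical name, and N is taken to be 1-based, which this issue does not specify.

```rust
use std::path::PathBuf;

/// Counts completed turns as "rounds" and names the per-turn log file.
pub struct RoundTracker {
    name: String,
    max_rounds: u32,
    rounds_completed: u32,
}

impl RoundTracker {
    pub fn new(name: &str, max_rounds: u32) -> Self {
        Self { name: name.to_string(), max_rounds, rounds_completed: 0 }
    }

    /// Log file for the turn currently in progress (assumes N starts at 1).
    pub fn current_log_path(&self) -> PathBuf {
        PathBuf::from(".tt/logs")
            .join(format!("{}_round_{}.log", self.name, self.rounds_completed + 1))
    }

    /// Record one `TurnCompleted` event; returns true once the cap is reached.
    pub fn on_turn_completed(&mut self) -> bool {
        self.rounds_completed += 1;
        self.rounds_completed >= self.max_rounds
    }

    pub fn rounds_completed(&self) -> u32 {
        self.rounds_completed
    }
}
```

The loop would switch its stdout/stderr capture to `current_log_path()` at each `TurnStarted` and shut the agent down when `on_turn_completed()` returns true.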

Phase D — Event stream mapping

Phase E — Scale-to-zero & drain alignment (#57, #54)

  • ShutdownMode::Graceful = close stdin, wait up to drain_timeout_secs, then SIGTERM, then SIGKILL if the child still has not exited
  • On idle timeout, transition Working → Draining → Stopped via the real process lifecycle
  • Ensure the worker process exits 0 for the autoscaler
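
The graceful path can be approximated with the standard library. This sketch is deliberately simplified and `shutdown_graceful` is a hypothetical name: `std::process` has no portable way to send SIGTERM, so the escalation here goes straight from the drain wait to `Child::kill` (SIGKILL on Unix). A real implementation would insert the SIGTERM step using a signal API from outside std.

```rust
use std::process::{Child, ExitStatus};
use std::thread;
use std::time::{Duration, Instant};

/// Close stdin, give the child `drain_timeout` to exit on its own, then kill it.
pub fn shutdown_graceful(mut child: Child, drain_timeout: Duration) -> std::io::Result<ExitStatus> {
    // Step 1: dropping the handle closes the pipe, so the CLI sees EOF on stdin.
    drop(child.stdin.take());

    // Step 2: poll for a voluntary exit until the drain deadline.
    let deadline = Instant::now() + drain_timeout;
    while Instant::now() < deadline {
        if let Some(status) = child.try_wait()? {
            return Ok(status);
        }
        thread::sleep(Duration::from_millis(10));
    }

    // Step 3: escalate. The SIGTERM step is omitted here (see the note above).
    child.kill()?;
    child.wait()
}
```

A child that honors EOF (for example `cat`) returns a successful status from step 2, while one that ignores stdin (for example `sleep 30`) is killed after the timeout and reports a non-success status that the worker can log before exiting 0 itself.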

Phase F — Additional adapters

  • Codex via codex proto
  • Auggie streaming (if/when it ships a stable stdin-continuous + JSON mode)
  • Aider / Gemini / Copilot / Cursor stay on OneShotAgent

Acceptance Criteria

  • CodingAgent trait + OneShotAgent backend land with zero behavior change under default config
  • StreamingAgent Claude backend works end-to-end (spawn → multiple turns → clean shutdown)
  • Agent loop is event-driven; tt interrupt takes effect within 1s instead of at round boundary
  • Urgent Redis messages are injected into an in-flight turn when the backend supports it
  • rounds_completed, --max-rounds, and .tt/logs/{name}_round_{N}.log keep working (redefined as "turns")
  • CLI stdout events land on the #53 event stream (Add Redis Stream-based event log for real-time progress) with structured EventTypes
  • Session ID stored on the agent hash; worker restart resumes the CLI via --resume
  • Idle timeout in the persistent model produces a clean exit 0 compatible with #57 autoscaling (Scale-to-zero agent workers with queue-depth scaling signal)
  • [agent].persistent config toggle per CLI; default false during rollout
  • Integration tests cover: graceful shutdown, crash recovery, urgent injection, parse timeout, per-CLI adapter selection
  • README / docs updated with the one-shot vs streaming model diagram

Open Questions

  1. Should --max-rounds stay, become a token/cost cap, or both?
  2. Where do session IDs live for --resume? Agent hash vs. a dedicated key namespace?
  3. Does the Commands::Conductor path (which execs the CLI for interactive use) adopt CodingAgent or stay separate?
  4. Per-agent event stream vs. town-wide stream for high-volume streaming events — throttle or split?
  5. How do we surface "awaiting input" as the A2A input-required state (#62, Add A2A (Agent-to-Agent) protocol endpoint to townhall) before the A2A endpoint lands: temporary REST probe or defer?

Related Issues
