Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
c16e57a
feat(runtime/acp): scaffold multi-session-per-agentrun (Manager.sessi…
wjueyao May 18, 2026
c17f298
feat(runtime/acp): add PromptSession/CancelSession/SetModelSession va…
wjueyao May 18, 2026
36594b8
feat(agentrun/api): wire types + method constants for multi-session
wjueyao May 18, 2026
e933264
feat(agentrun): wire RPC layer for multi-session + client methods
wjueyao May 18, 2026
601260e
feat(ari): agentrun/new-session, agentrun/end-session, agentrun/list-…
wjueyao May 18, 2026
e46e1b5
feat(massctl, ari/client): expose multi-session via CLI + ARI client API
wjueyao May 18, 2026
926547e
test(runtime/acp): bookkeeping tests for multi-session map operations
wjueyao May 18, 2026
70fb471
fix(ari/api): preserve camelCase YAML keys via explicit yaml tags
wjueyao May 18, 2026
36a0245
feat(jsonrpc): OptionalUnaryCommand for absent-params back-compat
wjueyao May 18, 2026
9f66f50
refactor(runtime/acp): single-point sessionID resolution + inflight g…
wjueyao May 18, 2026
b4c1dc7
fix(runtime/acp): process-wide Phase + clear sessions on teardown
wjueyao May 19, 2026
fc11f49
fix(ari): align session method names + recovery gate on session ops
wjueyao May 19, 2026
9e378fb
refactor(runtime/acp): cwd-required, drop stale doc TODO, rename Sess…
wjueyao May 19, 2026
265bcac
fix: per-session sessionId stamping for watch filtering (B3)
wjueyao May 19, 2026
c96eca8
fix(runtime/acp): guard activePrompts decrement against clearSessions…
wjueyao May 19, 2026
d919a5b
fix(runtime/acp): don't clobber Stopped Phase from late prompt writeS…
wjueyao May 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cmd/mass/commands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ func run(cmd *cobra.Command, bundle, stateDir, permissions, id string, logCfg *l
}

if cfg.Session.SystemPrompt != "" {
trans.NotifyTurnStart()
trans.NotifyUserPrompt([]runapi.ContentBlock{runapi.TextBlock(acpruntime.BuildSeedSystemPrompt(cfg.Session.SystemPrompt))})
// Seed prompt targets the initial session — empty sessionID resolves
// to it inside the Translator.
trans.NotifyTurnStart("")
trans.NotifyUserPrompt("", []runapi.ContentBlock{runapi.TextBlock(acpruntime.BuildSeedSystemPrompt(cfg.Session.SystemPrompt))})
resp, err := mgr.SeedSystemPrompt(ctx)
stopReason := "error"
if err == nil {
Expand All @@ -151,7 +153,7 @@ func run(cmd *cobra.Command, bundle, stateDir, permissions, id string, logCfg *l
if err != nil {
trans.NotifyError(err.Error())
}
trans.NotifyTurnEnd(acp.StopReason(stopReason))
trans.NotifyTurnEnd("", acp.StopReason(stopReason))
if err != nil {
return fmt.Errorf("agent-run: seed system prompt: %w", err)
}
Expand Down
14 changes: 14 additions & 0 deletions cmd/massctl/commands/agent/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ func (m *mockAgentRunOps) TaskRetry(context.Context, *pkgariapi.AgentRunTaskRetr
return nil, nil
}

// Multi-session stubs (no-ops — agent command tests don't exercise them).
func (m *mockAgentRunOps) PromptSession(context.Context, pkgariapi.ObjectKey, string, []runapi.ContentBlock) (*pkgariapi.AgentRunPromptResult, error) {
return nil, nil
}
func (m *mockAgentRunOps) NewSession(context.Context, *pkgariapi.AgentRunNewSessionParams) (*pkgariapi.AgentRunNewSessionResult, error) {
return nil, nil
}
func (m *mockAgentRunOps) EndSession(context.Context, pkgariapi.ObjectKey, string) error {
return nil
}
func (m *mockAgentRunOps) ListSessions(context.Context, pkgariapi.ObjectKey) (*pkgariapi.AgentRunListSessionsResult, error) {
return nil, nil
}

// mock WorkspaceOps (stub — not used in agent tests)
type mockWorkspaceOps struct{}

Expand Down
3 changes: 3 additions & 0 deletions cmd/massctl/commands/agentrun/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,8 @@ Poll with: massctl ar get <name> -w <workspace>`,
cmd.AddCommand(newTaskCmd(getClient))
cmd.AddCommand(newChatCmd(getClient))
cmd.AddCommand(newDebugCmd())
cmd.AddCommand(newNewSessionCmd(getClient))
cmd.AddCommand(newEndSessionCmd(getClient))
cmd.AddCommand(newListSessionsCmd(getClient))
return cmd
}
46 changes: 46 additions & 0 deletions cmd/massctl/commands/agentrun/end_session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package agentrun

import (
"context"
"fmt"

"github.com/spf13/cobra"

"github.com/zoumo/mass/cmd/massctl/commands/cliutil"
pkgariapi "github.com/zoumo/mass/pkg/ari/api"
)

// newEndSessionCmd implements “massctl agentrun end-session“.
//
// Releases runtime tracking of a session id. The agent process keeps its
// per-session state until cancelled or process exits — ACP has no
// explicit end-session RPC. Refuses to end an agent's initial session
// (the one created by the agentrun handshake); kill the whole agent
// instead via “stop“.
func newEndSessionCmd(getClient cliutil.ClientFn) *cobra.Command {
var ws, sessionID string
cmd := &cobra.Command{
Use: "end-session name",
Short: "Release runtime tracking of an ACP session",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
client, err := getClient()
if err != nil {
return err
}
defer client.Close()

if err := client.AgentRuns().EndSession(context.Background(),
pkgariapi.ObjectKey{Workspace: ws, Name: args[0]}, sessionID); err != nil {
return fmt.Errorf("ending session %s on %s/%s: %w", sessionID, ws, args[0], err)
}
fmt.Fprintf(cmd.OutOrStdout(), "session %s ended on %s/%s\n", sessionID, ws, args[0])
return nil
},
}
cmd.Flags().StringVarP(&ws, "workspace", "w", "", "Workspace name (required)")
cmd.Flags().StringVar(&sessionID, "session-id", "", "Session id to end (required) — must not be the initial session")
_ = cmd.MarkFlagRequired("workspace")
_ = cmd.MarkFlagRequired("session-id")
return cmd
}
46 changes: 46 additions & 0 deletions cmd/massctl/commands/agentrun/list_sessions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package agentrun

import (
"context"
"fmt"

"github.com/spf13/cobra"

"github.com/zoumo/mass/cmd/massctl/commands/cliutil"
pkgariapi "github.com/zoumo/mass/pkg/ari/api"
)

// newListSessionsCmd implements “massctl agentrun list-sessions“.
//
// Prints active session ids on an agent-run, one per line. Useful for
// scripts that drive multi-session pools and need to enumerate or
// clean up sessions.
func newListSessionsCmd(getClient cliutil.ClientFn) *cobra.Command {
var ws string
cmd := &cobra.Command{
Use: "list-sessions name",
Aliases: []string{"ls-sessions"},
Short: "List active ACP session ids on an agent-run",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
client, err := getClient()
if err != nil {
return err
}
defer client.Close()

out, err := client.AgentRuns().ListSessions(context.Background(),
pkgariapi.ObjectKey{Workspace: ws, Name: args[0]})
if err != nil {
return fmt.Errorf("listing sessions on %s/%s: %w", ws, args[0], err)
}
for _, id := range out.SessionIDs {
fmt.Fprintln(cmd.OutOrStdout(), id)
}
return nil
},
}
cmd.Flags().StringVarP(&ws, "workspace", "w", "", "Workspace name (required)")
_ = cmd.MarkFlagRequired("workspace")
return cmd
}
17 changes: 17 additions & 0 deletions cmd/massctl/commands/agentrun/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,23 @@ func (m *mockAgentRunOps) TaskRetry(ctx context.Context, params *pkgariapi.Agent
return &pkgariapi.AgentTask{}, nil
}

// Multi-session stubs — tests in this package don't exercise these yet.
func (m *mockAgentRunOps) PromptSession(ctx context.Context, key pkgariapi.ObjectKey, sessionID string, prompt []runapi.ContentBlock) (*pkgariapi.AgentRunPromptResult, error) {
return m.Prompt(ctx, key, prompt)
}

func (m *mockAgentRunOps) NewSession(_ context.Context, _ *pkgariapi.AgentRunNewSessionParams) (*pkgariapi.AgentRunNewSessionResult, error) {
return &pkgariapi.AgentRunNewSessionResult{SessionID: "mock-session"}, nil
}

func (m *mockAgentRunOps) EndSession(_ context.Context, _ pkgariapi.ObjectKey, _ string) error {
return nil
}

func (m *mockAgentRunOps) ListSessions(_ context.Context, _ pkgariapi.ObjectKey) (*pkgariapi.AgentRunListSessionsResult, error) {
return &pkgariapi.AgentRunListSessionsResult{}, nil
}

// ── mock WorkspaceOps (stub — not used in agentrun tests) ────────────────────

type mockWorkspaceOps struct{}
Expand Down
60 changes: 60 additions & 0 deletions cmd/massctl/commands/agentrun/new_session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package agentrun

import (
"context"
"fmt"

"github.com/spf13/cobra"

"github.com/zoumo/mass/cmd/massctl/commands/cliutil"
pkgariapi "github.com/zoumo/mass/pkg/ari/api"
)

// newNewSessionCmd implements “massctl agentrun new-session“.
//
// Opens an additional ACP session on a running agent-run — agent process
// is reused (no fork+exec), but the session has its own cwd and state.
// Returns the sessionId to stdout; pass it via subsequent “prompt
// --session-id“ etc.
func newNewSessionCmd(getClient cliutil.ClientFn) *cobra.Command {
var ws, cwd string
cmd := &cobra.Command{
Use: "new-session name",
Short: "Open an additional ACP session on a running agent-run",
Long: `Opens an additional ACP session on the running agent process.

The session is multiplexed onto the existing process — no new fork/exec
— but has its own cwd, model state, and message history. Returns the
agent-issued sessionId on stdout; pass it to subsequent prompt / cancel
/ end-session via --session-id.`,
Example: ` # Open a fresh session scoped to /tmp/case-7
sid=$(massctl ar new-session worker -w my-ws --cwd /tmp/case-7)
massctl ar prompt worker -w my-ws --session-id "$sid" --text "Fix the bug"
massctl ar end-session worker -w my-ws --session-id "$sid"`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
client, err := getClient()
if err != nil {
return err
}
defer client.Close()

out, err := client.AgentRuns().NewSession(context.Background(), &pkgariapi.AgentRunNewSessionParams{
Workspace: ws,
Name: args[0],
Cwd: cwd,
})
if err != nil {
return fmt.Errorf("opening session for %s/%s: %w", ws, args[0], err)
}
// Print sessionId only (script-friendly).
fmt.Fprintln(cmd.OutOrStdout(), out.SessionID)
return nil
},
}
cmd.Flags().StringVarP(&ws, "workspace", "w", "", "Workspace name (required)")
cmd.Flags().StringVar(&cwd, "cwd", "", "Session cwd (required) — the working directory the agent uses for this session")
_ = cmd.MarkFlagRequired("workspace")
_ = cmd.MarkFlagRequired("cwd")
return cmd
}
26 changes: 21 additions & 5 deletions cmd/massctl/commands/agentrun/prompt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (

func newPromptCmd(getClient cliutil.ClientFn) *cobra.Command {
var (
ws string
text string
wait bool
ws string
text string
wait bool
sessionID string
)
cmd := &cobra.Command{
Use: "prompt name",
Expand All @@ -37,7 +38,9 @@ func newPromptCmd(getClient cliutil.ClientFn) *cobra.Command {
key := pkgariapi.ObjectKey{Workspace: ws, Name: name}

if !wait {
result, err := client.AgentRuns().Prompt(ctx, key, []runapi.ContentBlock{runapi.TextBlock(text)})
// PromptSession with empty sessionID == Prompt — single entry point.
result, err := client.AgentRuns().PromptSession(ctx, key, sessionID,
[]runapi.ContentBlock{runapi.TextBlock(text)})
if err != nil {
return err
}
Expand Down Expand Up @@ -75,12 +78,20 @@ func newPromptCmd(getClient cliutil.ClientFn) *cobra.Command {

// Send prompt (fire-and-forget).
if err := runClient.SendPrompt(ctx, &runapi.SessionPromptParams{
Prompt: []runapi.ContentBlock{runapi.TextBlock(text)},
SessionID: sessionID,
Prompt: []runapi.ContentBlock{runapi.TextBlock(text)},
}); err != nil {
return fmt.Errorf("send_prompt: %w", err)
}

// Collect agent_message text until turn_end.
//
// When --session-id is set, filter events to that session — without
// this, two concurrent sessions on the same agent will cross-talk
// (agent_message from session B counted as session A's, turn_end
// from B exits early). When sessionID is empty (single-session
// legacy default), every event is accepted; Translator stamps
// initial-session events with the initial session id.
var parts []string
timeout := time.After(5 * time.Minute)
for {
Expand All @@ -89,6 +100,9 @@ func newPromptCmd(getClient cliutil.ClientFn) *cobra.Command {
if !ok {
return fmt.Errorf("event stream closed before turn_end")
}
if sessionID != "" && ev.SessionID != sessionID {
continue
}
if ev.Type == runapi.EventTypeTurnEnd {
fmt.Fprintln(cmd.OutOrStdout(), strings.Join(parts, ""))
return nil
Expand All @@ -109,6 +123,8 @@ func newPromptCmd(getClient cliutil.ClientFn) *cobra.Command {
cmd.Flags().StringVarP(&ws, "workspace", "w", "", "Workspace name (required)")
cmd.Flags().StringVar(&text, "text", "", "Prompt text (required)")
cmd.Flags().BoolVar(&wait, "wait", false, "Wait for turn to complete and print agent response")
cmd.Flags().StringVar(&sessionID, "session-id", "",
"Session id to prompt (defaults to the agent's initial session). Use new-session to open one.")
_ = cmd.MarkFlagRequired("workspace")
_ = cmd.MarkFlagRequired("text")
return cmd
Expand Down
14 changes: 14 additions & 0 deletions cmd/massctl/commands/workspace/create/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ func (m *mockAgentRunOps) TaskRetry(context.Context, *pkgariapi.AgentRunTaskRetr
return &pkgariapi.AgentTask{}, nil
}

// Multi-session stubs (no-ops).
func (m *mockAgentRunOps) PromptSession(context.Context, pkgariapi.ObjectKey, string, []runapi.ContentBlock) (*pkgariapi.AgentRunPromptResult, error) {
return &pkgariapi.AgentRunPromptResult{}, nil
}
func (m *mockAgentRunOps) NewSession(context.Context, *pkgariapi.AgentRunNewSessionParams) (*pkgariapi.AgentRunNewSessionResult, error) {
return &pkgariapi.AgentRunNewSessionResult{}, nil
}
func (m *mockAgentRunOps) EndSession(context.Context, pkgariapi.ObjectKey, string) error {
return nil
}
func (m *mockAgentRunOps) ListSessions(context.Context, pkgariapi.ObjectKey) (*pkgariapi.AgentRunListSessionsResult, error) {
return &pkgariapi.AgentRunListSessionsResult{}, nil
}

// ── mock WorkspaceOps (stub — not used in create tests) ──────────────────────

type mockWorkspaceOps struct{}
Expand Down
14 changes: 14 additions & 0 deletions cmd/massctl/commands/workspace/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ func (m *mockAgentRunOps) TaskRetry(context.Context, *pkgariapi.AgentRunTaskRetr
return &pkgariapi.AgentTask{}, nil
}

// Multi-session stubs (no-ops — workspace command tests don't exercise them).
func (m *mockAgentRunOps) PromptSession(context.Context, pkgariapi.ObjectKey, string, []runapi.ContentBlock) (*pkgariapi.AgentRunPromptResult, error) {
return &pkgariapi.AgentRunPromptResult{}, nil
}
func (m *mockAgentRunOps) NewSession(context.Context, *pkgariapi.AgentRunNewSessionParams) (*pkgariapi.AgentRunNewSessionResult, error) {
return &pkgariapi.AgentRunNewSessionResult{}, nil
}
func (m *mockAgentRunOps) EndSession(context.Context, pkgariapi.ObjectKey, string) error {
return nil
}
func (m *mockAgentRunOps) ListSessions(context.Context, pkgariapi.ObjectKey) (*pkgariapi.AgentRunListSessionsResult, error) {
return &pkgariapi.AgentRunListSessionsResult{}, nil
}

// ── mock SystemOps (stub — not used in workspace tests) ────────────────────────

type mockSystemOps struct{}
Expand Down
8 changes: 8 additions & 0 deletions pkg/agentrun/api/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ const (
MethodSessionSetModel = "session/set_model"
MethodRuntimePhase = "runtime/status"
MethodRuntimeStop = "runtime/stop"

// Multi-session RPCs — open / end additional ACP sessions on a long-
// lived agent process so callers can multiplex tasks without
// fork+exec per task. See pkg/agentrun/runtime/acp/runtime.go's
// NewSession / EndSession for the underlying runtime contract.
MethodSessionNew = "session/new"
MethodSessionEnd = "session/end"
MethodSessionList = "session/list"
)

// Run notification methods.
Expand Down
Loading