diff --git a/go.mod b/go.mod index 35aee15..5c1a486 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/ActiveState/termtest/xpty v0.6.0 github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d github.com/charmbracelet/bubbletea v1.3.4 + github.com/coder/acp-go-sdk v0.6.3 github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225 github.com/coder/quartz v0.1.2 github.com/danielgtaylor/huma/v2 v2.32.0 diff --git a/go.sum b/go.sum index a98ca56..1d70abb 100644 --- a/go.sum +++ b/go.sum @@ -163,6 +163,8 @@ github.com/ckaznocha/intrange v0.3.1 h1:j1onQyXvHUsPWujDH6WIjhyH26gkRt/txNlV7Lsp github.com/ckaznocha/intrange v0.3.1/go.mod h1:QVepyz1AkUoFQkpEqksSYpNpUo3c5W7nWh/s6SHIJJk= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/coder/acp-go-sdk v0.6.3 h1:LsXQytehdjKIYJnoVWON/nf7mqbiarnyuyE3rrjBsXQ= +github.com/coder/acp-go-sdk v0.6.3/go.mod h1:yKzM/3R9uELp4+nBAwwtkS0aN1FOFjo11CNPy37yFko= github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225 h1:tRIViZ5JRmzdOEo5wUWngaGEFBG8OaE1o2GIHN5ujJ8= github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225/go.mod h1:rNLVpYgEVeu1Zk29K64z6Od8RBP9DwqCu9OfCzh8MR4= github.com/coder/paralleltestctx v0.0.1 h1:eauyehej1XYTGwgzGWMTjeRIVgOpU6XLPNVb2oi6kDs= diff --git a/x/acpio/acp_conversation.go b/x/acpio/acp_conversation.go new file mode 100644 index 0000000..f58e7ff --- /dev/null +++ b/x/acpio/acp_conversation.go @@ -0,0 +1,247 @@ +package acpio + +import ( + "context" + "log/slog" + "slices" + "strings" + "sync" + + st "github.com/coder/agentapi/lib/screentracker" + "github.com/coder/quartz" +) + +// Compile-time assertion that ACPConversation implements st.Conversation +var _ st.Conversation = (*ACPConversation)(nil) + +// ChunkableAgentIO extends AgentIO with chunk callback support for streaming responses. +// This interface is what ACPConversation needs from its AgentIO implementation. +type ChunkableAgentIO interface { + st.AgentIO + SetOnChunk(fn func(chunk string)) +} + +// ACPConversation tracks conversations with ACP-based agents. +// Unlike PTY-based Conversation, ACP has blocking writes where the +// response is complete when Write() returns. +type ACPConversation struct { + mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + agentIO ChunkableAgentIO + messages []st.ConversationMessage + prompting bool // true while agent is processing + streamingResponse strings.Builder + logger *slog.Logger + emitter st.Emitter + initialPrompt []st.MessagePart + clock quartz.Clock +} + +// noopEmitter is a no-op implementation of Emitter for when no emitter is provided. +type noopEmitter struct{} + +func (noopEmitter) EmitMessages([]st.ConversationMessage) {} +func (noopEmitter) EmitStatus(st.ConversationStatus) {} +func (noopEmitter) EmitScreen(string) {} + +// NewACPConversation creates a new ACPConversation. +// If emitter is provided, it will receive events when messages/status/screen change. +// If clock is nil, a real clock will be used. +func NewACPConversation(ctx context.Context, agentIO ChunkableAgentIO, logger *slog.Logger, initialPrompt []st.MessagePart, emitter st.Emitter, clock quartz.Clock) *ACPConversation { + if logger == nil { + logger = slog.Default() + } + if clock == nil { + clock = quartz.NewReal() + } + if emitter == nil { + emitter = noopEmitter{} + } + ctx, cancel := context.WithCancel(ctx) + c := &ACPConversation{ + ctx: ctx, + cancel: cancel, + agentIO: agentIO, + logger: logger, + initialPrompt: initialPrompt, + emitter: emitter, + clock: clock, + } + return c +} + +// Messages returns the conversation history. +func (c *ACPConversation) Messages() []st.ConversationMessage { + c.mu.Lock() + defer c.mu.Unlock() + return slices.Clone(c.messages) +} + +// Send sends a message to the agent asynchronously. +// It returns immediately after recording the user message and starts +// the agent request in a background goroutine. Returns an error if +// a message is already being processed. +func (c *ACPConversation) Send(messageParts ...st.MessagePart) error { + message := "" + for _, part := range messageParts { + message += part.String() + } + + // Validate whitespace BEFORE trimming (match PTY behavior) + if message != strings.TrimSpace(message) { + return st.ErrMessageValidationWhitespace + } + + if message == "" { + return st.ErrMessageValidationEmpty + } + + // Check if already prompting and set state atomically + c.mu.Lock() + if c.prompting { + c.mu.Unlock() + return st.ErrMessageValidationChanging + } + c.messages = append(c.messages, st.ConversationMessage{ + Id: len(c.messages), + Role: st.ConversationRoleUser, + Message: message, + Time: c.clock.Now(), + }) + // Add placeholder for streaming agent response + c.messages = append(c.messages, st.ConversationMessage{ + Id: len(c.messages), + Role: st.ConversationRoleAgent, + Message: "", + Time: c.clock.Now(), + }) + c.streamingResponse.Reset() + c.prompting = true + status := c.statusLocked() + c.mu.Unlock() + + // Emit status change to "running" before starting the prompt + c.emitter.EmitStatus(status) + + c.logger.Debug("ACPConversation sending message", "message", message) + + // Run the blocking write in a goroutine so HTTP returns immediately + go c.executePrompt(messageParts) + + return nil +} + +// Start sets up chunk handling and sends the initial prompt if provided. +func (c *ACPConversation) Start(ctx context.Context) { + // Wire up the chunk callback for streaming + c.agentIO.SetOnChunk(c.handleChunk) + + // Send initial prompt if provided + if len(c.initialPrompt) > 0 { + err := c.Send(c.initialPrompt...) + if err != nil { + c.logger.Error("ACPConversation failed to send initial prompt", "error", err) + } + } else { + // No initial prompt means we start in stable state + c.emitter.EmitStatus(c.Status()) + } +} + +// Status returns the current conversation status. +func (c *ACPConversation) Status() st.ConversationStatus { + c.mu.Lock() + defer c.mu.Unlock() + return c.statusLocked() +} + +// statusLocked returns the status without acquiring the lock (caller must hold lock). +func (c *ACPConversation) statusLocked() st.ConversationStatus { + if c.prompting { + return st.ConversationStatusChanging // agent is processing + } + return st.ConversationStatusStable +} + +// Stop cancels any in-progress operations. +func (c *ACPConversation) Stop() { + c.cancel() +} + +// Text returns the current streaming response text. +func (c *ACPConversation) Text() string { + c.mu.Lock() + defer c.mu.Unlock() + return c.streamingResponse.String() +} + +// handleChunk is called for each streaming chunk from the agent. +func (c *ACPConversation) handleChunk(chunk string) { + c.mu.Lock() + c.streamingResponse.WriteString(chunk) + // Update the last message (the streaming agent response) + if len(c.messages) > 0 { + c.messages[len(c.messages)-1].Message = c.streamingResponse.String() + } + messages := slices.Clone(c.messages) + status := c.statusLocked() + screen := c.streamingResponse.String() + c.mu.Unlock() + + c.emitter.EmitMessages(messages) + c.emitter.EmitStatus(status) + c.emitter.EmitScreen(screen) +} + +// executePrompt runs the actual agent request in background +func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) { + var err error + for _, part := range messageParts { + if c.ctx.Err() != nil { + err = c.ctx.Err() + break + } + if partErr := part.Do(c.agentIO); partErr != nil { + err = partErr + break + } + } + + c.mu.Lock() + c.prompting = false + + if err != nil { + c.logger.Error("ACPConversation message failed", "error", err) + // Remove the agent's streaming message on error (may be empty or partial) + if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent { + c.messages = c.messages[:len(c.messages)-1] + } + messages := slices.Clone(c.messages) + status := c.statusLocked() + screen := c.streamingResponse.String() + c.mu.Unlock() + + c.emitter.EmitMessages(messages) + c.emitter.EmitStatus(status) + c.emitter.EmitScreen(screen) + return + } + + // Final response should already be in the last message via streaming + // but ensure it's finalized + response := c.streamingResponse.String() + if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent { + c.messages[len(c.messages)-1].Message = strings.TrimSpace(response) + } + messages := slices.Clone(c.messages) + status := c.statusLocked() + screen := c.streamingResponse.String() + c.mu.Unlock() + + c.emitter.EmitMessages(messages) + c.emitter.EmitStatus(status) + c.emitter.EmitScreen(screen) + + c.logger.Debug("ACPConversation message complete", "responseLen", len(response)) +} diff --git a/x/acpio/acp_conversation_test.go b/x/acpio/acp_conversation_test.go new file mode 100644 index 0000000..0811bc3 --- /dev/null +++ b/x/acpio/acp_conversation_test.go @@ -0,0 +1,474 @@ +package acpio_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/coder/quartz" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/coder/agentapi/lib/screentracker" + "github.com/coder/agentapi/x/acpio" +) + +// mockAgentIO implements acpio.ChunkableAgentIO for testing. +// It provides a channel-based synchronization mechanism to test ACPConversation +// without relying on time.Sleep. +type mockAgentIO struct { + mu sync.Mutex + written []byte + screenContent string + onChunkFn func(chunk string) + + // Control behavior + writeErr error + // writeBlock is a channel that, if non-nil, will cause Write to block until closed. + // This allows tests to control when the write completes. + writeBlock chan struct{} + // writeStarted is closed when Write begins (before blocking on writeBlock). + // This allows tests to synchronize on the write starting. + writeStarted chan struct{} +} + +// mockEmitter implements screentracker.Emitter for testing. +type mockEmitter struct { + mu sync.Mutex + messagesCalls int + statusCalls int + screenCalls int + lastMessages []screentracker.ConversationMessage + lastStatus screentracker.ConversationStatus + lastScreen string +} + +func newMockEmitter() *mockEmitter { + return &mockEmitter{} +} + +func (m *mockEmitter) EmitMessages(messages []screentracker.ConversationMessage) { + m.mu.Lock() + defer m.mu.Unlock() + m.messagesCalls++ + m.lastMessages = messages +} + +func (m *mockEmitter) EmitStatus(status screentracker.ConversationStatus) { + m.mu.Lock() + defer m.mu.Unlock() + m.statusCalls++ + m.lastStatus = status +} + +func (m *mockEmitter) EmitScreen(screen string) { + m.mu.Lock() + defer m.mu.Unlock() + m.screenCalls++ + m.lastScreen = screen +} + +func (m *mockEmitter) TotalCalls() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.messagesCalls + m.statusCalls + m.screenCalls +} + +func newMockAgentIO() *mockAgentIO { + return &mockAgentIO{} +} + +func (m *mockAgentIO) Write(data []byte) (int, error) { + // Signal that write has started + m.mu.Lock() + started := m.writeStarted + block := m.writeBlock + m.mu.Unlock() + + if started != nil { + close(started) + } + + // Block if configured to do so (for testing concurrent sends) + if block != nil { + <-block + } + + m.mu.Lock() + defer m.mu.Unlock() + if m.writeErr != nil { + return 0, m.writeErr + } + m.written = append(m.written, data...) + return len(data), nil +} + +func (m *mockAgentIO) ReadScreen() string { + m.mu.Lock() + defer m.mu.Unlock() + return m.screenContent +} + +func (m *mockAgentIO) SetOnChunk(fn func(chunk string)) { + m.mu.Lock() + defer m.mu.Unlock() + m.onChunkFn = fn +} + +// SimulateChunks simulates the agent sending streaming chunks. +// This triggers the onChunk callback as if the agent was responding. +func (m *mockAgentIO) SimulateChunks(chunks ...string) { + m.mu.Lock() + fn := m.onChunkFn + m.mu.Unlock() + for _, chunk := range chunks { + if fn != nil { + fn(chunk) + } + } +} + +// GetWritten returns all data written to the agent. +func (m *mockAgentIO) GetWritten() []byte { + m.mu.Lock() + defer m.mu.Unlock() + return append([]byte(nil), m.written...) +} + +// BlockWrite sets up blocking for the next Write call and returns: +// - started: a channel that is closed when Write begins +// - done: a channel to close to unblock the Write +func (m *mockAgentIO) BlockWrite() (started chan struct{}, done chan struct{}) { + m.mu.Lock() + defer m.mu.Unlock() + m.writeStarted = make(chan struct{}) + m.writeBlock = make(chan struct{}) + return m.writeStarted, m.writeBlock +} + +func Test_NewACPConversation(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + + require.NotNil(t, conv) +} + +func Test_Messages_InitiallyEmpty(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + + messages := conv.Messages() + + assert.Empty(t, messages) +} + +func Test_Status_InitiallyStable(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + + status := conv.Status() + + assert.Equal(t, screentracker.ConversationStatusStable, status) +} + +func Test_Send_AddsUserMessage(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Set up blocking to synchronize with the goroutine + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + err := conv.Send(screentracker.MessagePartText{Content: "hello"}) + require.NoError(t, err) + + // Wait for the write goroutine to start + <-started + + messages := conv.Messages() + require.Len(t, messages, 2) // user message + placeholder agent message + + assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) + assert.Equal(t, "hello", messages[0].Message) + assert.Equal(t, screentracker.ConversationRoleAgent, messages[1].Role) + + // Unblock the write to let the test complete cleanly + close(done) +} + +func Test_Send_RejectsEmptyMessage(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + + err := conv.Send(screentracker.MessagePartText{Content: ""}) + + assert.ErrorIs(t, err, screentracker.ErrMessageValidationEmpty) +} + +func Test_Send_RejectsWhitespace(t *testing.T) { + tests := []struct { + name string + content string + }{ + {"leading space", " hello"}, + {"trailing space", "hello "}, + {"leading newline", "\nhello"}, + {"trailing newline", "hello\n"}, + {"both sides", " hello "}, + {"newlines both sides", "\nhello\n"}, + {"leading tab", "\thello"}, + {"trailing tab", "hello\t"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + + err := conv.Send(screentracker.MessagePartText{Content: tt.content}) + + assert.ErrorIs(t, err, screentracker.ErrMessageValidationWhitespace) + }) + } +} + +func Test_Send_RejectsDuplicateSend(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Block the write so it doesn't complete immediately + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + // First send should succeed + err := conv.Send(screentracker.MessagePartText{Content: "first"}) + require.NoError(t, err) + + // Wait for the write to start (ensuring we're in "prompting" state) + <-started + + // Second send while first is processing should fail + err = conv.Send(screentracker.MessagePartText{Content: "second"}) + assert.ErrorIs(t, err, screentracker.ErrMessageValidationChanging) + + // Unblock the write to let the test complete cleanly + close(done) +} + +func Test_Status_ChangesWhileProcessing(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Block the write so we can observe status changes + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + // Initially stable + assert.Equal(t, screentracker.ConversationStatusStable, conv.Status()) + + // Send a message + err := conv.Send(screentracker.MessagePartText{Content: "test"}) + require.NoError(t, err) + + // Wait for write to start + <-started + + // Status should be changing while processing + assert.Equal(t, screentracker.ConversationStatusChanging, conv.Status()) + + // Unblock the write + close(done) + + // Give the goroutine a chance to complete (status update happens after Write returns) + require.Eventually(t, func() bool { + return conv.Status() == screentracker.ConversationStatusStable + }, 100*time.Millisecond, 5*time.Millisecond, "status should return to stable") +} + +func Test_Text_ReturnsStreamingContent(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Block the write so we can simulate streaming during processing + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + // Initially empty + assert.Equal(t, "", conv.Text()) + + // Send a message + err := conv.Send(screentracker.MessagePartText{Content: "question"}) + require.NoError(t, err) + + // Wait for write to start + <-started + + // Simulate streaming chunks from agent + mock.SimulateChunks("Hello", " ", "world!") + + // Text should contain the streamed content + assert.Equal(t, "Hello world!", conv.Text()) + + // The last message should also be updated + messages := conv.Messages() + require.Len(t, messages, 2) + assert.Equal(t, "Hello world!", messages[1].Message) + + // Unblock the write to let the test complete cleanly + close(done) +} + +func Test_Emitter_CalledOnChanges(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Block the write so we can simulate chunks during processing + started, done := mock.BlockWrite() + + emitter := newMockEmitter() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, emitter, mClock) + conv.Start(context.Background()) + + // Send a message + err := conv.Send(screentracker.MessagePartText{Content: "test"}) + require.NoError(t, err) + + // Wait for write to start + <-started + + // Simulate chunks - each should trigger emitter calls + mock.SimulateChunks("chunk1") + mock.SimulateChunks("chunk2") + + emitter.mu.Lock() + messagesCallsBeforeComplete := emitter.messagesCalls + emitter.mu.Unlock() + + // Should have emit calls from chunks (each chunk emits messages, status, and screen) + assert.Equal(t, 2, messagesCallsBeforeComplete) + + // Unblock the write to complete processing + close(done) + + // Wait for completion emit + require.Eventually(t, func() bool { + emitter.mu.Lock() + c := emitter.messagesCalls + emitter.mu.Unlock() + return c >= 3 // 2 from chunks + 1 from completion + }, 100*time.Millisecond, 5*time.Millisecond, "should receive completion emit") +} + +func Test_InitialPrompt_SentOnStart(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Set up blocking to synchronize with the initial prompt send + started, done := mock.BlockWrite() + + initialPrompt := []screentracker.MessagePart{ + screentracker.MessagePartText{Content: "initial prompt"}, + } + + conv := acpio.NewACPConversation(context.Background(), mock, nil, initialPrompt, nil, mClock) + conv.Start(context.Background()) + + // Wait for write to start (initial prompt is being sent) + <-started + + // Should have user message from initial prompt + messages := conv.Messages() + require.GreaterOrEqual(t, len(messages), 1) + assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) + assert.Equal(t, "initial prompt", messages[0].Message) + + // Unblock the write to let the test complete cleanly + close(done) +} + +func Test_Messages_AreCopied(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Set up blocking to synchronize + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + err := conv.Send(screentracker.MessagePartText{Content: "test"}) + require.NoError(t, err) + + // Wait for write to start + <-started + + // Get messages and modify + messages := conv.Messages() + require.Len(t, messages, 2) + messages[0].Message = "modified" + + // Original should be unchanged + originalMessages := conv.Messages() + assert.Equal(t, "test", originalMessages[0].Message) + + // Unblock the write to let the test complete cleanly + close(done) +} + +func Test_ErrorRemovesPartialMessage(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Block the write so we can simulate partial content before error + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + // Send a message + err := conv.Send(screentracker.MessagePartText{Content: "test"}) + require.NoError(t, err) + + // Wait for write to start + <-started + + // Should have user message + placeholder agent message + messages := conv.Messages() + require.Len(t, messages, 2) + assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) + assert.Equal(t, screentracker.ConversationRoleAgent, messages[1].Role) + + // Simulate the agent streaming partial content before the error + mock.SimulateChunks("partial ", "response ", "content") + + // Verify partial content is in the agent message + messages = conv.Messages() + require.Len(t, messages, 2) + assert.Equal(t, "partial response content", messages[1].Message) + + // Now configure the mock to return an error and unblock + mock.mu.Lock() + mock.writeErr = assert.AnError + mock.mu.Unlock() + close(done) + + // Wait for the conversation to stabilize after the error + require.Eventually(t, func() bool { + return conv.Status() == screentracker.ConversationStatusStable + }, 100*time.Millisecond, 5*time.Millisecond, "status should return to stable") + + // The partial agent message should be removed on error. + // Only the user message should remain. + messages = conv.Messages() + require.Len(t, messages, 1, "partial agent message should be removed on error") + assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) + assert.Equal(t, "test", messages[0].Message) +} diff --git a/x/acpio/acpio.go b/x/acpio/acpio.go new file mode 100644 index 0000000..77db963 --- /dev/null +++ b/x/acpio/acpio.go @@ -0,0 +1,239 @@ +package acpio + +import ( + "context" + "fmt" + "io" + "log/slog" + "strings" + "sync" + "time" + + acp "github.com/coder/acp-go-sdk" + st "github.com/coder/agentapi/lib/screentracker" +) + +// Compile-time assertion that ACPAgentIO implements st.AgentIO +var _ st.AgentIO = (*ACPAgentIO)(nil) + +// DefaultPromptTimeout is the maximum time to wait for an agent response. +const DefaultPromptTimeout = 5 * time.Minute + +// ACPAgentIO implements screentracker.AgentIO using the ACP protocol +type ACPAgentIO struct { + ctx context.Context + conn *acp.ClientSideConnection + sessionID acp.SessionId + mu sync.RWMutex + response strings.Builder + logger *slog.Logger + onChunk func(chunk string) // called on each streaming chunk +} + +// acpClient implements acp.Client to handle callbacks from the agent +type acpClient struct { + agentIO *ACPAgentIO +} + +var _ acp.Client = (*acpClient)(nil) + +func (c *acpClient) SessionUpdate(ctx context.Context, params acp.SessionNotification) error { + c.agentIO.logger.Debug("SessionUpdate received", + "sessionId", params.SessionId, + "hasAgentMessageChunk", params.Update.AgentMessageChunk != nil) + + if params.Update.AgentMessageChunk != nil { + if text := params.Update.AgentMessageChunk.Content.Text; text != nil { + c.agentIO.logger.Debug("AgentMessageChunk text", + "text", text.Text, + "textLen", len(text.Text)) + c.agentIO.mu.Lock() + c.agentIO.response.WriteString(text.Text) + onChunk := c.agentIO.onChunk + c.agentIO.mu.Unlock() + if onChunk != nil { + onChunk(text.Text) + } + } + } + + // Handle tool calls - format as text and append to response + if params.Update.ToolCall != nil { + tc := params.Update.ToolCall + formatted := fmt.Sprintf("\n[Tool: %s] %s\n", tc.Kind, tc.Title) + c.agentIO.mu.Lock() + c.agentIO.response.WriteString(formatted) + onChunk := c.agentIO.onChunk + c.agentIO.mu.Unlock() + if onChunk != nil { + onChunk(formatted) + } + } + + if params.Update.ToolCallUpdate != nil { + tcu := params.Update.ToolCallUpdate + var formatted string + if tcu.Status != nil { + formatted = fmt.Sprintf("[Tool Status: %s]\n", *tcu.Status) + } + if formatted != "" { + c.agentIO.mu.Lock() + c.agentIO.response.WriteString(formatted) + onChunk := c.agentIO.onChunk + c.agentIO.mu.Unlock() + if onChunk != nil { + onChunk(formatted) + } + } + } + + return nil +} + +func (c *acpClient) RequestPermission(ctx context.Context, params acp.RequestPermissionRequest) (acp.RequestPermissionResponse, error) { + // Auto-approve all permissions for Phase 1 + return acp.RequestPermissionResponse{ + Outcome: acp.RequestPermissionOutcome{ + Selected: &acp.RequestPermissionOutcomeSelected{OptionId: "allow"}, + }, + }, nil +} + +func (c *acpClient) ReadTextFile(ctx context.Context, params acp.ReadTextFileRequest) (acp.ReadTextFileResponse, error) { + return acp.ReadTextFileResponse{}, nil +} + +func (c *acpClient) WriteTextFile(ctx context.Context, params acp.WriteTextFileRequest) (acp.WriteTextFileResponse, error) { + return acp.WriteTextFileResponse{}, nil +} + +func (c *acpClient) CreateTerminal(ctx context.Context, params acp.CreateTerminalRequest) (acp.CreateTerminalResponse, error) { + return acp.CreateTerminalResponse{}, nil +} + +func (c *acpClient) KillTerminalCommand(ctx context.Context, params acp.KillTerminalCommandRequest) (acp.KillTerminalCommandResponse, error) { + return acp.KillTerminalCommandResponse{}, nil +} + +func (c *acpClient) TerminalOutput(ctx context.Context, params acp.TerminalOutputRequest) (acp.TerminalOutputResponse, error) { + return acp.TerminalOutputResponse{}, nil +} + +func (c *acpClient) ReleaseTerminal(ctx context.Context, params acp.ReleaseTerminalRequest) (acp.ReleaseTerminalResponse, error) { + return acp.ReleaseTerminalResponse{}, nil +} + +func (c *acpClient) WaitForTerminalExit(ctx context.Context, params acp.WaitForTerminalExitRequest) (acp.WaitForTerminalExitResponse, error) { + return acp.WaitForTerminalExitResponse{}, nil +} + +// SetOnChunk sets a callback that will be called for each streaming chunk. +func (a *ACPAgentIO) SetOnChunk(fn func(chunk string)) { + a.mu.Lock() + defer a.mu.Unlock() + a.onChunk = fn +} + +// NewWithPipes creates an ACPAgentIO connected via the provided pipes +func NewWithPipes(ctx context.Context, toAgent io.Writer, fromAgent io.Reader, logger *slog.Logger, getwd func() (string, error)) (*ACPAgentIO, error) { + if logger == nil { + logger = slog.Default() + } + agentIO := &ACPAgentIO{ctx: ctx, logger: logger} + client := &acpClient{agentIO: agentIO} + + conn := acp.NewClientSideConnection(client, toAgent, fromAgent) + agentIO.conn = conn + + logger.Debug("Initializing ACP connection") + + // Initialize the connection + initResp, err := conn.Initialize(ctx, acp.InitializeRequest{ + ProtocolVersion: acp.ProtocolVersionNumber, + ClientCapabilities: acp.ClientCapabilities{}, + }) + if err != nil { + logger.Error("Failed to initialize ACP connection", "error", err) + return nil, err + } + logger.Debug("ACP initialized", "protocolVersion", initResp.ProtocolVersion) + + // Create a session + cwd, err := getwd() + if err != nil { + logger.Error("Failed to get working directory", "error", err) + return nil, err + } + sessResp, err := conn.NewSession(ctx, acp.NewSessionRequest{ + Cwd: cwd, + McpServers: []acp.McpServer{}, + }) + if err != nil { + logger.Error("Failed to create ACP session", "error", err) + return nil, err + } + agentIO.sessionID = sessResp.SessionId + logger.Debug("ACP session created", "sessionId", sessResp.SessionId) + + return agentIO, nil +} + +// Write sends a message to the agent via ACP prompt +func (a *ACPAgentIO) Write(data []byte) (int, error) { + text := string(data) + + // Strip bracketed paste escape sequences if present + text = strings.TrimPrefix(text, "\x1b[200~") + text = strings.TrimSuffix(text, "\x1b[201~") + + // Strip terminal hack sequences (x\b pattern used for Claude Code compatibility) + text = strings.TrimPrefix(text, "x\b") + + text = strings.TrimSpace(text) + + // Don't send empty prompts + if text == "" { + a.logger.Debug("Ignoring empty prompt", "rawDataLen", len(data)) + return len(data), nil + } + + // Clear previous response + a.mu.Lock() + a.response.Reset() + a.mu.Unlock() + + a.logger.Debug("Sending prompt", + "sessionId", a.sessionID, + "text", text, + "textLen", len(text), + "rawDataLen", len(data)) + + // Ensure the context has not been cancelled before writing prompt + if err := a.ctx.Err(); err != nil { + a.logger.Debug("Aborting write", "error", err) + return 0, err + } + // Use a timeout to prevent hanging indefinitely + promptCtx, cancel := context.WithTimeout(a.ctx, DefaultPromptTimeout) + defer cancel() + + resp, err := a.conn.Prompt(promptCtx, acp.PromptRequest{ + SessionId: a.sessionID, + Prompt: []acp.ContentBlock{acp.TextBlock(text)}, + }) + if err != nil { + a.logger.Error("Prompt failed", "error", err) + return 0, err + } + + a.logger.Debug("Prompt completed", "stopReason", resp.StopReason) + + return len(data), nil +} + +// ReadScreen returns the accumulated agent response +func (a *ACPAgentIO) ReadScreen() string { + a.mu.RLock() + defer a.mu.RUnlock() + return a.response.String() +}