From 11b276fdd63a1a60e3978724ecf810ef536bb80d Mon Sep 17 00:00:00 2001 From: Yun Long Date: Tue, 9 Jun 2026 17:09:55 +0800 Subject: [PATCH] Fix(threads): Attach related tool calls to root message --- AGENTS.md | 1 + cli/serve/serve.go | 29 +++- cli/serve/serve_test.go | 58 +++++++ internal/api/handler.go | 4 + internal/api/handler_test.go | 158 ++++++++++++++++++ internal/api/participant_bridge.go | 134 ++++++++++++++- internal/channel/codexbridge/bridge.go | 74 ++++++-- internal/channel/codexbridge/bridge_test.go | 72 ++++++-- internal/im/service.go | 1 + internal/im/service_thread_test.go | 46 +++++ .../MessageContent/MessageContent.css | 48 ++++++ .../MessageContent/MessageContent.tsx | 20 ++- .../tests/components/MessageContent.test.tsx | 9 + 13 files changed, 616 insertions(+), 38 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 6c51964f..7ffe199a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -63,6 +63,7 @@ make release - Use targeted tests first for local changes. - Run `go test ./...` for shared or cross-package changes. - Run `make` when touching build, CGO, linker flags, or packaging. +- When testing or debugging `csgclaw serve` and a browser page is unnecessary, launch with `--no-browser` to avoid stealing focus or opening extra tabs. - If you skip verification, say so clearly. ## References diff --git a/cli/serve/serve.go b/cli/serve/serve.go index e26d7468..3d47e6b4 100644 --- a/cli/serve/serve.go +++ b/cli/serve/serve.go @@ -109,6 +109,7 @@ func (c serveCmd) Run(ctx context.Context, run *command.Context, args []string, fs := run.NewFlagSet("serve", run.Program+" serve [-d|--daemon] [flags]", c.Summary()) daemon := fs.Bool("daemon", false, "run server in background") fs.BoolVar(daemon, "d", false, "run server in background") + noBrowser := fs.Bool("no-browser", false, "do not open the browser after startup") logLevel := fs.String("log-level", "info", "log level: debug, info, warn, error") defaultLogPath, err := defaultServerLogPath() @@ -144,9 +145,9 @@ func (c serveCmd) Run(ctx context.Context, run *command.Context, args []string, } if *daemon { - return serveBackground(run, cfg, globals, *logPath, *pidPath, *logLevel) + return serveBackground(run, cfg, globals, *logPath, *pidPath, *logLevel, *noBrowser) } - return serveForegroundWithConfigPath(ctx, run, cfg, globals.Config, globals.Output) + return serveForegroundWithConfigPath(ctx, run, cfg, globals.Config, globals.Output, serveOptions{NoBrowser: *noBrowser}) } func (stopCmd) Name() string { @@ -217,6 +218,7 @@ func (c internalServeCmd) Run(ctx context.Context, run *command.Context, args [] pidPath := fs.String("pid", "", "pid file path") configPathFlag := fs.String("config", globals.Config, "config file path") logLevel := fs.String("log-level", "info", "log level: debug, info, warn, error") + noBrowser := fs.Bool("no-browser", false, "do not open the browser after startup") if err := fs.Parse(args); err != nil { return err } @@ -259,14 +261,18 @@ func (c internalServeCmd) Run(ctx context.Context, run *command.Context, args [] if err != nil { return err } - return startServerWithConfigPath(ctx, run, cfg, svc, imSvc, imBus, feishuSvc, *configPathFlag, globals.Output) + return startServerWithConfigPath(ctx, run, cfg, svc, imSvc, imBus, feishuSvc, *configPathFlag, globals.Output, serveOptions{NoBrowser: *noBrowser}) } func serveForeground(ctx context.Context, run *command.Context, cfg config.Config, output string) error { return serveForegroundWithConfigPath(ctx, run, cfg, "", output) } -func serveForegroundWithConfigPath(ctx context.Context, run *command.Context, cfg config.Config, configPath string, output string) error { +type serveOptions struct { + NoBrowser bool +} + +func serveForegroundWithConfigPath(ctx context.Context, run *command.Context, cfg config.Config, configPath string, output string, opts ...serveOptions) error { _ = preflightDefaultModelProvider(ctx, cfg) imBus := im.NewBus() feishuProvider, feishuSvc, err := buildFeishuComponents(configPath) @@ -300,10 +306,10 @@ func serveForegroundWithConfigPath(ctx context.Context, run *command.Context, cf fmt.Fprintf(run.Stdout, "CSGClaw IM is available at: %s\n", imURL) } - return startServerWithConfigPath(ctx, run, cfg, svc, imSvc, imBus, feishuSvc, configPath, output) + return startServerWithConfigPath(ctx, run, cfg, svc, imSvc, imBus, feishuSvc, configPath, output, opts...) } -func serveBackground(run *command.Context, cfg config.Config, globals command.GlobalOptions, logPath, pidPath, logLevel string) error { +func serveBackground(run *command.Context, cfg config.Config, globals command.GlobalOptions, logPath, pidPath, logLevel string, noBrowser bool) error { exe, err := os.Executable() if err != nil { return fmt.Errorf("resolve executable: %w", err) @@ -324,6 +330,9 @@ func serveBackground(run *command.Context, cfg config.Config, globals command.Gl if strings.TrimSpace(logLevel) != "" { childArgs = append(childArgs, "--log-level", logLevel) } + if noBrowser { + childArgs = append(childArgs, "--no-browser") + } cmd := exec.Command(exe, childArgs...) cmd.Stdout = logFile cmd.Stderr = logFile @@ -411,7 +420,11 @@ func startServer(ctx context.Context, run *command.Context, cfg config.Config, s return startServerWithConfigPath(ctx, run, cfg, svc, imSvc, imBus, feishuSvc, "", output) } -func startServerWithConfigPath(ctx context.Context, run *command.Context, cfg config.Config, svc *agent.Service, imSvc *im.Service, imBus *im.Bus, feishuSvc *feishu.Service, configPath, output string) error { +func startServerWithConfigPath(ctx context.Context, run *command.Context, cfg config.Config, svc *agent.Service, imSvc *im.Service, imBus *im.Bus, feishuSvc *feishu.Service, configPath, output string, opts ...serveOptions) error { + serveOpts := serveOptions{} + if len(opts) > 0 { + serveOpts = opts[0] + } _ = EnsureCLIProxy(ctx) defer func() { shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -493,7 +506,7 @@ func startServerWithConfigPath(ctx context.Context, run *command.Context, cfg co OnReady: func(handler *api.Handler, router chi.Router) { deliver := channelwiring.WireNotificationParticipantPull(ctx, participantSvc, imSvc, apiURL, cfg.Server.AccessToken) handler.SetNotificationDeliver(deliver) - if output != "json" && run != nil { + if !serveOpts.NoBrowser && output != "json" && run != nil { go func() { if err := WaitForHealthy(apiURL, 5*time.Second); err != nil { fmt.Fprintln(run.Stdout, "Open this URL in your browser after startup.") diff --git a/cli/serve/serve_test.go b/cli/serve/serve_test.go index 0ac1a773..ac79d722 100644 --- a/cli/serve/serve_test.go +++ b/cli/serve/serve_test.go @@ -305,6 +305,64 @@ func TestServeForegroundOpensIMURLWhenBrowserAllowed(t *testing.T) { } } +func TestServeRunNoBrowserFlagSuppressesBrowserOpen(t *testing.T) { + restore := stubServeDependencies(t) + defer restore() + + origRunServer := RunServer + t.Cleanup(func() { + RunServer = origRunServer + }) + RunServer = func(opts server.Options) error { + if opts.OnReady != nil { + opts.OnReady(nil, nil) + } + return nil + } + + configPath := filepath.Join(t.TempDir(), "config.toml") + cfg := config.Config{ + Server: config.ServerConfig{ + ListenAddr: "127.0.0.1:18080", + AdvertiseBaseURL: "http://example.test/base", + AccessToken: "pc-secret", + }, + Sandbox: config.SandboxConfig{Provider: config.DefaultSandboxProvider}, + } + if err := cfg.Save(configPath); err != nil { + t.Fatalf("Save() error = %v", err) + } + + openedCh := make(chan string, 1) + waitedCh := make(chan string, 1) + WaitForHealthy = func(rawURL string, _ time.Duration) error { + waitedCh <- rawURL + return nil + } + OpenBrowser = func(rawURL string) error { + openedCh <- rawURL + return nil + } + + run := testContext() + if err := NewServeCmd().Run(context.Background(), run, []string{"--no-browser"}, command.GlobalOptions{Config: configPath}); err != nil { + t.Fatalf("Run() error = %v", err) + } + select { + case rawURL := <-waitedCh: + t.Fatalf("WaitForHealthy called for %q, want no browser startup path", rawURL) + default: + } + select { + case rawURL := <-openedCh: + t.Fatalf("OpenBrowser called for %q, want suppressed", rawURL) + default: + } + if got := run.Stdout.(*bytes.Buffer).String(); !strings.Contains(got, "CSGClaw IM is available at: http://example.test/base/") { + t.Fatalf("stdout missing IM URL:\n%s", got) + } +} + func TestServeForegroundPrintsManualIMURLWhenBrowserNotAllowed(t *testing.T) { restore := stubServeDependencies(t) defer restore() diff --git a/internal/api/handler.go b/internal/api/handler.go index 87645311..5f452c20 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -11,6 +11,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "csgclaw/internal/agent" @@ -54,6 +55,9 @@ type Handler struct { localRuntimeImages func(context.Context, config.Config) ([]string, error) notificationDeliver notification.Fanouter activityDecider ActivityDecider + + participantActivityTurnsMu sync.Mutex + participantActivityTurns map[string]participantActivityTurn } const createOperationTimeout = 10 * time.Minute diff --git a/internal/api/handler_test.go b/internal/api/handler_test.go index c0afc520..90c5f251 100644 --- a/internal/api/handler_test.go +++ b/internal/api/handler_test.go @@ -3631,6 +3631,164 @@ func TestHandleBotSendMessageAcceptsPicoClawThreadContext(t *testing.T) { } } +func TestHandleParticipantSendMessageReplacementRefreshesThreadRootSummary(t *testing.T) { + now := time.Now().UTC() + imSvc := im.NewServiceFromBootstrap(im.Bootstrap{ + CurrentUserID: "u-admin", + Users: []im.User{ + {ID: "u-admin", Name: "admin", Handle: "admin"}, + {ID: "manager", Name: "manager", Handle: "manager"}, + }, + Rooms: []im.Room{ + { + ID: "room-1", + IsDirect: true, + Members: []string{"u-admin", "manager"}, + Messages: []im.Message{{ID: "msg-user", SenderID: "u-admin", Content: "run it", CreatedAt: now}}, + }, + }, + }) + srv := &Handler{im: imSvc, participantBridge: im.NewParticipantBridge(""), serverNoAuth: true} + send := func(t *testing.T, body string) string { + t.Helper() + req := httptest.NewRequest(http.MethodPost, "/api/v1/channels/csgclaw/participants/manager/messages", strings.NewReader(body)) + rec := httptest.NewRecorder() + srv.Routes().ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body=%s", rec.Code, http.StatusOK, rec.Body.String()) + } + var sent struct { + MessageID string `json:"message_id"` + } + if err := json.NewDecoder(rec.Body).Decode(&sent); err != nil { + t.Fatalf("decode send response: %v", err) + } + return sent.MessageID + } + + rootID := send(t, `{"room_id":"room-1","message_id":"assistant-turn-1","text":"\u200b"}`) + send(t, `{"room_id":"room-1","message_id":"assistant-turn-1-tool-1","thread_root_id":"assistant-turn-1","text":"tool activity"}`) + finalID := send(t, `{"room_id":"room-1","message_id":"assistant-turn-1","text":"final answer"}`) + if rootID != "assistant-turn-1" || finalID != rootID { + t.Fatalf("root/final message ids = %q / %q, want assistant-turn-1", rootID, finalID) + } + + timeline, err := imSvc.ListMessages("room-1") + if err != nil { + t.Fatalf("ListMessages() error = %v", err) + } + var root im.Message + for _, message := range timeline { + if message.ID == "assistant-turn-1-tool-1" { + t.Fatalf("timeline = %+v, want tool reply hidden from top-level messages", timeline) + } + if message.ID == rootID { + root = message + } + } + if root.Content != "final answer" { + t.Fatalf("root.Content = %q, want final answer", root.Content) + } + if root.Thread == nil || root.Thread.Context.RootExcerpt != "final answer" || root.Thread.ReplyCount != 1 { + t.Fatalf("root.Thread = %+v, want refreshed summary with one reply", root.Thread) + } +} + +func TestHandleParticipantSendMessageThreadsTopLevelToolCallsUnderFinalResponse(t *testing.T) { + for _, tc := range []struct { + name string + isDirect bool + }{ + {name: "dm", isDirect: true}, + {name: "room"}, + } { + t.Run(tc.name, func(t *testing.T) { + now := time.Now().UTC() + imSvc := im.NewServiceFromBootstrap(im.Bootstrap{ + CurrentUserID: "u-admin", + Users: []im.User{ + {ID: "u-admin", Name: "admin", Handle: "admin"}, + {ID: "manager", Name: "manager", Handle: "manager"}, + }, + Rooms: []im.Room{ + { + ID: "room-1", + IsDirect: tc.isDirect, + Members: []string{"u-admin", "manager"}, + Messages: []im.Message{{ID: "msg-user", SenderID: "u-admin", Content: "use some tools", CreatedAt: now}}, + }, + }, + }) + srv := &Handler{im: imSvc, participantBridge: im.NewParticipantBridge(""), serverNoAuth: true} + send := func(t *testing.T, body string) string { + t.Helper() + req := httptest.NewRequest(http.MethodPost, "/api/v1/channels/csgclaw/participants/manager/messages", strings.NewReader(body)) + rec := httptest.NewRecorder() + srv.Routes().ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body=%s", rec.Code, http.StatusOK, rec.Body.String()) + } + var sent struct { + MessageID string `json:"message_id"` + } + if err := json.NewDecoder(rec.Body).Decode(&sent); err != nil { + t.Fatalf("decode send response: %v", err) + } + return sent.MessageID + } + + firstToolID := send(t, `{"room_id":"room-1","text":"🔧 `+"`list_dir`"+`\n`+"```"+`\n{\"path\":\"/workspace\"}\n`+"```"+`"}`) + secondToolID := send(t, `{"room_id":"room-1","text":"🔧 `+"`exec`"+`\n`+"```"+`\n{\"command\":\"pwd\"}\n`+"```"+`"}`) + finalID := send(t, `{"room_id":"room-1","text":"Used two tools."}`) + if finalID == "" || finalID == firstToolID || finalID == secondToolID { + t.Fatalf("message ids first=%q second=%q final=%q, want final root distinct from tool replies", firstToolID, secondToolID, finalID) + } + + timeline, err := imSvc.ListMessages("room-1") + if err != nil { + t.Fatalf("ListMessages() error = %v", err) + } + var root im.Message + for _, message := range timeline { + if message.ID == firstToolID || message.ID == secondToolID { + t.Fatalf("timeline = %+v, want tool replies hidden from top-level messages", timeline) + } + if message.ID == finalID { + root = message + } + } + if root.ID == "" { + t.Fatalf("timeline = %+v, want final root %q", timeline, finalID) + } + if root.Content != "Used two tools." { + t.Fatalf("root.Content = %q, want final response", root.Content) + } + if root.Thread == nil || root.Thread.ReplyCount != 2 || root.Thread.Context.RootExcerpt != "Used two tools." { + t.Fatalf("root.Thread = %+v, want refreshed summary with two replies", root.Thread) + } + + thread, err := imSvc.GetThread("room-1", root.ID) + if err != nil { + t.Fatalf("GetThread() error = %v", err) + } + if len(thread.Replies) != 2 { + t.Fatalf("thread replies = %+v, want two tool replies", thread.Replies) + } + if thread.Replies[0].ID != firstToolID || thread.Replies[1].ID != secondToolID { + t.Fatalf("reply ids = %q / %q, want %q / %q", thread.Replies[0].ID, thread.Replies[1].ID, firstToolID, secondToolID) + } + for _, reply := range thread.Replies { + if reply.RelatesTo == nil || reply.RelatesTo.RelType != im.RelationTypeThread || reply.RelatesTo.EventID != root.ID { + t.Fatalf("reply.RelatesTo = %+v, want m.thread -> %s", reply.RelatesTo, root.ID) + } + if !strings.HasPrefix(strings.TrimSpace(reply.Content), "🔧 ") { + t.Fatalf("reply.Content = %q, want legacy tool call", reply.Content) + } + } + }) + } +} + func TestPublishParticipantEventQueuesUntilParticipantSubscribes(t *testing.T) { now := time.Now().UTC() imSvc := im.NewServiceFromBootstrap(im.Bootstrap{ diff --git a/internal/api/participant_bridge.go b/internal/api/participant_bridge.go index ea4b26e6..38885037 100644 --- a/internal/api/participant_bridge.go +++ b/internal/api/participant_bridge.go @@ -21,8 +21,19 @@ import ( const ( participantReplayWindow = 30 * time.Minute participantHeartbeatInterval = 15 * time.Second + + participantActivityTurnTTL = 10 * time.Minute + participantActivityTurnPlaceholderText = "\u200b" + participantAgentActivityType = "com.opencsg.csgclaw.agent.activity" + participantAgentToolMsgType = "com.opencsg.csgclaw.agent.tool" + participantAgentActionMsgType = "com.opencsg.csgclaw.agent.action" ) +type participantActivityTurn struct { + rootMessageID string + updatedAt time.Time +} + func (h *Handler) PublishParticipantEvent(evt im.Event) { if h.participantBridge == nil || h.im == nil { return @@ -537,19 +548,140 @@ func (h *Handler) handleParticipantSendMessage(w http.ResponseWriter, r *http.Re roomID := req.ResolvedRoomID() text := req.ResolvedText() threadRootID := req.ResolvedThreadRootID() + messageID := strings.TrimSpace(req.MessageID) + clearActivityTurn := false + + if strings.TrimSpace(threadRootID) == "" { + if isParticipantActivityThreadMessage(text) { + rootID, err := h.ensureParticipantActivityTurnRoot(roomID, participantID) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + threadRootID = rootID + } else if rootID, ok := h.participantActivityTurnRoot(roomID, participantID); ok { + messageID = rootID + clearActivityTurn = true + } + } message, err := h.im.DeliverMessage(im.DeliverMessageRequest{ RoomID: roomID, SenderID: participantID, Content: text, - MessageID: req.MessageID, + MessageID: messageID, ThreadRootID: threadRootID, }) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } + if clearActivityTurn { + h.clearParticipantActivityTurnRoot(roomID, participantID) + } h.publishMessageCreated(roomID, participantID, message) h.publishThreadUpdated(roomID, message) writeJSON(w, http.StatusOK, map[string]string{"message_id": message.ID}) } + +func (h *Handler) ensureParticipantActivityTurnRoot(roomID, participantID string) (string, error) { + if rootID, ok := h.participantActivityTurnRoot(roomID, participantID); ok { + h.setParticipantActivityTurnRoot(roomID, participantID, rootID) + return rootID, nil + } + message, err := h.im.DeliverMessage(im.DeliverMessageRequest{ + RoomID: roomID, + SenderID: participantID, + Content: participantActivityTurnPlaceholderText, + }) + if err != nil { + return "", err + } + rootID := strings.TrimSpace(message.ID) + if rootID == "" { + return "", fmt.Errorf("create participant activity turn root: empty message id") + } + h.setParticipantActivityTurnRoot(roomID, participantID, rootID) + h.publishMessageCreated(roomID, participantID, message) + return rootID, nil +} + +func (h *Handler) participantActivityTurnRoot(roomID, participantID string) (string, bool) { + key := participantActivityTurnKey(roomID, participantID) + if key == "" { + return "", false + } + + now := time.Now() + h.participantActivityTurnsMu.Lock() + defer h.participantActivityTurnsMu.Unlock() + + turn, ok := h.participantActivityTurns[key] + if !ok || strings.TrimSpace(turn.rootMessageID) == "" { + return "", false + } + if now.Sub(turn.updatedAt) > participantActivityTurnTTL { + delete(h.participantActivityTurns, key) + return "", false + } + return turn.rootMessageID, true +} + +func (h *Handler) setParticipantActivityTurnRoot(roomID, participantID, rootMessageID string) { + key := participantActivityTurnKey(roomID, participantID) + rootMessageID = strings.TrimSpace(rootMessageID) + if key == "" || rootMessageID == "" { + return + } + + h.participantActivityTurnsMu.Lock() + defer h.participantActivityTurnsMu.Unlock() + if h.participantActivityTurns == nil { + h.participantActivityTurns = make(map[string]participantActivityTurn) + } + h.participantActivityTurns[key] = participantActivityTurn{ + rootMessageID: rootMessageID, + updatedAt: time.Now(), + } +} + +func (h *Handler) clearParticipantActivityTurnRoot(roomID, participantID string) { + key := participantActivityTurnKey(roomID, participantID) + if key == "" { + return + } + + h.participantActivityTurnsMu.Lock() + defer h.participantActivityTurnsMu.Unlock() + delete(h.participantActivityTurns, key) +} + +func participantActivityTurnKey(roomID, participantID string) string { + roomID = strings.TrimSpace(roomID) + participantID = strings.TrimSpace(participantID) + if roomID == "" || participantID == "" { + return "" + } + return roomID + "\x00" + participantID +} + +func isParticipantActivityThreadMessage(content string) bool { + trimmed := strings.TrimSpace(content) + if strings.HasPrefix(trimmed, "🔧 ") { + return true + } + if !strings.HasPrefix(trimmed, "{") { + return false + } + var payload struct { + Type string `json:"type"` + Content struct { + MsgType string `json:"msgtype"` + } `json:"content"` + } + if err := json.Unmarshal([]byte(trimmed), &payload); err != nil { + return false + } + return payload.Type == participantAgentActivityType && + (payload.Content.MsgType == participantAgentToolMsgType || payload.Content.MsgType == participantAgentActionMsgType) +} diff --git a/internal/channel/codexbridge/bridge.go b/internal/channel/codexbridge/bridge.go index be8cc638..bfccf129 100644 --- a/internal/channel/codexbridge/bridge.go +++ b/internal/channel/codexbridge/bridge.go @@ -20,6 +20,8 @@ const ( defaultSeenWindow = 256 defaultPromptSettle = 150 * time.Millisecond localChannel = csgclawchannel.ChannelID + turnPlaceholderText = "\u200b" + turnCompleteText = "Done." ) type Binding struct { @@ -255,6 +257,32 @@ func (w *worker) handleEvent(ctx context.Context, evt BotEvent, runtimeEvents <- Meta: cloneMeta(w.binding.PromptMeta), } renderer := runtimebridge.NewTurnRenderer() + turnRootID := strings.TrimSpace(evt.ThreadRootID) + var generatedRootID string + + ensureActivityThreadRoot := func() (string, error) { + if turnRootID != "" { + return turnRootID, nil + } + if generatedRootID != "" { + return generatedRootID, nil + } + messageID, err := w.sendMessage(ctx, evt.RoomID, "", turnPlaceholderText) + if err != nil { + return "", err + } + generatedRootID = strings.TrimSpace(messageID) + if generatedRootID == "" { + return "", fmt.Errorf("create turn root message: empty message id") + } + return generatedRootID, nil + } + flushTurn := func() (string, error) { + if generatedRootID != "" { + return w.flushTurnWithFirstMessageID(ctx, evt.RoomID, "", generatedRootID, renderer) + } + return w.flushTurn(ctx, evt.RoomID, turnRootID, renderer) + } type promptResult struct { err error @@ -287,26 +315,30 @@ func (w *worker) handleEvent(ctx context.Context, evt BotEvent, runtimeEvents <- continue } if renderedActivity, ok := renderer.RenderActivity(event, localChannel, evt.RoomID, w.binding.BotID); ok { - if err := w.sendActivity(ctx, evt.RoomID, evt.ThreadRootID, renderedActivity); err != nil { + threadRootID, err := ensureActivityThreadRoot() + if err != nil { + return err + } + if err := w.sendActivity(ctx, evt.RoomID, threadRootID, renderedActivity); err != nil { return err } } renderer.ApplyText(event) if isTerminalEvent(event.Kind) && promptReturned { - _, err := w.flushTurn(ctx, evt.RoomID, evt.ThreadRootID, renderer) + _, err := flushTurn() return err } case result := <-promptDone: promptReturned = true if result.err != nil { renderer.SetPromptError(result.err.Error()) - _, err := w.flushTurn(ctx, evt.RoomID, evt.ThreadRootID, renderer) + _, err := flushTurn() return err } settleTimer.Reset(w.service.promptSettle) case <-settleTimer.C: if promptReturned { - _, err := w.flushTurn(ctx, evt.RoomID, evt.ThreadRootID, renderer) + _, err := flushTurn() return err } } @@ -355,17 +387,39 @@ func (w *worker) clearContextCache(conversationKey string) { } func (w *worker) flushTurn(ctx context.Context, roomID, threadRootID string, renderer *runtimebridge.TurnRenderer) (string, error) { - var firstMessageID string - for _, text := range renderer.FinalMessages() { - messageID, err := w.sendMessage(ctx, roomID, threadRootID, text) + return w.flushTurnWithFirstMessageID(ctx, roomID, threadRootID, "", renderer) +} + +func (w *worker) flushTurnWithFirstMessageID(ctx context.Context, roomID, threadRootID, firstMessageID string, renderer *runtimebridge.TurnRenderer) (string, error) { + var firstSentMessageID string + replaceMessageID := strings.TrimSpace(firstMessageID) + messages := renderer.FinalMessages() + if len(messages) == 0 && replaceMessageID != "" { + messages = []string{turnCompleteText} + } + for idx, text := range messages { + req := SendMessageRequest{ + RoomID: roomID, + Text: text, + ThreadRootID: strings.TrimSpace(threadRootID), + } + if idx == 0 && replaceMessageID != "" { + req.MessageID = replaceMessageID + req.ThreadRootID = "" + } + messageID, err := w.sendMessageRequest(ctx, req) if err != nil { return "", err } - if firstMessageID == "" { - firstMessageID = messageID + if firstSentMessageID == "" { + if req.MessageID != "" { + firstSentMessageID = req.MessageID + } else { + firstSentMessageID = messageID + } } } - return firstMessageID, nil + return firstSentMessageID, nil } func (w *worker) sendMessage(ctx context.Context, roomID, threadRootID, text string) (string, error) { diff --git a/internal/channel/codexbridge/bridge_test.go b/internal/channel/codexbridge/bridge_test.go index c581498b..6aabc984 100644 --- a/internal/channel/codexbridge/bridge_test.go +++ b/internal/channel/codexbridge/bridge_test.go @@ -342,13 +342,22 @@ func TestServiceUsesConversationScopedSessionsAndThreadReplies(t *testing.T) { }) } -func TestServiceProjectsToolCallsAsActivityCardsAlongsideResponse(t *testing.T) { - t.Parallel() +func TestServiceThreadsTopLevelToolCallsUnderFinalResponse(t *testing.T) { + for _, chatType := range []string{"direct", "group"} { + chatType := chatType + t.Run(chatType, func(t *testing.T) { + t.Parallel() + assertServiceThreadsTopLevelToolCallsUnderFinalResponse(t, chatType) + }) + } +} +func assertServiceThreadsTopLevelToolCallsUnderFinalResponse(t *testing.T, chatType string) { + t.Helper() stream := make(chan BotEvent, 1) errs := make(chan error) close(errs) - stream <- BotEvent{MessageID: "m-1", RoomID: "room-1", Text: "run it"} + stream <- BotEvent{MessageID: "m-1", RoomID: "room-1", ChatType: chatType, Text: "run it"} sink := runtimecodex.NewEventSink() client := &fakeBotClient{ @@ -402,17 +411,21 @@ func TestServiceProjectsToolCallsAsActivityCardsAlongsideResponse(t *testing.T) waitFor(t, func() bool { records := client.sentRecords() - return len(records) == 3 && + return len(records) == 4 && records[0].RoomID == "room-1" && records[0].ThreadRootID == "" && - strings.Contains(records[0].Text, runtimebridge.AgentToolMsgType) && + records[0].Text == turnPlaceholderText && records[1].RoomID == "room-1" && - records[1].ThreadRootID == "" && + records[1].ThreadRootID == "sent-1" && strings.Contains(records[1].Text, runtimebridge.AgentToolMsgType) && - strings.Contains(records[1].Text, "command output") && records[2].RoomID == "room-1" && - records[2].ThreadRootID == "" && - records[2].Text == "done" + records[2].ThreadRootID == "sent-1" && + strings.Contains(records[2].Text, runtimebridge.AgentToolMsgType) && + strings.Contains(records[2].Text, "command output") && + records[3].RoomID == "room-1" && + records[3].MessageID == "sent-1" && + records[3].ThreadRootID == "" && + records[3].Text == "done" }) } @@ -818,9 +831,19 @@ func TestServiceProjectsToolEventsAsAgentActivity(t *testing.T) { defer svc.Close() waitFor(t, func() bool { - return len(client.sentTexts()) == 1 + return len(client.sentRecords()) == 3 }) - text := client.sentTexts()[0] + records := client.sentRecords() + if records[0].Text != turnPlaceholderText || records[0].ThreadRootID != "" { + t.Fatalf("placeholder record = %+v, want top-level blank root", records[0]) + } + if records[1].ThreadRootID != "sent-1" { + t.Fatalf("tool activity ThreadRootID = %q, want sent-1", records[1].ThreadRootID) + } + if records[2].Text != turnCompleteText || records[2].MessageID != "sent-1" || records[2].ThreadRootID != "" { + t.Fatalf("completion record = %+v, want root placeholder replacement", records[2]) + } + text := records[1].Text if strings.Contains(text, "Running tool:") { t.Fatalf("tool event rendered as plain text: %s", text) } @@ -908,8 +931,15 @@ func TestServiceProjectsPermissionEventsAsAgentActivity(t *testing.T) { defer svc.Close() waitFor(t, func() bool { - return len(client.sentTexts()) == 1 + return len(client.sentRecords()) == 2 }) + records := client.sentRecords() + if records[0].Text != turnPlaceholderText || records[0].ThreadRootID != "" { + t.Fatalf("placeholder record = %+v, want top-level blank root", records[0]) + } + if records[1].ThreadRootID != "sent-1" { + t.Fatalf("permission activity ThreadRootID = %q, want sent-1", records[1].ThreadRootID) + } var payload struct { Type string `json:"type"` Channel string `json:"channel"` @@ -925,7 +955,7 @@ func TestServiceProjectsPermissionEventsAsAgentActivity(t *testing.T) { } `json:"action"` } `json:"content"` } - if err := json.Unmarshal([]byte(client.sentTexts()[0]), &payload); err != nil { + if err := json.Unmarshal([]byte(records[1].Text), &payload); err != nil { t.Fatalf("permission activity json decode: %v", err) } if payload.Type != runtimebridge.AgentActivityType || payload.Content.MsgType != runtimebridge.AgentActionMsgType { @@ -1016,14 +1046,20 @@ func TestServiceUsesStableMessageIDForPermissionDecisionActivity(t *testing.T) { defer svc.Close() waitFor(t, func() bool { - return len(client.sentRecords()) == 2 + return len(client.sentRecords()) == 3 }) sent := client.sentRecords() - if sent[0].MessageID == "" || sent[0].MessageID != sent[1].MessageID { - t.Fatalf("permission message ids = %q / %q, want stable non-empty id", sent[0].MessageID, sent[1].MessageID) + if sent[0].Text != turnPlaceholderText || sent[0].ThreadRootID != "" { + t.Fatalf("placeholder record = %+v, want top-level blank root", sent[0]) + } + if sent[1].MessageID == "" || sent[1].MessageID != sent[2].MessageID { + t.Fatalf("permission message ids = %q / %q, want stable non-empty id", sent[1].MessageID, sent[2].MessageID) + } + if sent[1].ThreadRootID != "sent-1" || sent[2].ThreadRootID != "sent-1" { + t.Fatalf("permission thread roots = %q / %q, want sent-1", sent[1].ThreadRootID, sent[2].ThreadRootID) } - if !strings.Contains(sent[1].Text, `"status":"allowed"`) { - t.Fatalf("decision activity = %s, want allowed status", sent[1].Text) + if !strings.Contains(sent[2].Text, `"status":"allowed"`) { + t.Fatalf("decision activity = %s, want allowed status", sent[2].Text) } } diff --git a/internal/im/service.go b/internal/im/service.go index 00f16d6a..0d48e005 100644 --- a/internal/im/service.go +++ b/internal/im/service.go @@ -1472,6 +1472,7 @@ func (s *Service) DeliverMessage(req DeliverMessageRequest) (Message, error) { } message.CreatedAt = room.Messages[idx].CreatedAt room.Messages[idx] = message + s.rebuildThreadStatesLocked(room) if err := s.saveLocked(); err != nil { return Message{}, err } diff --git a/internal/im/service_thread_test.go b/internal/im/service_thread_test.go index 7225a279..6e7896b9 100644 --- a/internal/im/service_thread_test.go +++ b/internal/im/service_thread_test.go @@ -145,6 +145,52 @@ func TestCreateThreadReplyHidesFromMainTimelineAndUpdatesSummary(t *testing.T) { } } +func TestDeliverMessageReplacementRefreshesThreadRootSummary(t *testing.T) { + svc := NewServiceFromBootstrap(threadTestBootstrap()) + root, err := svc.DeliverMessage(DeliverMessageRequest{ + RoomID: "room-1", + SenderID: "manager", + MessageID: "assistant-turn-1", + Content: "\u200b", + }) + if err != nil { + t.Fatalf("DeliverMessage(root placeholder) error = %v", err) + } + if _, err := svc.DeliverMessage(DeliverMessageRequest{ + RoomID: "room-1", + SenderID: "manager", + MessageID: "assistant-turn-1-tool-1", + Content: "tool activity", + ThreadRootID: root.ID, + }); err != nil { + t.Fatalf("DeliverMessage(thread reply) error = %v", err) + } + + updated, err := svc.DeliverMessage(DeliverMessageRequest{ + RoomID: "room-1", + SenderID: "manager", + MessageID: root.ID, + Content: "final answer", + }) + if err != nil { + t.Fatalf("DeliverMessage(root replacement) error = %v", err) + } + if updated.Content != "final answer" { + t.Fatalf("updated.Content = %q, want final answer", updated.Content) + } + if updated.Thread == nil || updated.Thread.Context.RootExcerpt != "final answer" { + t.Fatalf("updated.Thread = %+v, want refreshed root excerpt", updated.Thread) + } + + view, err := svc.GetThread("room-1", root.ID) + if err != nil { + t.Fatalf("GetThread() error = %v", err) + } + if view.Summary.Context.RootExcerpt != "final answer" { + t.Fatalf("thread root excerpt = %q, want final answer", view.Summary.Context.RootExcerpt) + } +} + func TestStartThreadContextSnapshotRespectsPayloadCap(t *testing.T) { bootstrap := threadTestBootstrap() large := strings.Repeat("large-context ", 1800) diff --git a/web/app/src/components/business/MessageContent/MessageContent.css b/web/app/src/components/business/MessageContent/MessageContent.css index 1827e9d8..d8701e0d 100644 --- a/web/app/src/components/business/MessageContent/MessageContent.css +++ b/web/app/src/components/business/MessageContent/MessageContent.css @@ -112,6 +112,54 @@ background: var(--panel-strong); } +.message-loading-dots { + display: inline-flex; + min-width: 28px; + min-height: 1em; + align-items: center; + gap: 4px; + color: var(--muted); + vertical-align: middle; +} + +.message-loading-dot { + width: 6px; + height: 6px; + border-radius: 999px; + background: currentColor; + opacity: 0.35; + animation: message-loading-dot 1.1s ease-in-out infinite; +} + +.message-loading-dot:nth-child(2) { + animation-delay: 0.14s; +} + +.message-loading-dot:nth-child(3) { + animation-delay: 0.28s; +} + +@keyframes message-loading-dot { + 0%, + 80%, + 100% { + opacity: 0.35; + transform: translateY(0); + } + + 40% { + opacity: 0.9; + transform: translateY(-2px); + } +} + +@media (prefers-reduced-motion: reduce) { + .message-loading-dot { + animation: none; + opacity: 0.65; + } +} + .structured-message-code, .structured-message-json { border-color: var(--line); diff --git a/web/app/src/components/business/MessageContent/MessageContent.tsx b/web/app/src/components/business/MessageContent/MessageContent.tsx index 60cfd29c..1b36231e 100644 --- a/web/app/src/components/business/MessageContent/MessageContent.tsx +++ b/web/app/src/components/business/MessageContent/MessageContent.tsx @@ -14,7 +14,11 @@ import "./MessageContent.css"; export function MessageContent({ content, message, actionBusy, actionError, onAction }: MessageContentProps) { const containerRef = useRef(null); - const activity = useMemo(() => parseAgentActivity(content), [content]); + const blankTurnPlaceholder = isBlankTurnPlaceholder(content); + const activity = useMemo( + () => (blankTurnPlaceholder ? null : parseAgentActivity(content)), + [blankTurnPlaceholder, content], + ); const slashCommand = useMemo(() => (activity ? null : parseSlashCommand(content)), [activity, content]); const slashCommandText = useMemo(() => renderSlashCommandText(slashCommand), [slashCommand]); const structured = useMemo( @@ -45,6 +49,16 @@ export function MessageContent({ content, message, actionBusy, actionError, onAc }; }, [markup]); + if (blankTurnPlaceholder) { + return ( + +