diff --git a/pkg/channels/csgclaw/csgclaw.go b/pkg/channels/csgclaw/csgclaw.go index bbe108c640..43732f8684 100644 --- a/pkg/channels/csgclaw/csgclaw.go +++ b/pkg/channels/csgclaw/csgclaw.go @@ -44,13 +44,23 @@ type Channel struct { } type eventPayload struct { - MessageID string `json:"message_id"` - RoomID string `json:"room_id"` - ChatType string `json:"chat_type"` - Sender sender `json:"sender"` - Text string `json:"text"` - Timestamp string `json:"timestamp"` - Mentions []string `json:"mentions"` + MessageID string `json:"message_id"` + RoomID string `json:"room_id"` + ChatID string `json:"chat_id"` + ChatType string `json:"chat_type"` + ThreadRootID string `json:"thread_root_id"` + Sender sender `json:"sender"` + Text string `json:"text"` + Timestamp string `json:"timestamp"` + Mentions []string `json:"mentions"` + Context eventContext `json:"context"` +} + +type eventContext struct { + Channel string `json:"channel"` + ChatID string `json:"chat_id"` + ChatType string `json:"chat_type"` + TopicID string `json:"topic_id"` } type sender struct { @@ -60,8 +70,16 @@ type sender struct { } type sendRequest struct { - RoomID string `json:"room_id"` - Text string `json:"text"` + RoomID string `json:"room_id"` + Text string `json:"text"` + TopicID string `json:"topic_id,omitempty"` + Context *sendContext `json:"context,omitempty"` +} + +type sendContext struct { + Channel string `json:"channel,omitempty"` + ChatID string `json:"chat_id,omitempty"` + TopicID string `json:"topic_id,omitempty"` } func NewChannel(cfg config.CSGClawConfig, messageBus *bus.MessageBus) (*Channel, error) { @@ -125,8 +143,7 @@ func (c *Channel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.IsRunning() { return channels.ErrNotRunning } - fmt.Printf("received msg: %+v\n", msg) - roomID := msg.ChatID + roomID, topicID := splitTopicChatID(msg.ChatID) if strings.TrimSpace(roomID) == "" { return fmt.Errorf("csgclaw chat ID is empty: %w", channels.ErrSendFailed) } @@ -134,10 +151,20 @@ func (c *Channel) Send(ctx context.Context, msg bus.OutboundMessage) error { return fmt.Errorf("csgclaw content is empty: %w", channels.ErrSendFailed) } - body, err := json.Marshal(sendRequest{ + payload := sendRequest{ RoomID: roomID, Text: msg.Content, - }) + } + if topicID != "" { + payload.TopicID = topicID + payload.Context = &sendContext{ + Channel: "csgclaw", + ChatID: roomID, + TopicID: topicID, + } + } + + body, err := json.Marshal(payload) if err != nil { return fmt.Errorf("csgclaw marshal send payload: %w", channels.ErrSendFailed) } @@ -151,6 +178,7 @@ func (c *Channel) Send(ctx context.Context, msg bus.OutboundMessage) error { logger.InfoCF("csgclaw", "Sending outbound message", map[string]any{ "room_id": roomID, + "topic_id": topicID, "content_len": len(msg.Content), "endpoint_url": c.sendURL(), }) @@ -324,7 +352,11 @@ func (c *Channel) handleInboundEvent(evt eventPayload) { "event": evt, }) - if strings.TrimSpace(evt.RoomID) == "" || strings.TrimSpace(evt.Sender.ID) == "" { + roomID := resolvedRoomID(evt) + topicID := resolvedTopicID(evt) + chatID := topicChatID(roomID, topicID) + + if strings.TrimSpace(roomID) == "" || strings.TrimSpace(evt.Sender.ID) == "" { return } @@ -333,6 +365,9 @@ func (c *Channel) handleInboundEvent(evt eventPayload) { if strings.EqualFold(evt.ChatType, "group") { peerKind = "group" isMentioned := hasInboundBotAtMention(content, c.config.BotID) + if !isMentioned && hasInboundAtMention(content) { + return + } content = normalizeInboundAtMentions(content) shouldRespond, normalized := c.ShouldRespondInGroup(isMentioned, content) if !shouldRespond { @@ -352,17 +387,26 @@ func (c *Channel) handleInboundEvent(evt eventPayload) { metadata := map[string]string{ "timestamp": evt.Timestamp, "chat_type": evt.ChatType, + "room_id": roomID, } if len(evt.Mentions) > 0 { metadata["mentions"] = strings.Join(evt.Mentions, ",") } + if topicID != "" { + metadata["topic_id"] = topicID + metadata["parent_peer_kind"] = "topic" + metadata["parent_peer_id"] = topicID + } + if strings.TrimSpace(evt.ThreadRootID) != "" { + metadata["thread_root_id"] = strings.TrimSpace(evt.ThreadRootID) + } c.HandleMessage( c.ctx, - bus.Peer{Kind: peerKind, ID: evt.RoomID}, + bus.Peer{Kind: peerKind, ID: chatID}, evt.MessageID, evt.Sender.ID, - evt.RoomID, + chatID, content, nil, metadata, @@ -370,6 +414,44 @@ func (c *Channel) handleInboundEvent(evt eventPayload) { ) } +func resolvedRoomID(evt eventPayload) string { + if roomID := strings.TrimSpace(evt.RoomID); roomID != "" { + return roomID + } + if chatID := strings.TrimSpace(evt.ChatID); chatID != "" { + return chatID + } + return strings.TrimSpace(evt.Context.ChatID) +} + +func resolvedTopicID(evt eventPayload) string { + if topicID := strings.TrimSpace(evt.Context.TopicID); topicID != "" { + return topicID + } + return strings.TrimSpace(evt.ThreadRootID) +} + +func topicChatID(roomID, topicID string) string { + roomID = strings.TrimSpace(roomID) + topicID = strings.TrimSpace(topicID) + if roomID == "" || topicID == "" { + return roomID + } + return roomID + "/" + topicID +} + +func splitTopicChatID(chatID string) (roomID, topicID string) { + chatID = strings.TrimSpace(chatID) + if chatID == "" { + return "", "" + } + idx := strings.LastIndex(chatID, "/") + if idx <= 0 || idx == len(chatID)-1 { + return chatID, "" + } + return strings.TrimSpace(chatID[:idx]), strings.TrimSpace(chatID[idx+1:]) +} + func hasInboundBotAtMention(content, botID string) bool { content = strings.TrimSpace(content) botID = strings.TrimSpace(botID) @@ -396,6 +478,14 @@ func hasInboundBotAtMention(content, botID string) bool { } } +func hasInboundAtMention(content string) bool { + content = strings.TrimSpace(content) + if content == "" { + return false + } + return strings.Contains(content, `manager hi","context":{"topic_id":"msg-root"}}`) + + select { + case msg := <-mb.InboundChan(): + if msg.ChatID != "room-1/msg-root" { + t.Fatalf("inbound chat ID = %q, want %q", msg.ChatID, "room-1/msg-root") + } + if msg.Peer.Kind != "group" { + t.Fatalf("peer kind = %q, want group", msg.Peer.Kind) + } + if msg.Peer.ID != "room-1/msg-root" { + t.Fatalf("peer ID = %q, want %q", msg.Peer.ID, "room-1/msg-root") + } + if msg.Metadata["room_id"] != "room-1" { + t.Fatalf("metadata room_id = %q, want room-1", msg.Metadata["room_id"]) + } + if msg.Metadata["topic_id"] != "msg-root" { + t.Fatalf("metadata topic_id = %q, want msg-root", msg.Metadata["topic_id"]) + } + if msg.Metadata["parent_peer_kind"] != "topic" { + t.Fatalf("metadata parent_peer_kind = %q, want topic", msg.Metadata["parent_peer_kind"]) + } + if msg.Metadata["parent_peer_id"] != "msg-root" { + t.Fatalf("metadata parent_peer_id = %q, want msg-root", msg.Metadata["parent_peer_id"]) + } + case <-time.After(50 * time.Millisecond): + t.Fatal("timed out waiting for inbound message") + } +} + +func TestSendThreadMessageIncludesTopicContext(t *testing.T) { + var got map[string]any + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/bots/u-manager/messages/send" { + http.NotFound(w, r) + return + } + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("ReadAll() error = %v", err) + } + if err := json.Unmarshal(body, &got); err != nil { + t.Fatalf("Unmarshal(%s) error = %v", string(body), err) + } + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + mb := bus.NewMessageBus() + defer mb.Close() + + ch, err := NewChannel(config.CSGClawConfig{ + BaseURL: server.URL, + BotID: "u-manager", + AccessToken: "secret", + }, mb) + if err != nil { + t.Fatalf("NewChannel() error = %v", err) + } + ch.SetRunning(true) + + if err := ch.Send(context.Background(), bus.OutboundMessage{ + ChatID: "room-1/msg-root", + Content: "thread answer", + }); err != nil { + t.Fatalf("Send() error = %v", err) + } + + if got["room_id"] != "room-1" { + t.Fatalf("room_id = %v, want room-1; payload=%v", got["room_id"], got) + } + if got["text"] != "thread answer" { + t.Fatalf("text = %v, want thread answer; payload=%v", got["text"], got) + } + if got["topic_id"] != "msg-root" { + t.Fatalf("topic_id = %v, want msg-root; payload=%v", got["topic_id"], got) + } + contextPayload, ok := got["context"].(map[string]any) + if !ok { + t.Fatalf("context = %T, want object; payload=%v", got["context"], got) + } + if contextPayload["chat_id"] != "room-1" { + t.Fatalf("context.chat_id = %v, want room-1; payload=%v", contextPayload["chat_id"], got) + } + if contextPayload["topic_id"] != "msg-root" { + t.Fatalf("context.topic_id = %v, want msg-root; payload=%v", contextPayload["topic_id"], got) + } +} + func TestHandleInboundEventConsumesRoomIDPayload(t *testing.T) { mb := bus.NewMessageBus() defer mb.Close()