generated from langbot-app/HelloPlugin
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathforward.py
More file actions
138 lines (125 loc) · 4.49 KB
/
forward.py
File metadata and controls
138 lines (125 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# forward.py
# auther: https://github.com/Hanschase
import json
import re
import aiohttp
import os
from typing import List, Dict
class ForwardMessage:
def __init__(self, host: str, port: int):
self.base_url = f"http://{host}:{port}"
async def send_forward(
self,
launcher_id: str,
messages: List[Dict],
prompt: str = "默认提示",
summary: str = "默认摘要",
source: str = "默认来源",
user_id: str = "100000",
nickname: str = "消息助手",
mode: str = "single" # 新增模式参数
) -> dict:
"""发送合并转发消息"""
# 构建消息节点
if mode == "single":
nodes = self._build_single_node(messages, user_id, nickname)
item_count = '1'
else:
nodes = self._build_nodes(messages, user_id, nickname)
item_count = len(nodes)
# 自动追加统计信息
formatted_summary = f"{summary} | 共{item_count}条内容"
message_data = {
"group_id": launcher_id,
"messages": nodes,
"prompt": prompt,
"summary": formatted_summary,
"source": source
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/send_forward_msg",
json=message_data,
headers={'Content-Type': 'application/json'}
) as resp:
return await resp.json()
def _build_single_node(self, messages: List[Dict], user_id: str, nickname: str) -> List[Dict]:
"""构建单节点消息"""
return [{
"type": "node",
"data": {
"user_id": user_id,
"nickname": nickname,
"content": self._parse_contents(messages)
}
}]
def _build_nodes(self, messages: List[Dict],user_id: str, nickname: str) -> List[Dict]:
"""构建消息节点"""
nodes = []
for idx, msg in enumerate(messages):
node = {
"type": "node",
"data": {
"user_id": user_id,
"nickname": nickname,
"content": self._parse_content(msg)
}
}
nodes.append(node)
return nodes
def _parse_contents(self, messages: Dict) -> List[Dict]:
"""解析全部消息内容"""
contents = []
for msg in messages:
contents.extend(self._parse_content(msg)) # 使用_parse_content来处理每条消息
return contents
def _parse_content(self, msg: Dict) -> List[Dict]:
"""解析消息内容"""
# content = []
# if 'image' in msg:
# content.append({
# "type": "image",
# "data": {"file": self._get_media_path(msg['image'])}
# })
# if 'text' in msg:
# content.append({
# "type": "text",
# "data": {"text": msg['text']}
# })
return msg.get("content", [])
# return content
def _get_media_path(self, path: str) -> str:
"""获取合法媒体路径"""
if path.startswith(('http://', 'https://')):
return path
if os.path.isfile(path):
return f"file:///{os.path.abspath(path)}"
return ""
def convert_to_forward(self, raw_message: str) -> list[dict]:
"""升级版消息解析,按块分组,保留图文顺序"""
messages = []
for block in raw_message.split('\n---\n'):
block = block.strip()
if not block:
continue
content = []
elements = re.split(r'(!\[.*?\]\(.*?\))', block)
for elem in elements:
elem = elem.strip()
if not elem:
continue
if elem.startswith('!['): # 图片
match = re.match(r'!\[.*?\]\((.*?)\)', elem)
if match:
content.append({
"type": "image",
"data": {"file": match.group(1)}
})
else: # 文本
content.append({
"type": "text",
"data": {"text": elem}
})
if content:
messages.append({"content": content})
return messages