diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index d060ce1c3d..f61dfabba6 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -535,6 +535,14 @@ "mattermost_bot_token": "", "mattermost_reconnect_delay": 5.0, }, + "Ntfy": { + "id": "ntfy", + "type": "ntfy", + "enable": True, + "ntfy_server_url": "https://ntfy.sh", + "ntfy_topic": "", + "ntfy_access_token": "", + }, # "WebChat": { # "id": "webchat", # "type": "webchat", @@ -994,6 +1002,21 @@ "type": "int", "hint": "允许的最大连续失败次数,超过后停止重试。", }, + "ntfy_server_url": { + "description": "ntfy Server URL", + "type": "string", + "hint": "ntfy 服务器地址,例如 https://ntfy.sh 或您的自建实例地址。", + }, + "ntfy_topic": { + "description": "ntfy Topic", + "type": "string", + "hint": "用于收发消息的唯一订阅主题名称 (请确保其足够私密)。", + }, + "ntfy_access_token": { + "description": "Access Token (Optional)", + "type": "string", + "hint": "如果您的 ntfy 服务器开启了身份验证,请在此输入 Bearer Token。", + }, }, }, "platform_settings": { diff --git a/astrbot/core/platform/manager.py b/astrbot/core/platform/manager.py index 22409c0f83..80cf7f6af2 100644 --- a/astrbot/core/platform/manager.py +++ b/astrbot/core/platform/manager.py @@ -194,6 +194,10 @@ async def load_platform(self, platform_config: dict) -> None: from .sources.mattermost.mattermost_adapter import ( MattermostPlatformAdapter, # noqa: F401 ) + case "ntfy": + from .sources.ntfy.ntfy_adapter import ( + NtfyPlatformAdapter, # noqa: F401 + ) except (ImportError, ModuleNotFoundError) as e: logger.error( f"加载平台适配器 {platform_config['type']} 失败,原因:{e}。请检查依赖库是否安装。提示:可以在 管理面板->平台日志->安装Pip库 中安装依赖库。", diff --git a/astrbot/core/platform/sources/ntfy/ntfy_adapter.py b/astrbot/core/platform/sources/ntfy/ntfy_adapter.py new file mode 100644 index 0000000000..e9f0a03a61 --- /dev/null +++ b/astrbot/core/platform/sources/ntfy/ntfy_adapter.py @@ -0,0 +1,274 @@ +import asyncio +import time +import uuid +from typing import Any, cast + +from astrbot.api import logger +from astrbot.api.event import MessageChain +from astrbot.api.message_components import At, File, Image, Plain, Record, Video +from astrbot.api.platform import ( + AstrBotMessage, + MessageMember, + MessageType, + Platform, + PlatformMetadata, +) +from astrbot.core.platform.astr_message_event import MessageSesion + +from ...register import register_platform_adapter +from .ntfy_api import NtfyAPIClient +from .ntfy_event import NtfyMessageEvent + +NTFY_CONFIG_METADATA = { + "ntfy_server_url": { + "description": "ntfy Server URL", + "type": "string", + "hint": "ntfy 服务器地址,例如 https://ntfy.sh 或您的自建实例地址。", + }, + "ntfy_topic": { + "description": "ntfy Topic", + "type": "string", + "hint": "用于收发消息的唯一订阅主题名称 (请确保其足够私密)。", + }, + "ntfy_access_token": { + "description": "Access Token (Optional)", + "type": "string", + "hint": "如果您的 ntfy 服务器开启了身份验证,请在此输入 Bearer Token。", + }, +} + +NTFY_I18N_RESOURCES = { + "zh-CN": { + "ntfy_server_url": { + "description": "ntfy 服务器地址", + "hint": "ntfy 服务器地址,例如 https://ntfy.sh", + }, + "ntfy_topic": { + "description": "订阅主题 (Topic)", + "hint": "用于收发消息的唯一订阅主题名称。", + }, + "ntfy_access_token": { + "description": "访问令牌 (可选)", + "hint": "有访问权限控制的服务器需要填写此 Token。", + }, + }, + "en-US": { + "ntfy_server_url": { + "description": "ntfy Server URL", + "hint": "The ntfy instance server base URL, e.g., https://ntfy.sh", + }, + "ntfy_topic": { + "description": "Subscription Topic", + "hint": "The secret unique topic used to listen and publish messages.", + }, + "ntfy_access_token": { + "description": "Access Token (Optional)", + "hint": "Bearer token if your ntfy server requires authentication.", + }, + }, +} + + +@register_platform_adapter( + "ntfy", + "ntfy 消息通知适配器", + support_streaming_message=False, + config_metadata=NTFY_CONFIG_METADATA, + i18n_resources=NTFY_I18N_RESOURCES, +) +class NtfyPlatformAdapter(Platform): + def __init__( + self, + platform_config: dict, + platform_settings: dict, + event_queue: asyncio.Queue, + ) -> None: + super().__init__(platform_config, event_queue) + self.settings = platform_settings + self._event_id_timestamps: dict[str, float] = {} + self.shutdown_event = asyncio.Event() + + server_url = str( + platform_config.get("ntfy_server_url") or "https://ntfy.sh" + ).strip() + topic = str(platform_config.get("ntfy_topic") or "").strip() + access_token = str(platform_config.get("ntfy_access_token") or "").strip() + + logger.info(platform_config) + + if not topic: + raise ValueError("ntfy 适配器必须配置有效的订阅主题 (topic)。") + + self.ntfy_api = NtfyAPIClient( + server_url=server_url, + topic=topic, + access_token=access_token if access_token else None, + ) + self._listener_task: asyncio.Task | None = None + + async def send_by_session( + self, + session: MessageSesion, + message_chain: MessageChain, + ) -> None: + """Sends messages back using the active session tracking wrapper.""" + # Delegating formatting pipelines downstream into event instance layer + pass + + def meta(self) -> PlatformMetadata: + return PlatformMetadata( + name="ntfy", + description="ntfy 消息通知适配器", + id=cast(str, self.config.get("id", "ntfy")), + support_streaming_message=False, + ) + + async def run(self) -> None: + """Launches the background long-polling subscription stream client loop.""" + logger.info( + "[ntfy] Instantiating subscriber client on topic: %s", self.ntfy_api.topic + ) + self._listener_task = asyncio.create_task(self._stream_listener_loop()) + await self.shutdown_event.wait() + + async def terminate(self) -> None: + self.shutdown_event.set() + if self._listener_task: + self._listener_task.cancel() + await self.ntfy_api.close() + + async def _stream_listener_loop(self) -> None: + """Monitors incoming real-time notifications with an automatic reconnection strategy.""" + while not self.shutdown_event.is_set(): + try: + async for raw_event in self.ntfy_api.get_stream(): + if self.shutdown_event.is_set(): + break + + if str(raw_event.get("event", "")) != "message": + continue + + # Skips notifications marked with a 'robot' tag to prevent endless feedback loops + if "robot" in raw_event.get( + "tags", [] + ) or "AstrBot" in raw_event.get("title", ""): + continue + + event_id = str(raw_event.get("id", "")) + if event_id and self._is_duplicate_event(event_id): + continue + + abm = await self.convert_message(raw_event) + if abm is None: + continue + await self.handle_msg(abm) + + except asyncio.CancelledError: + break + except Exception as e: + if not self.shutdown_event.is_set(): + logger.warning( + "[ntfy] Stream pipe disconnected (%s). Reconnecting in 3s...", e + ) + await asyncio.sleep(3) + + async def convert_message(self, event: dict[str, Any]) -> AstrBotMessage | None: + message_text = str(event.get("message", "")) + topic = str(event.get("topic", "unknown")) + + abm = AstrBotMessage() + abm.self_id = self.meta().id + abm.message = [] + abm.raw_message = event + abm.message_id = str(event.get("id") or uuid.uuid4().hex) + + event_timestamp = event.get("time") + abm.timestamp = ( + int(event_timestamp) + if isinstance(event_timestamp, int) + else int(time.time()) + ) + + abm.type = MessageType.FRIEND_MESSAGE + abm.session_id = topic + abm.sender = MessageMember( + user_id="ntfy_client", nickname=f"ntfy ({topic[:6]})" + ) + + components = await self._parse_ntfy_message_components(message_text, event) + if not components: + return None + + abm.message = components + abm.message_str = self._build_message_str(components) + return abm + + async def _parse_ntfy_message_components( + self, text: str, event: dict[str, Any] + ) -> list: + components = [] + if text: + components.append(Plain(text=text)) + + attachment = event.get("attachment") + if isinstance(attachment, dict): + file_url = str(attachment.get("url", "")).strip() + mime_type = str(attachment.get("type", "")).lower().strip() + filename = str(attachment.get("name", "")).strip() or "attachment.bin" + + if file_url: + if mime_type.startswith("image/"): + components.append(Image.fromURL(file_url)) + elif mime_type.startswith("video/"): + components.append(Video.fromURL(file_url)) + elif mime_type.startswith("audio/"): + components.append(Record.fromURL(file_url)) + else: + components.append(File(name=filename, file=file_url, url=file_url)) + + return components + + @staticmethod + def _build_message_str(components: list) -> str: + parts: list[str] = [] + for comp in components: + if isinstance(comp, Plain): + parts.append(comp.text) + elif isinstance(comp, At): + parts.append(f"@{comp.name or comp.qq}") + elif isinstance(comp, Image): + parts.append("[image]") + elif isinstance(comp, Video): + parts.append("[video]") + elif isinstance(comp, Record): + parts.append("[audio]") + elif isinstance(comp, File): + parts.append(str(comp.name or "[file]")) + return " ".join(i for i in parts if i).strip() + + def _clean_expired_events(self) -> None: + current = time.time() + expired = [ + ev_id + for ev_id, ts in self._event_id_timestamps.items() + if current - ts > 1800 + ] + for ev_id in expired: + del self._event_id_timestamps[ev_id] + + def _is_duplicate_event(self, event_id: str) -> bool: + self._clean_expired_events() + if event_id in self._event_id_timestamps: + return True + self._event_id_timestamps[event_id] = time.time() + return False + + async def handle_msg(self, abm: AstrBotMessage) -> None: + event = NtfyMessageEvent( + message_str=abm.message_str, + message_obj=abm, + platform_meta=self.meta(), + session_id=abm.session_id, + ntfy_api=self.ntfy_api, + ) + self._event_queue.put_nowait(event) diff --git a/astrbot/core/platform/sources/ntfy/ntfy_api.py b/astrbot/core/platform/sources/ntfy/ntfy_api.py new file mode 100644 index 0000000000..5149e2e5cb --- /dev/null +++ b/astrbot/core/platform/sources/ntfy/ntfy_api.py @@ -0,0 +1,185 @@ +import base64 +import json +from typing import Any +from urllib.parse import unquote + +import aiohttp + +from astrbot.api import logger + + +class NtfyAPIClient: + def __init__( + self, + *, + server_url: str, + topic: str, + access_token: str | None = None, + timeout_seconds: int = 30, + ) -> None: + self.server_url = server_url.strip().rstrip("/") + self.topic = topic.strip() + self.timeout = aiohttp.ClientTimeout(total=timeout_seconds) + self._session: aiohttp.ClientSession | None = None + + self.headers = {} + if access_token: + self.headers["Authorization"] = f"Bearer {access_token.strip()}" + + async def _get_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession(timeout=self.timeout) + return self._session + + async def close(self) -> None: + if self._session and not self._session.closed: + await self._session.close() + + @property + def _base_url(self) -> str: + return f"{self.server_url}/{self.topic}" + + async def get_stream(self) -> Any: + """Yields live JSON structures from the persistent ntfy event pipe.""" + url = f"{self._base_url}/json" + session = await self._get_session() + try: + async with session.get(url, headers=self.headers, timeout=None) as resp: + if resp.status != 200: + logger.error( + "[ntfy-api] Streaming connection failed: status=%s", resp.status + ) + return + async for line in resp.content: + if line: + try: + yield json.loads(line.decode("utf-8")) + except json.JSONDecodeError: + continue + except Exception as e: + logger.error("[ntfy-api] Exception encountered in stream: %s", e) + raise e + + async def send_notification( + self, + message: str, + *, + title: str | None = "AstrBot", + tags: list[str] | None = None, + click_url: str | None = None, + actions: list[dict[str, Any]] | None = None, + ) -> bool: + """Sends a standard text notification payload.""" + headers = {**self.headers} + # if title: + # headers["X-Title"] = title + # if tags: + # headers["X-Tags"] = ",".join(tags) + headers["X-Title"] = "AstrBot" + headers["X-Tags"] = "robot" + if click_url: + headers["X-Click"] = click_url + if actions: + headers["X-Actions"] = json.dumps(actions) + + session = await self._get_session() + try: + async with session.post( + self._base_url, data=message.encode("utf-8"), headers=headers + ) as resp: + if resp.status < 400: + return True + body = await resp.text() + logger.error( + "[ntfy-api] Post message failed: status=%s body=%s", + resp.status, + body, + ) + return False + except Exception as e: + logger.error("[ntfy-api] Post message request failed: %s", e) + return False + + async def send_file( + self, + file_bytes: bytes, + filename: str, + message: str | None = None, + ) -> bool: + """Uploads a rich attachment asset via PUT binary stream.""" + + def encode_non_ascii(text: str) -> str: + if text.isascii(): + return text + else: + try: + text.encode("ascii") + return text + except UnicodeEncodeError: + return f"=?utf-8?B?{base64.b64encode(text.encode('utf-8')).decode('ascii')}?=" + + headers = { + **self.headers, + "X-Title": "AstrBot", + "X-Tags": "robot", + "X-Filename": encode_non_ascii(filename), + } + if message: + headers["X-Message"] = encode_non_ascii(message) + + session = await self._get_session() + try: + async with session.put( + self._base_url, data=file_bytes, headers=headers + ) as resp: + if resp.status < 400: + return True + body = await resp.text() + logger.error( + "[ntfy-api] Upload file failed: status=%s body=%s", + resp.status, + body, + ) + return False + except Exception as e: + logger.error("[ntfy-api] Upload file request failed: %s", e) + return False + + async def get_message_content( + self, url: str + ) -> tuple[bytes, str | None, str | None] | None: + """Downloads external/incoming attachment binary targets if needed.""" + session = await self._get_session() + try: + async with session.get(url, headers=self.headers) as resp: + if resp.status != 200: + body = await resp.text() + logger.warning( + "[ntfy-api] Content download failed: status=%s body=%s", + resp.status, + body, + ) + return None + + content = await resp.read() + content_type = resp.headers.get("Content-Type") + disposition = resp.headers.get("Content-Disposition") + filename = self._extract_filename_from_disposition(disposition) + return content, content_type, filename + except Exception as e: + logger.error("[ntfy-api] Content retrieval exception: %s", e) + return None + + def _extract_filename_from_disposition(self, disposition: str | None) -> str | None: + if not disposition: + return None + for part in disposition.split(";"): + token = part.strip() + if token.startswith("filename*="): + val = token.split("=", 1)[1].strip().strip('"') + if val.lower().startswith("utf-8''"): + val = val[7:] + return unquote(val) + if token.startswith("filename="): + return token.split("=", 1)[1].strip().strip('"') + return None diff --git a/astrbot/core/platform/sources/ntfy/ntfy_event.py b/astrbot/core/platform/sources/ntfy/ntfy_event.py new file mode 100644 index 0000000000..0f4adbb357 --- /dev/null +++ b/astrbot/core/platform/sources/ntfy/ntfy_event.py @@ -0,0 +1,120 @@ +import asyncio +import os +import re +from collections.abc import AsyncGenerator +from typing import Any + +from astrbot.api import logger +from astrbot.api.event import AstrMessageEvent, MessageChain +from astrbot.api.message_components import ( + At, + BaseMessageComponent, + File, + Image, + Plain, + Record, + Video, +) + +from .ntfy_api import NtfyAPIClient + + +class NtfyMessageEvent(AstrMessageEvent): + def __init__( + self, + message_str: str, + message_obj: Any, + platform_meta: Any, + session_id: str, + ntfy_api: NtfyAPIClient, + ) -> None: + super().__init__(message_str, message_obj, platform_meta, session_id) + self.ntfy_api = ntfy_api + + async def send(self, message: MessageChain) -> None: + """Dispatches rich content payloads or raw fallback string configurations.""" + text_payload = "" + file_component: BaseMessageComponent | None = None + + for segment in message.chain: + if isinstance(segment, Plain): + text_payload += segment.text + elif isinstance(segment, At): + name = str(segment.name or segment.qq or "").strip() + if name: + text_payload += f" @{name} " + elif isinstance(segment, (Image, File, Video, Record)): + file_component = segment + + if file_component: + try: + file_path = await file_component.convert_to_file_path() + if file_path and os.path.exists(file_path): + filename = getattr( + file_component, "name", None + ) or os.path.basename(file_path) + with open(file_path, "rb") as f: + file_bytes = f.read() + + await self.ntfy_api.send_file( + file_bytes=file_bytes, + filename=filename, + message=text_payload.strip() if text_payload else None, + ) + await super().send(message) + return + except Exception as e: + logger.error( + "[ntfy-event] Failed resolving attachment binary sequence: %s", e + ) + + if text_payload.strip(): + await self.ntfy_api.send_notification(text_payload.strip()) + + await super().send(message) + + async def send_streaming( + self, + generator: AsyncGenerator, + use_fallback: bool = False, + ) -> Any: + """Pipes live execution generator data using a structured pattern tokenizer buffer.""" + if not use_fallback: + buffer = None + async for chain in generator: + if not buffer: + buffer = chain + else: + buffer.chain.extend(chain.chain) + if not buffer: + return None + buffer.squash_plain() + await self.send(buffer) + return await super().send_streaming(generator, use_fallback) + + buffer = "" + pattern = re.compile(r"[^。?!~…]+[。?!~…]+") + + async for chain in generator: + if isinstance(chain, MessageChain): + for comp in chain.chain: + if isinstance(comp, Plain): + buffer += comp.text + if any(p in buffer for p in "。?!~…"): + buffer = await self.process_buffer(buffer, pattern) + else: + await self.send(MessageChain(chain=[comp])) + await asyncio.sleep(1.5) + + if buffer.strip(): + await self.send(MessageChain([Plain(buffer)])) + return await super().send_streaming(generator, use_fallback) + + async def process_buffer(self, buffer: str, pattern: re.Pattern) -> str: + matches = pattern.findall(buffer) + if matches: + for match in matches: + await self.send(MessageChain([Plain(match)])) + await asyncio.sleep(1.0) + return pattern.sub("", buffer) + return buffer diff --git a/astrbot/core/star/filter/platform_adapter_type.py b/astrbot/core/star/filter/platform_adapter_type.py index 48c5bc8ff5..dae0e3fa27 100644 --- a/astrbot/core/star/filter/platform_adapter_type.py +++ b/astrbot/core/star/filter/platform_adapter_type.py @@ -27,6 +27,7 @@ class PlatformAdapterType(enum.Flag): WEIXIN_OC = enum.auto() MATTERMOST = enum.auto() WEBCHAT = enum.auto() + NTFY = enum.auto() ALL = enum.auto() @@ -51,6 +52,7 @@ class PlatformAdapterType(enum.Flag): "weixin_oc": PlatformAdapterType.WEIXIN_OC, "mattermost": PlatformAdapterType.MATTERMOST, "webchat": PlatformAdapterType.WEBCHAT, + "ntfy": PlatformAdapterType.NTFY, } diff --git a/dashboard/src/assets/images/platform_logos/ntfy.svg b/dashboard/src/assets/images/platform_logos/ntfy.svg new file mode 100644 index 0000000000..223a3f7812 --- /dev/null +++ b/dashboard/src/assets/images/platform_logos/ntfy.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/dashboard/src/i18n/locales/en-US/features/config-metadata.json b/dashboard/src/i18n/locales/en-US/features/config-metadata.json index a79746c9db..bfd5d07bd6 100644 --- a/dashboard/src/i18n/locales/en-US/features/config-metadata.json +++ b/dashboard/src/i18n/locales/en-US/features/config-metadata.json @@ -749,7 +749,19 @@ "description": "Max Consecutive Failures", "type": "int", "hint": "Maximum allowed consecutive failures; retries will stop if exceeded." - } + }, + "ntfy_server_url": { + "description": "ntfy Server URL", + "hint": "The ntfy instance server base URL, e.g., https://ntfy.sh" + }, + "ntfy_topic": { + "description": "Subscription Topic", + "hint": "The secret unique topic used to listen and publish messages." + }, + "ntfy_access_token": { + "description": "Access Token (Optional)", + "hint": "Bearer token if your ntfy server requires authentication." + } }, "general": { "description": "General", diff --git a/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json b/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json index 75ce4fd931..10071df2dd 100644 --- a/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json +++ b/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json @@ -751,7 +751,19 @@ "description": "最大连续失败次数", "type": "int", "hint": "允许的最大连续失败次数,超过后停止重试" - } + }, + "ntfy_server_url": { + "description": "ntfy 服务器地址", + "hint": "ntfy 服务器地址,例如 https://ntfy.sh" + }, + "ntfy_topic": { + "description": "订阅主题 (Topic)", + "hint": "用于收发消息的唯一订阅主题名称。" + }, + "ntfy_access_token": { + "description": "访问令牌 (可选)", + "hint": "有访问权限控制的服务器需要填写此 Token。" + } }, "general": { "description": "基本", diff --git a/dashboard/src/utils/platformUtils.js b/dashboard/src/utils/platformUtils.js index d69af9237e..af12162cd3 100644 --- a/dashboard/src/utils/platformUtils.js +++ b/dashboard/src/utils/platformUtils.js @@ -42,6 +42,8 @@ export function getPlatformIcon(name) { return new URL('@/assets/images/platform_logos/matrix.svg', import.meta.url).href } else if (name === 'mattermost') { return new URL('@/assets/images/platform_logos/mattermost.svg', import.meta.url).href + } else if (name == "ntfy") { + return new URL('@/assets/images/platform_logos/ntfy.svg', import.meta.url).href } }