Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/features/search.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ python -m src.main agent chat "найди сообщения про блокче
```bash
python -m src.main search-query list
python -m src.main search-query add "ключевое слово"
python -m src.main search-query add "ключевое слово" --chats "@chat1, -1001234567890"
python -m src.main search-query run 1 # разовый запуск
python -m src.main search-query stats 1 # статистика совпадений
```

=== "Web"
`GET /search-queries/` · `POST /search-queries/add`

Поле `Чаты` у сохранённого запроса необязательно. Если оно пустое, поиск идёт по всем чатам; если заполнено, поиск ограничивается перечисленными чатами. Допустимые значения: `channel_id`, `@username`, `username` или ссылки `t.me`, разделённые пробелами или запятыми.

Уведомления содержат ссылку на оригинальное сообщение (`t.me/channel/message_id`).
25 changes: 22 additions & 3 deletions src/agent/tools/search_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ async def list_search_queries(args):
if getattr(sq, "notify_on_collect", False):
flags.append("notify")
flags_str = f" [{', '.join(flags)}]" if flags else ""
chats_str = f" chats={sq.chat_filter}" if getattr(sq, "chat_filter", "") else " chats=all"
lines.append(
f"- id={sq.id}: '{sq.query}' interval={sq.interval_minutes}m "
f"{status}{flags_str}"
f"{status}{flags_str}{chats_str}"
)
return _text_response("\n".join(lines))
except Exception as e:
Expand Down Expand Up @@ -76,7 +77,11 @@ async def get_search_query(args):
f"is_regex: {sq.is_regex}",
f"is_fts: {sq.is_fts}",
f"notify_on_collect: {sq.notify_on_collect}",
f"chat_filter: {sq.chat_filter or 'all'}",
]
warning = (await svc.validate_chat_filter(sq.chat_filter)).warning_text()
if warning:
lines.append(f"warning: {warning}")
return _text_response("\n".join(lines))
except Exception as e:
return _text_response(f"Ошибка получения поискового запроса: {e}")
Expand All @@ -101,6 +106,7 @@ async def get_search_query(args):
"track_stats": Annotated[bool, "Записывать ежедневную статистику совпадений"],
"exclude_patterns": Annotated[str, "Паттерны исключения через запятую"],
"max_length": Annotated[int, "Максимальная длина сообщения для совпадения"],
"chat_filter": Annotated[str, "Список чатов через запятую или пробел"],
"confirm": Annotated[bool, "Установите true для подтверждения действия"],
},
)
Expand All @@ -122,6 +128,7 @@ async def add_search_query(args):
track_stats = bool(args.get("track_stats", True))
exclude_patterns = args.get("exclude_patterns", "")
max_length = args.get("max_length")
chat_filter = args.get("chat_filter", args.get("chats", ""))
sq_id = await svc.add(
query,
interval_minutes=interval,
Expand All @@ -131,8 +138,13 @@ async def add_search_query(args):
track_stats=track_stats,
exclude_patterns=exclude_patterns or "",
max_length=int(max_length) if max_length is not None else None,
chat_filter=chat_filter or "",
)
return _text_response(f"Поисковый запрос создан (id={sq_id}).")
warning = (await svc.validate_chat_filter(chat_filter or "")).warning_text()
text = f"Поисковый запрос создан (id={sq_id})."
if warning:
text += f"\nПредупреждение: {warning}"
return _text_response(text)
except Exception as e:
return _text_response(f"Ошибка добавления поискового запроса: {e}")

Expand All @@ -151,6 +163,7 @@ async def add_search_query(args):
"track_stats": Annotated[bool, "Записывать ежедневную статистику совпадений"],
"exclude_patterns": Annotated[str, "Паттерны исключения через запятую"],
"max_length": Annotated[int, "Максимальная длина сообщения для совпадения"],
"chat_filter": Annotated[str, "Список чатов через запятую или пробел"],
"confirm": Annotated[bool, "Установите true для подтверждения действия"],
},
)
Expand All @@ -176,6 +189,7 @@ async def edit_search_query(args):
track_stats = bool(args.get("track_stats", getattr(existing, "track_stats", True)))
exclude_patterns = args.get("exclude_patterns", getattr(existing, "exclude_patterns", ""))
max_length_raw = args.get("max_length", getattr(existing, "max_length", None))
chat_filter = args.get("chat_filter", args.get("chats", getattr(existing, "chat_filter", "")))
ok = await svc.update(
int(sq_id),
query,
Expand All @@ -186,9 +200,14 @@ async def edit_search_query(args):
track_stats=track_stats,
exclude_patterns=exclude_patterns or "",
max_length=int(max_length_raw) if max_length_raw is not None else None,
chat_filter=chat_filter or "",
)
if ok:
return _text_response(f"Поисковый запрос id={sq_id} обновлён.")
warning = (await svc.validate_chat_filter(chat_filter or "")).warning_text()
text = f"Поисковый запрос id={sq_id} обновлён."
if warning:
text += f"\nПредупреждение: {warning}"
return _text_response(text)
return _text_response(f"Не удалось обновить запрос id={sq_id}.")
except Exception as e:
return _text_response(f"Ошибка редактирования поискового запроса: {e}")
Expand Down
42 changes: 42 additions & 0 deletions src/cli/commands/search_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import argparse
import asyncio
import inspect

from pydantic import ValidationError

Expand All @@ -10,6 +11,20 @@
from src.services.search_query_service import SearchQueryService


async def _chat_filter_warning(svc: SearchQueryService, chat_filter: str) -> str:
    """Return the warning text for *chat_filter*, or ``""`` if none is available.

    The lookup is best-effort: a service without ``validate_chat_filter``,
    a validation result without a callable ``warning_text``, a non-string
    warning, or any exception raised along the way all yield ``""``.

    Args:
        svc: Object that may expose ``validate_chat_filter`` (sync or async).
        chat_filter: Raw chat filter string passed to the validator.

    Returns:
        The string produced by ``warning_text()``, or ``""``.
    """
    validate = getattr(svc, "validate_chat_filter", None)
    if validate is None:
        return ""
    try:
        outcome = validate(chat_filter)
        # The validator may be a coroutine function or a plain callable.
        if inspect.isawaitable(outcome):
            outcome = await outcome
        render = getattr(outcome, "warning_text", None)
        if not callable(render):
            return ""
        message = render()
    except Exception:
        # A failed validation must not break the command that asked for it.
        return ""
    return message if isinstance(message, str) else ""


def run(args: argparse.Namespace) -> None:
async def _run() -> None:
_, db = await runtime.init_db(args.config)
Expand All @@ -35,6 +50,11 @@ async def _run() -> None:
(item["last_run"] or "—")[:20],
)
)
chat_filter = getattr(sq, "chat_filter", "")
if chat_filter:
print(f" chats: {chat_filter}")
if item.get("chat_filter_warnings"):
print(f" warning: {item['chat_filter_warnings']}")

elif args.search_query_action == "get":
sq = await svc.get(args.id)
Expand All @@ -51,6 +71,11 @@ async def _run() -> None:
print(f"Track stats: {sq.track_stats}")
print(f"Max length: {sq.max_length if sq.max_length is not None else '—'}")
print(f"Exclude patterns: {sq.exclude_patterns or '—'}")
chat_filter = getattr(sq, "chat_filter", "")
print(f"Chats: {chat_filter or 'all'}")
warning = await _chat_filter_warning(svc, chat_filter)
if warning:
print(f"Warning: {warning}")

elif args.search_query_action == "add":
exclude = (
Expand All @@ -66,11 +91,15 @@ async def _run() -> None:
track_stats=args.track_stats,
exclude_patterns=exclude,
max_length=args.max_length,
chat_filter=getattr(args, "chats", ""),
)
except ValidationError as e:
print(f"Error: {e.errors()[0]['msg']}")
return
print(f"Added search query id={sq_id}: {args.query}")
warning = await _chat_filter_warning(svc, getattr(args, "chats", ""))
if warning:
print(f"Warning: {warning}")

elif args.search_query_action == "edit":
sq = await svc.get(args.id)
Expand Down Expand Up @@ -101,11 +130,24 @@ async def _run() -> None:
track_stats=tstats,
exclude_patterns=exclude,
max_length=max_len,
chat_filter=(
args.chats
if getattr(args, "chats", None) is not None
else getattr(sq, "chat_filter", "")
),
)
except ValidationError as e:
print(f"Error: {e.errors()[0]['msg']}")
return
print(f"Updated search query id={args.id}")
warning = await _chat_filter_warning(
svc,
args.chats
if getattr(args, "chats", None) is not None
else getattr(sq, "chat_filter", ""),
)
if warning:
print(f"Warning: {warning}")

elif args.search_query_action == "delete":
await svc.delete(args.id)
Expand Down
3 changes: 3 additions & 0 deletions src/cli/parser_domains/search_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def register(subparsers: argparse._SubParsersAction) -> argparse.ArgumentParser
"--exclude-patterns", default="", help="Exclude patterns, one per line (use \\n)"
)
sq_add.add_argument("--max-length", type=int, default=None, help="Max message text length")
sq_add.add_argument("--chats", default="", help="Chat filter: IDs, usernames or t.me links")

sq_edit = sq_sub.add_parser("edit", help="Edit search query")
sq_edit.add_argument("id", type=int, help="Search query id")
Expand All @@ -38,6 +39,8 @@ def register(subparsers: argparse._SubParsersAction) -> argparse.ArgumentParser
sq_edit.add_argument("--exclude-patterns", default=None, help="Exclude patterns (use \\n)")
sq_edit.add_argument("--max-length", type=int, default=None, help="Max message text length")
sq_edit.add_argument("--no-max-length", dest="max_length", action="store_const", const=-1)
sq_edit.add_argument("--chats", default=None, help="Chat filter: IDs, usernames or t.me links")
sq_edit.add_argument("--clear-chats", dest="chats", action="store_const", const="")

sq_del = sq_sub.add_parser("delete", help="Delete search query")
sq_del.add_argument("id", type=int, help="Search query id")
Expand Down
8 changes: 7 additions & 1 deletion src/database/bundles.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,11 +757,12 @@ async def get_recent_searches(self, limit: int = 20) -> list[dict]:
class SearchQueryBundle:
search_queries: SearchQueriesRepository
messages: MessagesRepository
channels: ChannelsRepository | None = None

@classmethod
def from_database(cls, db: "Database") -> "SearchQueryBundle":
# Alternate constructor: builds the bundle from the repositories on `db.repos`.
repos = db.repos
# NOTE(review): the two consecutive returns below look like the before/after
# lines of a diff hunk (two-argument vs three-argument call) — confirm which
# one the real file keeps. The second form additionally passes `repos.channels`.
return cls(repos.search_queries, repos.messages)
return cls(repos.search_queries, repos.messages, repos.channels)

async def add(self, sq: SearchQuery) -> int:
    """Store *sq* through the search-queries repository.

    Returns:
        Whatever ``self.search_queries.add(sq)`` returns, unchanged.
    """
    created = await self.search_queries.add(sq)
    return created
Expand Down Expand Up @@ -809,6 +810,11 @@ async def get_last_recorded_at(self, query_id: int) -> str | None:
async def get_last_recorded_at_all(self) -> dict[int, str]:
    """Fetch the last-recorded mapping from the search-queries repository.

    Returns:
        The result of ``self.search_queries.get_last_recorded_at_all()``,
        unchanged (presumably keyed by search query id — verify against
        the repository).
    """
    latest = await self.search_queries.get_last_recorded_at_all()
    return latest

async def get_channels(self) -> list[Channel]:
    """Return the channels from the channels repository, if one is attached.

    Returns:
        An empty list when ``self.channels`` is ``None``; otherwise the
        result of ``self.channels.get_channels()``.
    """
    repo = self.channels
    return [] if repo is None else await repo.get_channels()


@dataclass(frozen=True)
class PipelineBundle:
Expand Down
1 change: 1 addition & 0 deletions src/database/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"track_stats": "track_stats INTEGER DEFAULT 1",
"exclude_patterns": "exclude_patterns TEXT DEFAULT ''",
"max_length": "max_length INTEGER DEFAULT NULL",
"chat_filter": "chat_filter TEXT DEFAULT ''",
},
"notification_bots": {
"tg_username": "tg_username TEXT",
Expand Down
18 changes: 18 additions & 0 deletions src/database/repositories/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from src.models import Message, SearchQuery
from src.utils.datetime import parse_datetime, parse_required_datetime
from src.utils.search_query_chat_filter import parse_chat_filter

logger = logging.getLogger(__name__)
_DATE_ONLY_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
Expand Down Expand Up @@ -767,6 +768,23 @@ def _build_extra_conditions(sq: SearchQuery) -> tuple[list[str], list]:
continue
conditions.append("m.text NOT LIKE ?")
params.append(f"%{stripped}%")
chat_filter = parse_chat_filter(sq.chat_filter)
if chat_filter.has_filter:
chat_parts = []
if chat_filter.numeric_values:
placeholders = ", ".join("?" for _ in chat_filter.numeric_values)
chat_parts.append(f"m.channel_id IN ({placeholders})")
params.extend(chat_filter.numeric_values)
chat_parts.append(f"c.id IN ({placeholders})")
params.extend(chat_filter.numeric_values)
if chat_filter.usernames:
placeholders = ", ".join("?" for _ in chat_filter.usernames)
chat_parts.append(f"LOWER(c.username) IN ({placeholders})")
params.extend(chat_filter.usernames)
if chat_parts:
conditions.append("(" + " OR ".join(chat_parts) + ")")
else:
conditions.append("0 = 1")
return conditions, params

def _build_sq_parts(self, sq: SearchQuery) -> tuple[str, list[str], list]:
Expand Down
9 changes: 6 additions & 3 deletions src/database/repositories/search_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ async def add(self, sq: SearchQuery) -> int:
cur = await self._db.execute(
"INSERT INTO search_queries "
"(name, query, is_regex, is_fts, is_active, notify_on_collect, "
"track_stats, interval_minutes, exclude_patterns, max_length) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"track_stats, interval_minutes, exclude_patterns, max_length, chat_filter) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
sq.name,
sq.query,
Expand All @@ -27,6 +27,7 @@ async def add(self, sq: SearchQuery) -> int:
sq.interval_minutes,
sq.exclude_patterns,
sq.max_length,
sq.chat_filter,
),
)
await self._db.commit()
Expand Down Expand Up @@ -56,7 +57,7 @@ async def update(self, sq_id: int, sq: SearchQuery) -> None:
await self._db.execute(
"UPDATE search_queries SET name = ?, query = ?, is_regex = ?, is_fts = ?, "
"notify_on_collect = ?, track_stats = ?, interval_minutes = ?, "
"exclude_patterns = ?, max_length = ? "
"exclude_patterns = ?, max_length = ?, chat_filter = ? "
"WHERE id = ?",
(
sq.name,
Expand All @@ -68,6 +69,7 @@ async def update(self, sq_id: int, sq: SearchQuery) -> None:
sq.interval_minutes,
sq.exclude_patterns,
sq.max_length,
sq.chat_filter,
sq_id,
),
)
Expand Down Expand Up @@ -163,5 +165,6 @@ def _row_to_model(row) -> SearchQuery:
interval_minutes=row["interval_minutes"],
exclude_patterns=row["exclude_patterns"] or "",
max_length=row["max_length"],
chat_filter=row["chat_filter"] if "chat_filter" in row.keys() and row["chat_filter"] else "",
created_at=parse_datetime(row["created_at"]),
)
1 change: 1 addition & 0 deletions src/database/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
interval_minutes INTEGER NOT NULL DEFAULT 60,
exclude_patterns TEXT DEFAULT '',
max_length INTEGER DEFAULT NULL,
chat_filter TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now'))
);

Expand Down
1 change: 1 addition & 0 deletions src/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ class SearchQuery(BaseModel):
interval_minutes: int = Field(60, ge=1)
exclude_patterns: str = ""
max_length: int | None = None
chat_filter: str = ""
created_at: datetime | None = None

@model_validator(mode="after")
Expand Down
8 changes: 6 additions & 2 deletions src/services/notification_matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
import logging
import re

from src.models import Message, SearchQuery
from src.models import Channel, Message, SearchQuery
from src.telegram.notifier import Notifier
from src.utils.search_query_chat_filter import chat_filter_matches_message

logger = logging.getLogger(__name__)


class NotificationMatcher:
"""Match messages against notification queries and send batched notifications."""

def __init__(self, notifier: Notifier):
def __init__(self, notifier: Notifier, *, channels: list[Channel] | None = None):
    """Store the notifier and the channel list used by the matcher.

    Args:
        notifier: Kept as ``self._notifier``.
        channels: Optional channel list; ``None`` or any falsy value is
            replaced by a new empty list.
    """
    self._channels = channels if channels else []
    self._notifier = notifier

async def match_and_notify(
self,
Expand All @@ -30,6 +32,8 @@ async def match_and_notify(
if not msg.text:
continue
for sq in queries:
if not chat_filter_matches_message(sq.chat_filter, msg, channels=self._channels):
continue
if sq.max_length is not None and len(msg.text) >= sq.max_length:
continue
if any(p.lower() in msg.text.lower() for p in sq.exclude_patterns_list):
Expand Down
Loading
Loading