From 2db067b0d2d7245f59e08575ae333a89dc8018f8 Mon Sep 17 00:00:00 2001 From: shark Date: Tue, 26 May 2026 18:21:14 +0800 Subject: [PATCH] fix(stats): use summary API for episode day stats --- scripts/episode_day_stats.sh | 194 ++++++++++++++++++----------------- 1 file changed, 98 insertions(+), 96 deletions(-) diff --git a/scripts/episode_day_stats.sh b/scripts/episode_day_stats.sh index 90fc321..4c76696 100755 --- a/scripts/episode_day_stats.sh +++ b/scripts/episode_day_stats.sh @@ -3,12 +3,13 @@ # # SPDX-License-Identifier: MulanPSL-2.0 # -# Episode 统计:当天(STATS_TZ)0 点~当前 created_at 区间 + 全量;可选 POST 飞书自动化 Webhook。 -# 部分线上 Keystone 会忽略 list 的 created_at_* 查询参数,故默认「全量分页拉取后在本地按 created_at 过滤」再汇总 partial。 +# Episode 统计:调用管理后台 summary 接口,按 STATS_TZ 计算今日/昨日/总计;可选 POST 飞书自动化 Webhook。 # 逻辑在下方 Python;本文件仅注入环境变量。 # # KEYSTONE_BASE=http://127.0.0.1:9999 TOKEN=… ./scripts/episode_day_stats.sh +# KEYSTONE_BASE=http://127.0.0.1:9999 ./scripts/episode_day_stats.sh # ./scripts/episode_day_stats.sh 'https://www.feishu.cn/flow/api/trigger-webhook/…' +# TOKEN 为空时会用 KEYSTONE_ADMIN_USERNAME/KEYSTONE_ADMIN_PASSWORD 登录,默认 admin/admin123。 # 不传参数则只打印统计、不发飞书(忽略父 shell 里的 FEISHU_WEBHOOK_URL)。飞书 JSON 含今日/总计及昨日 last_* 字段。 # # Requires: python3 (stdlib only) @@ -41,7 +42,10 @@ done export KEYSTONE_BASE="${KEYSTONE_BASE:-http://127.0.0.1:9999}" export TOKEN="${TOKEN:-}" +export KEYSTONE_ADMIN_USERNAME="${KEYSTONE_ADMIN_USERNAME:-admin}" +export KEYSTONE_ADMIN_PASSWORD="${KEYSTONE_ADMIN_PASSWORD:-admin123}" export STATS_TZ="${STATS_TZ:-Asia/Shanghai}" +export STATS_TOTAL_START="${STATS_TOTAL_START:-1970-01-01T00:00:00Z}" export FEISHU_WEBHOOK_URL="${HOOK}" exec python3 - <<'PY' @@ -55,11 +59,12 @@ import sys import urllib.error import urllib.parse import urllib.request -from datetime import datetime, timedelta, timezone -from typing import Dict, List, Optional +from datetime import datetime, timedelta +from typing import Dict, Tuple from zoneinfo import ZoneInfo -LIMIT = 100 +SUMMARY_PATH = "/api/v1/admin/statistics/data-production/summary" +LOGIN_PATH = "/api/v1/auth/login" def rfc3339_z(dt_utc: datetime) -> str: @@ -69,9 +74,18 @@ def rfc3339_z(dt_utc: datetime) -> str: return s + "Z" -def fmt_size_gb2(b: int) -> str: - """Decimal GB (1e9 bytes), two fractional digits.""" - return "%.2fGB" % (int(b) / 1_000_000_000.0) +def fmt_size_binary(b: int) -> str: + """1024-based size scaling, matching Synapse display semantics.""" + value = float(int(b)) + units = ["B", "KB", "MB", "GB", "TB", "PB"] + unit = units[0] + for unit in units: + if abs(value) < 1024 or unit == units[-1]: + break + value /= 1024.0 + if unit == "B": + return "%.0f%s" % (value, unit) + return "%.2f%s" % (value, unit) def fmt_hours2(sec: float) -> str: @@ -80,8 +94,8 @@ def fmt_hours2(sec: float) -> str: def fmt_json_size(b: int) -> str: - """Same text as terminal: raw bytes + GB in existing string field.""" - return "%d 字节 (%s)" % (int(b), fmt_size_gb2(b)) + """Same text as terminal: raw bytes + 1024-based human size.""" + return "%d 字节 (%s)" % (int(b), fmt_size_binary(b)) def fmt_json_duration(sec: float) -> str: @@ -96,79 +110,90 @@ def feishu_webhook_url() -> str: def http_get_json(url: str, headers: Dict[str, str]) -> dict: req = urllib.request.Request(url, headers=headers, method="GET") - with urllib.request.urlopen(req, timeout=120) as resp: - return json.loads(resp.read().decode()) - - -def parse_created_at_utc(raw: object) -> Optional[datetime]: - if raw is None or not isinstance(raw, str): - return None - s = raw.strip() - if not s: - return None - if s.endswith("Z"): - s = s[:-1] + "+00:00" try: - dt = datetime.fromisoformat(s) - except ValueError: - return None - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - return dt.astimezone(timezone.utc) + with urllib.request.urlopen(req, timeout=120) as resp: + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + body = e.read().decode(errors="replace")[:500] + sys.stderr.write("HTTP %s GET %s: %s\n" % (e.code, url, body)) + sys.exit(1) -def fetch_all_episodes(base: str, token: str) -> List[dict]: - """GET /episodes with no created_at filter; paginate until exhausted.""" - base = base.rstrip("/") - headers = {"Accept": "application/json"} +def http_post_json(url: str, body: dict, headers: Dict[str, str]) -> dict: + payload = json.dumps(body).encode("utf-8") + req = urllib.request.Request( + url, + data=payload, + headers={**headers, "Content-Type": "application/json"}, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=120) as resp: + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + body_text = e.read().decode(errors="replace")[:500] + sys.stderr.write("HTTP %s POST %s: %s\n" % (e.code, url, body_text)) + sys.exit(1) + + +def resolve_token(base: str, token: str) -> str: + token = token.strip() if token: - headers["Authorization"] = "Bearer " + token + return token - out: List[dict] = [] - offset = 0 - total = 0 + account = os.environ.get("KEYSTONE_ADMIN_USERNAME", "admin").strip() + password = os.environ.get("KEYSTONE_ADMIN_PASSWORD", "admin123") + if not account or not password: + sys.stderr.write("TOKEN is empty and admin login credentials are incomplete\n") + sys.exit(1) - sys.stderr.write( - "[episode_day_stats] fetch all pages (server filter NOT used)\n" + url = base.rstrip("/") + LOGIN_PATH + sys.stderr.write("[episode_day_stats] TOKEN empty; logging in as %s\n" % account) + data = http_post_json( + url, + {"account": account, "password": password}, + {"Accept": "application/json"}, ) + if data.get("error"): + sys.stderr.write("login error: %s\n" % data.get("error")) + sys.exit(1) + if data.get("role") != "admin": + sys.stderr.write("login role is %r, expected admin\n" % data.get("role")) + sys.exit(1) + access_token = str(data.get("access_token") or "").strip() + if not access_token: + sys.stderr.write("login response missing access_token\n") + sys.exit(1) + return access_token - while True: - q = urllib.parse.urlencode({"limit": str(LIMIT), "offset": str(offset)}) - url = base + "/api/v1/episodes?" + q - if offset == 0: - sys.stderr.write("[episode_day_stats] GET %s\n" % url) - data = http_get_json(url, headers) - if data.get("error"): - sys.stderr.write("API error: %s\n" % data.get("error")) - sys.exit(1) - items = data.get("items") or [] - total = int(data.get("total", 0)) - out.extend(items) - n = len(items) - offset += n - if n == 0 or offset >= total: - break +def fetch_summary(base: str, token: str, label: str, start_z: str, end_z: str) -> Tuple[int, int, float]: + headers = {"Accept": "application/json"} + if token: + headers["Authorization"] = "Bearer " + token + + params = urllib.parse.urlencode({"start_time": start_z, "end_time": end_z}) + url = base.rstrip("/") + SUMMARY_PATH + "?" + params sys.stderr.write( - "[episode_day_stats] fetch done total=%d rows=%d\n" % (total, len(out)) + "[episode_day_stats] GET summary %s window (UTC): %s .. %s\n" + % (label, start_z, end_z) ) - return out + data = http_get_json(url, headers) + if data.get("error"): + sys.stderr.write("API error: %s\n" % data.get("error")) + sys.exit(1) + count = int((data.get("count") or {}).get("total") or 0) + total_bytes = int((data.get("size") or {}).get("total_bytes") or 0) + total_ms = float((data.get("duration") or {}).get("total_ms") or 0) + return count, total_bytes, total_ms / 1000.0 -def sum_episodes(rows: List[dict]) -> tuple[int, int, float]: - b = 0 - d = 0.0 - for i in rows: - fs = i.get("file_size_bytes") - b += int(fs) if fs is not None else 0 - dv = i.get("duration_sec") - d += float(dv) if dv is not None else 0.0 - return len(rows), b, d def main() -> None: base = os.environ.get("KEYSTONE_BASE", "http://127.0.0.1:9999").strip() token = os.environ.get("TOKEN", "").strip() + total_start_z = os.environ.get("STATS_TOTAL_START", "1970-01-01T00:00:00Z").strip() raw_tz = (os.environ.get("STATS_TZ") or "Asia/Shanghai").strip() tz_name = raw_tz or "Asia/Shanghai" try: @@ -177,48 +202,25 @@ def main() -> None: sys.stderr.write("invalid STATS_TZ %r: %s\n" % (tz_name, e)) sys.exit(1) + token = resolve_token(base, token) + utc = ZoneInfo("UTC") now_local = datetime.now(tz) day = now_local.date() start_local = datetime(day.year, day.month, day.day, 0, 0, 0, 0, tzinfo=tz) from_z = rfc3339_z(start_local.astimezone(utc)) to_z = rfc3339_z(now_local.astimezone(utc)) - from_dt = start_local.astimezone(timezone.utc) - to_dt = now_local.astimezone(timezone.utc) - - sys.stderr.write( - "[episode_day_stats] partial window (UTC): %s .. %s\n" % (from_z, to_z) - ) - - all_eps = fetch_all_episodes(base, token) - - partial_eps: List[dict] = [] - for i in all_eps: - ct = parse_created_at_utc(i.get("created_at")) - if ct is None: - continue - if from_dt <= ct <= to_dt: - partial_eps.append(i) yesterday = day - timedelta(days=1) start_y = datetime( yesterday.year, yesterday.month, yesterday.day, 0, 0, 0, 0, tzinfo=tz ) - end_y = start_y + timedelta(days=1) - timedelta(microseconds=1) - from_y = start_y.astimezone(timezone.utc) - to_y = end_y.astimezone(timezone.utc) - - yesterday_eps: List[dict] = [] - for i in all_eps: - ct = parse_created_at_utc(i.get("created_at")) - if ct is None: - continue - if from_y <= ct <= to_y: - yesterday_eps.append(i) - - p_count, p_bytes, p_dur = sum_episodes(partial_eps) - y_count, y_bytes, y_dur = sum_episodes(yesterday_eps) - t_count, t_bytes, t_dur = sum_episodes(all_eps) + from_y_z = rfc3339_z(start_y.astimezone(utc)) + to_y_z = from_z + + p_count, p_bytes, p_dur = fetch_summary(base, token, "today", from_z, to_z) + y_count, y_bytes, y_dur = fetch_summary(base, token, "yesterday", from_y_z, to_y_z) + t_count, t_bytes, t_dur = fetch_summary(base, token, "total", total_start_z, to_z) print(">>>") print("今日数据量: %d条" % p_count)