Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 98 additions & 96 deletions scripts/episode_day_stats.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
#
# SPDX-License-Identifier: MulanPSL-2.0
#
# Episode 统计:当天(STATS_TZ)0 点~当前 created_at 区间 + 全量;可选 POST 飞书自动化 Webhook。
# 部分线上 Keystone 会忽略 list 的 created_at_* 查询参数,故默认「全量分页拉取后在本地按 created_at 过滤」再汇总 partial。
# Episode 统计:调用管理后台 summary 接口,按 STATS_TZ 计算今日/昨日/总计;可选 POST 飞书自动化 Webhook。
# 逻辑在下方 Python;本文件仅注入环境变量。
#
# KEYSTONE_BASE=http://127.0.0.1:9999 TOKEN=… ./scripts/episode_day_stats.sh
# KEYSTONE_BASE=http://127.0.0.1:9999 ./scripts/episode_day_stats.sh
# ./scripts/episode_day_stats.sh 'https://www.feishu.cn/flow/api/trigger-webhook/…'
# TOKEN 为空时会用 KEYSTONE_ADMIN_USERNAME/KEYSTONE_ADMIN_PASSWORD 登录,默认 admin/admin123。
# 不传参数则只打印统计、不发飞书(忽略父 shell 里的 FEISHU_WEBHOOK_URL)。飞书 JSON 含今日/总计及昨日 last_* 字段。
#
# Requires: python3 (stdlib only)
Expand Down Expand Up @@ -41,7 +42,10 @@ done

# Environment handed to the embedded Python program below; each value keeps
# the caller's setting when present and falls back to the default shown.

# Base URL of the Keystone service the statistics are fetched from.
export KEYSTONE_BASE="${KEYSTONE_BASE:-http://127.0.0.1:9999}"
# Bearer token; when empty, the Python side logs in with the admin credentials.
export TOKEN="${TOKEN:-}"
# Admin login used only when TOKEN is empty.
# NOTE(review): the fallback password is a hard-coded default — confirm this is
# acceptable outside local/dev deployments.
export KEYSTONE_ADMIN_USERNAME="${KEYSTONE_ADMIN_USERNAME:-admin}"
export KEYSTONE_ADMIN_PASSWORD="${KEYSTONE_ADMIN_PASSWORD:-admin123}"
# IANA time zone name used to compute the "today" / "yesterday" windows.
export STATS_TZ="${STATS_TZ:-Asia/Shanghai}"
# Lower bound (RFC 3339, UTC) of the "total" statistics window.
export STATS_TOTAL_START="${STATS_TOTAL_START:-1970-01-01T00:00:00Z}"
# Always overwritten from HOOK, so a FEISHU_WEBHOOK_URL inherited from the
# parent shell is ignored. HOOK is set earlier in the script (not visible
# here) — presumably from the first positional argument; verify.
export FEISHU_WEBHOOK_URL="${HOOK}"

exec python3 - <<'PY'
Expand All @@ -55,11 +59,12 @@ import sys
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from typing import Dict, Tuple
from zoneinfo import ZoneInfo

# Page size sent as the "limit" query parameter when listing /api/v1/episodes.
LIMIT = 100
# Path of the admin data-production summary endpoint, appended to KEYSTONE_BASE.
SUMMARY_PATH = "/api/v1/admin/statistics/data-production/summary"
# Path of the login endpoint used to obtain an access token when TOKEN is empty.
LOGIN_PATH = "/api/v1/auth/login"


def rfc3339_z(dt_utc: datetime) -> str:
Expand All @@ -69,9 +74,18 @@ def rfc3339_z(dt_utc: datetime) -> str:
return s + "Z"


def fmt_size_gb2(b: int) -> str:
    """Format a byte count as decimal gigabytes with two fractional digits.

    One GB is taken as 1e9 bytes. ``b`` is coerced with ``int()`` first, so
    numeric strings and floats are accepted as well.
    """
    return "%.2fGB" % (int(b) / 1_000_000_000.0)
def fmt_size_binary(b: int) -> str:
    """Format a byte count using 1024-based units (B, KB, MB, GB, TB, PB).

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (PB) is reached. Plain bytes are printed without decimals; every
    larger unit is printed with two fractional digits.
    """
    value = float(int(b))
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    unit = units[0]
    for unit in units:
        # Stop at the first unit that fits, or at the last unit regardless of
        # magnitude so very large values are still reported (in PB).
        if abs(value) < 1024 or unit == units[-1]:
            break
        value /= 1024.0
    if unit == "B":
        return "%.0f%s" % (value, unit)
    return "%.2f%s" % (value, unit)


def fmt_hours2(sec: float) -> str:
Expand All @@ -80,8 +94,8 @@ def fmt_hours2(sec: float) -> str:


def fmt_json_size(b: int) -> str:
    """Return the size text used in the JSON payload.

    The text is the raw byte count followed by the 1024-based human-readable
    size in parentheses, i.e. the same rendering ``fmt_size_binary`` gives.
    """
    return "%d 字节 (%s)" % (int(b), fmt_size_binary(b))


def fmt_json_duration(sec: float) -> str:
Expand All @@ -96,79 +110,90 @@ def feishu_webhook_url() -> str:

def http_get_json(url: str, headers: Dict[str, str]) -> dict:
    """GET ``url`` with ``headers`` and return the decoded JSON body.

    On an HTTP error status, the status code, URL and up to 500 characters of
    the response body are written to stderr and the process exits with
    status 1 (the same reporting style as ``http_post_json``).
    """
    req = urllib.request.Request(url, headers=headers, method="GET")
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # Truncate the body so a large error page cannot flood the log.
        body = e.read().decode(errors="replace")[:500]
        sys.stderr.write("HTTP %s GET %s: %s\n" % (e.code, url, body))
        sys.exit(1)


def parse_created_at_utc(raw: object) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string into an aware UTC datetime.

    Returns ``None`` when ``raw`` is not a string, is blank, or cannot be
    parsed. A trailing ``Z`` is accepted as UTC, and timestamps without an
    offset are treated as UTC.
    """
    if raw is None or not isinstance(raw, str):
        return None
    s = raw.strip()
    if not s:
        return None
    if s.endswith("Z"):
        # Rewrite the "Z" suffix as an explicit offset, which every
        # datetime.fromisoformat version that supports offsets accepts.
        s = s[:-1] + "+00:00"
    try:
        dt = datetime.fromisoformat(s)
    except ValueError:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def fetch_all_episodes(base: str, token: str) -> List[dict]:
    """GET /episodes with no created_at filter; paginate until exhausted.

    NOTE(review): this looks like the pre-summary-endpoint implementation and
    is kept so existing callers keep working — confirm whether it is still
    needed.

    Pages of ``LIMIT`` items are requested from ``/api/v1/episodes`` until a
    page comes back empty or the reported ``total`` has been reached. Exits
    with status 1 if a response carries an ``error`` field.
    """
    base = base.rstrip("/")
    headers = {"Accept": "application/json"}
    if token:
        headers["Authorization"] = "Bearer " + token

    out: List[dict] = []
    offset = 0
    total = 0

    sys.stderr.write(
        "[episode_day_stats] fetch all pages (server filter NOT used)\n"
    )

    while True:
        q = urllib.parse.urlencode({"limit": str(LIMIT), "offset": str(offset)})
        url = base + "/api/v1/episodes?" + q
        if offset == 0:
            # Log only the first request to keep stderr short.
            sys.stderr.write("[episode_day_stats] GET %s\n" % url)
        data = http_get_json(url, headers)
        if data.get("error"):
            sys.stderr.write("API error: %s\n" % data.get("error"))
            sys.exit(1)
        items = data.get("items") or []
        total = int(data.get("total", 0))
        out.extend(items)
        n = len(items)
        offset += n
        # An empty page guards against looping forever if "total" is wrong.
        if n == 0 or offset >= total:
            break

    sys.stderr.write(
        "[episode_day_stats] fetch done total=%d rows=%d\n" % (total, len(out))
    )
    return out


def http_post_json(url: str, body: dict, headers: Dict[str, str]) -> dict:
    """POST ``body`` as JSON to ``url`` and return the decoded JSON response.

    ``Content-Type: application/json`` is added to ``headers``. On an HTTP
    error status, the status code, URL and up to 500 characters of the
    response body are written to stderr and the process exits with status 1.
    """
    payload = json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=payload,
        headers={**headers, "Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        body_text = e.read().decode(errors="replace")[:500]
        sys.stderr.write("HTTP %s POST %s: %s\n" % (e.code, url, body_text))
        sys.exit(1)


def resolve_token(base: str, token: str) -> str:
    """Return a usable bearer token, logging in as admin when none is given.

    A non-blank ``token`` is returned stripped, without any network access.
    Otherwise the credentials from ``KEYSTONE_ADMIN_USERNAME`` /
    ``KEYSTONE_ADMIN_PASSWORD`` are posted to the login endpoint. The process
    exits with status 1 when the credentials are incomplete, the response
    carries an ``error``, the returned role is not ``admin``, or no
    ``access_token`` is present.
    """
    token = token.strip()
    if token:
        return token

    account = os.environ.get("KEYSTONE_ADMIN_USERNAME", "admin").strip()
    password = os.environ.get("KEYSTONE_ADMIN_PASSWORD", "admin123")
    if not account or not password:
        sys.stderr.write("TOKEN is empty and admin login credentials are incomplete\n")
        sys.exit(1)

    url = base.rstrip("/") + LOGIN_PATH
    sys.stderr.write("[episode_day_stats] TOKEN empty; logging in as %s\n" % account)
    data = http_post_json(
        url,
        {"account": account, "password": password},
        {"Accept": "application/json"},
    )
    if data.get("error"):
        sys.stderr.write("login error: %s\n" % data.get("error"))
        sys.exit(1)
    if data.get("role") != "admin":
        sys.stderr.write("login role is %r, expected admin\n" % data.get("role"))
        sys.exit(1)
    access_token = str(data.get("access_token") or "").strip()
    if not access_token:
        sys.stderr.write("login response missing access_token\n")
        sys.exit(1)
    return access_token


def fetch_summary(base: str, token: str, label: str, start_z: str, end_z: str) -> Tuple[int, int, float]:
    """Query the summary endpoint for one time window.

    ``start_z`` / ``end_z`` are sent as the ``start_time`` / ``end_time``
    query parameters; ``label`` is used only in the stderr log line.

    Returns ``(count, total_bytes, duration_seconds)``. Missing or null
    fields count as zero, and the duration is converted from the response's
    ``total_ms`` to seconds. Exits with status 1 if the response carries an
    ``error`` field.
    """
    headers = {"Accept": "application/json"}
    if token:
        headers["Authorization"] = "Bearer " + token

    params = urllib.parse.urlencode({"start_time": start_z, "end_time": end_z})
    url = base.rstrip("/") + SUMMARY_PATH + "?" + params
    sys.stderr.write(
        "[episode_day_stats] GET summary %s window (UTC): %s .. %s\n"
        % (label, start_z, end_z)
    )
    data = http_get_json(url, headers)
    if data.get("error"):
        sys.stderr.write("API error: %s\n" % data.get("error"))
        sys.exit(1)

    # Each section may be absent or null; "or {}" / "or 0" keep the sums at 0.
    count = int((data.get("count") or {}).get("total") or 0)
    total_bytes = int((data.get("size") or {}).get("total_bytes") or 0)
    total_ms = float((data.get("duration") or {}).get("total_ms") or 0)
    return count, total_bytes, total_ms / 1000.0

def sum_episodes(rows: List[dict]) -> tuple[int, int, float]:
    """Aggregate episode rows into ``(count, total_bytes, total_seconds)``.

    ``file_size_bytes`` and ``duration_sec`` are read from each row; a missing
    or ``None`` value contributes zero. The count is simply ``len(rows)``.
    """
    b = 0
    d = 0.0
    for i in rows:
        fs = i.get("file_size_bytes")
        b += int(fs) if fs is not None else 0
        dv = i.get("duration_sec")
        d += float(dv) if dv is not None else 0.0
    return len(rows), b, d


def main() -> None:
base = os.environ.get("KEYSTONE_BASE", "http://127.0.0.1:9999").strip()
token = os.environ.get("TOKEN", "").strip()
total_start_z = os.environ.get("STATS_TOTAL_START", "1970-01-01T00:00:00Z").strip()
raw_tz = (os.environ.get("STATS_TZ") or "Asia/Shanghai").strip()
tz_name = raw_tz or "Asia/Shanghai"
try:
Expand All @@ -177,48 +202,25 @@ def main() -> None:
sys.stderr.write("invalid STATS_TZ %r: %s\n" % (tz_name, e))
sys.exit(1)

token = resolve_token(base, token)

utc = ZoneInfo("UTC")
now_local = datetime.now(tz)
day = now_local.date()
start_local = datetime(day.year, day.month, day.day, 0, 0, 0, 0, tzinfo=tz)
from_z = rfc3339_z(start_local.astimezone(utc))
to_z = rfc3339_z(now_local.astimezone(utc))
from_dt = start_local.astimezone(timezone.utc)
to_dt = now_local.astimezone(timezone.utc)

sys.stderr.write(
"[episode_day_stats] partial window (UTC): %s .. %s\n" % (from_z, to_z)
)

all_eps = fetch_all_episodes(base, token)

partial_eps: List[dict] = []
for i in all_eps:
ct = parse_created_at_utc(i.get("created_at"))
if ct is None:
continue
if from_dt <= ct <= to_dt:
partial_eps.append(i)

yesterday = day - timedelta(days=1)
start_y = datetime(
yesterday.year, yesterday.month, yesterday.day, 0, 0, 0, 0, tzinfo=tz
)
end_y = start_y + timedelta(days=1) - timedelta(microseconds=1)
from_y = start_y.astimezone(timezone.utc)
to_y = end_y.astimezone(timezone.utc)

yesterday_eps: List[dict] = []
for i in all_eps:
ct = parse_created_at_utc(i.get("created_at"))
if ct is None:
continue
if from_y <= ct <= to_y:
yesterday_eps.append(i)

p_count, p_bytes, p_dur = sum_episodes(partial_eps)
y_count, y_bytes, y_dur = sum_episodes(yesterday_eps)
t_count, t_bytes, t_dur = sum_episodes(all_eps)
from_y_z = rfc3339_z(start_y.astimezone(utc))
to_y_z = from_z

p_count, p_bytes, p_dur = fetch_summary(base, token, "today", from_z, to_z)
y_count, y_bytes, y_dur = fetch_summary(base, token, "yesterday", from_y_z, to_y_z)
t_count, t_bytes, t_dur = fetch_summary(base, token, "total", total_start_z, to_z)

print(">>>")
print("今日数据量: %d条" % p_count)
Expand Down
Loading