diff --git a/query-session.py b/query-session.py index 2f4872af..f548391b 100755 --- a/query-session.py +++ b/query-session.py @@ -1022,6 +1022,227 @@ def list_session_labels() -> None: db.close() +def cmd_digest(prefix: str, as_json: bool = False) -> int: + """Show a digest of a session matched by ID prefix. + + Outputs four sections: header, knowledge added, files touched, checkpoints. + Returns 0 on success, 1 if no session matches. + """ + import json as _json_dig + + if not DB_PATH.exists(): + print(f"No session found matching '{prefix}'") + return 1 + + db = get_db() + row = db.execute("SELECT * FROM sessions WHERE id LIKE ?||'%'", (prefix,)).fetchone() + if not row: + print(f"No session found matching '{prefix}'") + db.close() + return 1 + + session_id = row["id"] + + # Detect optional columns added by later migrations. + session_cols = {r[1] for r in db.execute("PRAGMA table_info(sessions)").fetchall()} + label = row["label"] if "label" in session_cols else "" + cost = row["cost_usd_est"] if "cost_usd_est" in session_cols else None + created_at = (row["indexed_at"] or "")[:19] + + # Knowledge entries + ke_rows = db.execute( + "SELECT title, category FROM knowledge_entries WHERE session_id = ? ORDER BY id", + (session_id,), + ).fetchall() + + # Files touched (table may not exist yet) + sf_rows = [] + try: + sf_rows = db.execute( + "SELECT file_path, tool_name FROM session_files WHERE session_id = ?", + (session_id,), + ).fetchall() + except sqlite3.OperationalError: + pass + + # Checkpoints (table may not exist; fall back to documents) + cp_rows = [] + try: + cp_rows = db.execute( + "SELECT checkpoint_number, title FROM checkpoints WHERE session_id = ? ORDER BY checkpoint_number", + (session_id,), + ).fetchall() + except sqlite3.OperationalError: + try: + cp_rows = db.execute( + "SELECT seq AS checkpoint_number, title FROM documents" + " WHERE session_id = ? AND doc_type = 'checkpoint' ORDER BY seq", + (session_id,), + ).fetchall() + except sqlite3.OperationalError: + pass + + db.close() + + if as_json: + result = { + "id": session_id, + "summary": (row["summary"] or "").strip(), + "label": label, + "created_at": created_at, + "cost_usd_est": cost, + "knowledge": [{"title": r["title"], "category": r["category"]} for r in ke_rows], + "files": [{"file_path": r["file_path"], "tool_name": r["tool_name"]} for r in sf_rows], + "checkpoints": [{"checkpoint_number": r["checkpoint_number"], "title": r["title"]} for r in cp_rows], + } + print(_json_dig.dumps(result, indent=2)) + return 0 + + print(f"\n{BOLD}Session: {session_id}{RESET}") + print(f"Summary: {(row['summary'] or '').strip()[:200]}") + if label: + print(f"Label: {label}") + print(f"Created: {created_at}") + if cost is not None: + print(f"Cost: ${cost:.4f}") + + print(f"\n{BOLD}Knowledge added ({len(ke_rows)}){RESET}") + if ke_rows: + for r in ke_rows: + print(f" [{r['category']:12s}] {r['title']}") + else: + print(" (none)") + + print(f"\n{BOLD}Files touched ({len(sf_rows)}){RESET}") + if sf_rows: + for r in sf_rows: + print(f" {(r['tool_name'] or ''):10s} {r['file_path']}") + else: + print(" (none)") + + print(f"\n{BOLD}Checkpoints ({len(cp_rows)}){RESET}") + if cp_rows: + for r in cp_rows: + print(f" #{r['checkpoint_number']:02d} {r['title']}") + else: + print(" (none)") + + return 0 + + +def cmd_stats(args: list) -> int: + """Show session stats grouped by a chosen dimension. + + Args supported: --by label|branch|day|week --since DAYS --limit N --json + Returns 0 always (empty result is not an error). + """ + import datetime as _dt_stats + import json as _json_stats + + by = "day" + since_days = 30 + limit = 20 + as_json = "--json" in args + + i = 0 + while i < len(args): + if args[i] == "--by" and i + 1 < len(args): + by = args[i + 1] + i += 2 + elif args[i] == "--since" and i + 1 < len(args): + try: + since_days = int(args[i + 1]) + except ValueError: + pass + i += 2 + elif args[i] == "--limit" and i + 1 < len(args): + try: + limit = int(args[i + 1]) + except ValueError: + pass + i += 2 + else: + i += 1 + + valid_by = ("label", "branch", "day", "week") + if by not in valid_by: + print(f"Error: --by must be one of {', '.join(valid_by)}", file=sys.stderr) + return 1 + + since_date = (_dt_stats.datetime.now(_dt_stats.timezone.utc) - _dt_stats.timedelta(days=since_days)).strftime( + "%Y-%m-%d" + ) + + if not DB_PATH.exists(): + if as_json: + print("[]") + else: + print("No sessions found (knowledge database not yet initialized).") + return 0 + + db = get_db() + session_cols = {r[1] for r in db.execute("PRAGMA table_info(sessions)").fetchall()} + has_cost = "cost_usd_est" in session_cols + has_label = "label" in session_cols + has_branch = "branch" in session_cols + + if by == "label": + dim_expr = "COALESCE(s.label, '')" if has_label else "''" + elif by == "branch": + dim_expr = "COALESCE(s.branch, '')" if has_branch else "''" + elif by == "week": + dim_expr = "strftime('%Y-W%W', s.indexed_at)" + else: # day + dim_expr = "substr(s.indexed_at, 1, 10)" + + cost_expr = "COALESCE(SUM(s.cost_usd_est), 0.0)" if has_cost else "0.0" + + sql = ( + "SELECT " + dim_expr + " AS dimension," + " COUNT(DISTINCT s.id) AS sessions," + " COUNT(ke.id) AS entries," + " " + cost_expr + " AS total_cost," + " MAX(s.indexed_at) AS last_active" + " FROM sessions s" + " LEFT JOIN knowledge_entries ke ON ke.session_id = s.id" + " WHERE s.indexed_at >= ?" + " GROUP BY " + dim_expr + " ORDER BY last_active DESC" + " LIMIT ?" + ) + + rows = db.execute(sql, (since_date, limit)).fetchall() + db.close() + + if as_json: + result = [ + { + "dimension": r["dimension"], + "sessions": r["sessions"], + "entries": r["entries"], + "total_cost": r["total_cost"], + "last_active": r["last_active"], + } + for r in rows + ] + print(_json_stats.dumps(result, indent=2)) + return 0 + + if not rows: + print(f"No sessions found in the last {since_days} days.") + return 0 + + print(f"\n{BOLD}Session Stats — by {by} (last {since_days} days){RESET}\n") + print(f"{'Dimension':25s} {'Sessions':>8s} {'Entries':>8s} {'Cost':>9s} Last Active") + print(f"{'-' * 25} {'-' * 8} {'-' * 8} {'-' * 9} {'-' * 19}") + for r in rows: + dim = (r["dimension"] or "(none)")[:25] + cost_str = f"${r['total_cost']:.4f}" if r["total_cost"] else "$0.0000" + last = (r["last_active"] or "")[:19] + print(f"{dim:25s} {r['sessions']:>8d} {r['entries']:>8d} {cost_str:>9s} {last}") + print(f"\n{DIM}Total: {len(rows)} row(s){RESET}") + return 0 + + def show_recent(limit: int = 10): """Show recently indexed documents.""" db = get_db() @@ -2782,6 +3003,18 @@ def _run(args: list, compact: bool = False): get_session_label(prefix) return + if args and args[0] == "digest": + rest_digest = [a for a in args[1:] if not a.startswith("--")] + if not rest_digest: + print("Usage: query-session.py digest [--json]") + return + sys.exit(cmd_digest(rest_digest[0], as_json="--json" in args)) + return + + if args and args[0] == "stats": + sys.exit(cmd_stats(args[1:])) + return + if "--recent" in args: limit = 10 if "--limit" in args: diff --git a/sk.py b/sk.py index 233ce5a1..1e488fa6 100644 --- a/sk.py +++ b/sk.py @@ -231,6 +231,12 @@ "retry": { "stats": "retry-stats.py", }, + "session": { + "label": "query-session.py", + "labels": "query-session.py", + "digest": "query-session.py", + "stats": "query-session.py", + }, "knowledge": { "freshness": "knowledge-health.py", "health": "knowledge-health.py", @@ -991,6 +997,8 @@ def main(argv: list[str] | None = None) -> int: return 2 if cmd == "context" and sub in ("upsert", "remove"): return _run(_GROUPS[cmd][sub], [sub] + sub_rest) + if cmd == "session" and sub in ("label", "labels", "digest", "stats"): + return _run(_GROUPS[cmd][sub], [sub] + sub_rest) if cmd == "knowledge" and sub == "freshness": return _run(_GROUPS[cmd][sub], ["--freshness"] + sub_rest) if cmd == "knowledge" and sub == "evict": diff --git a/test_fixes.py b/test_fixes.py index 83a729c2..30ce9677 100755 --- a/test_fixes.py +++ b/test_fixes.py @@ -9377,6 +9377,402 @@ def _make_i720_db(db_path: Path) -> None: except Exception as _e718_br: test("I718-17: briefing.py badge source check", False, str(_e718_br)) +# === I724: Session Digest+Stats === +print("\nšŸ” I724: Session Digest+Stats") + +# I724-1: cmd_digest and cmd_stats exist in query-session.py +try: + import importlib.util as _ilu724 + + _spec724 = _ilu724.spec_from_file_location("qs724", REPO / "query-session.py") + _qs724 = _ilu724.module_from_spec(_spec724) + _spec724.loader.exec_module(_qs724) + test("I724-1a: cmd_digest exists", hasattr(_qs724, "cmd_digest")) + test("I724-1b: cmd_stats exists", hasattr(_qs724, "cmd_stats")) +except Exception as _e724_1: + test("I724-1: cmd_digest/cmd_stats exist", False, str(_e724_1)) + +# I724-2: cmd_digest uses parameterized SQL +try: + _qs_src724 = (REPO / "query-session.py").read_text(encoding="utf-8") + _digest_body = _qs_src724.split("def cmd_digest")[1].split("\ndef cmd_stats")[0] + test("I724-2a: cmd_digest uses ? placeholder", "?" in _digest_body) + test("I724-2b: cmd_digest no f-string SQL", 'f"SELECT' not in _digest_body and "f'SELECT" not in _digest_body) + test( + "I724-2c: cmd_digest handles missing session_files gracefully", + "OperationalError" in _digest_body or "try" in _digest_body, + ) +except Exception as _e724_2: + test("I724-2: cmd_digest source checks", False, str(_e724_2)) + +# I724-3: cmd_stats uses parameterized SQL and has required args +try: + _stats_body = _qs_src724.split("def cmd_stats")[1].split("\ndef show_recent")[0] + test("I724-3a: cmd_stats uses ? placeholder", "?" in _stats_body) + test( + "I724-3b: cmd_stats no f-string SQL with user input", + 'f"SELECT' not in _stats_body and "f'SELECT" not in _stats_body, + ) + test("I724-3c: cmd_stats handles --by arg", '"--by"' in _stats_body or "'--by'" in _stats_body) + test("I724-3d: cmd_stats handles --since arg", '"--since"' in _stats_body or "'--since'" in _stats_body) + test("I724-3e: cmd_stats handles --limit arg", '"--limit"' in _stats_body or "'--limit'" in _stats_body) +except Exception as _e724_3: + test("I724-3: cmd_stats source checks", False, str(_e724_3)) + +# I724-4: dispatch in _run +try: + _run_body = _qs_src724.split("def _run(")[1].split("\ndef main(")[0] + test("I724-4a: _run dispatches digest subcommand", '"digest"' in _run_body or "'digest'" in _run_body) + test("I724-4b: _run dispatches stats subcommand", '"stats"' in _run_body or "'stats'" in _run_body) +except Exception as _e724_4: + test("I724-4: _run dispatch checks", False, str(_e724_4)) + +# I724-5: sk.py session namespace +try: + _sk_src724 = (REPO / "sk.py").read_text(encoding="utf-8") + test("I724-5a: sk.py has session group", '"session"' in _sk_src724 or "'session'" in _sk_src724) + test("I724-5b: sk.py session has digest entry", '"digest"' in _sk_src724) + test("I724-5c: sk.py session has stats entry", '"stats"' in _sk_src724) + test( + "I724-5d: sk.py session routing passes subcommand name", + "[sub] + sub_rest" in _sk_src724 or "sub] + sub_rest" in _sk_src724, + ) +except Exception as _e724_5: + test("I724-5: sk.py session namespace checks", False, str(_e724_5)) + +# I724-6: digest no-match exits 1 +try: + import subprocess as _sp724a + + _r724_nm = _sp724a.run( + ["python3", str(REPO / "query-session.py"), "digest", "zzznotexist999abc"], + capture_output=True, + text=True, + timeout=15, + ) + test("I724-6a: digest no-match exits with code 1", _r724_nm.returncode == 1, f"rc={_r724_nm.returncode}") + test( + "I724-6b: digest no-match prints helpful message", + "No session found matching" in _r724_nm.stdout, + f"stdout={_r724_nm.stdout[:100]}", + ) +except Exception as _e724_6: + test("I724-6: digest no-match behavior", False, str(_e724_6)) + +# I724-7: digest valid prefix returns all 4 sections +try: + import pathlib as _pl724b + import subprocess as _sp724b + + _db724 = _pl724b.Path.home() / ".copilot" / "session-state" / "knowledge.db" + if _db724.exists(): + import sqlite3 as _sq724b + + _c724b = _sq724b.connect(str(_db724)) + _sid724 = _c724b.execute("SELECT id FROM sessions ORDER BY indexed_at DESC LIMIT 1").fetchone() + _c724b.close() + if _sid724: + _prefix724 = _sid724[0][:8] + _r724_ok = _sp724b.run( + ["python3", str(REPO / "query-session.py"), "digest", _prefix724], + capture_output=True, + text=True, + timeout=15, + ) + test( + "I724-7a: digest valid prefix exits 0", + _r724_ok.returncode == 0, + f"rc={_r724_ok.returncode} stderr={_r724_ok.stderr[:200]}", + ) + test( + "I724-7b: digest output has Session header", + "Session:" in _r724_ok.stdout, + f"stdout={_r724_ok.stdout[:200]}", + ) + test( + "I724-7c: digest output has Knowledge section", + "Knowledge added" in _r724_ok.stdout, + f"stdout={_r724_ok.stdout[:200]}", + ) + test( + "I724-7d: digest output has Files touched section", + "Files touched" in _r724_ok.stdout, + f"stdout={_r724_ok.stdout[:200]}", + ) + test( + "I724-7e: digest output has Checkpoints section", + "Checkpoints" in _r724_ok.stdout, + f"stdout={_r724_ok.stdout[:200]}", + ) + else: + for _lbl724 in ("I724-7a", "I724-7b", "I724-7c", "I724-7d", "I724-7e"): + test(f"{_lbl724}: no sessions (skip)", True, "") + else: + for _lbl724 in ("I724-7a", "I724-7b", "I724-7c", "I724-7d", "I724-7e"): + test(f"{_lbl724}: no knowledge.db (skip)", True, "") +except Exception as _e724_7: + test("I724-7: digest valid prefix", False, str(_e724_7)) + +# I724-8: digest --json is a valid dict with required keys +try: + import json as _json724c + import pathlib as _pl724c + import sqlite3 as _sq724c + import subprocess as _sp724c + + _db724c = _pl724c.Path.home() / ".copilot" / "session-state" / "knowledge.db" + if _db724c.exists(): + _c724c = _sq724c.connect(str(_db724c)) + _sid724c = _c724c.execute("SELECT id FROM sessions ORDER BY indexed_at DESC LIMIT 1").fetchone() + _c724c.close() + if _sid724c: + _prefix724c = _sid724c[0][:8] + _r724_json = _sp724c.run( + ["python3", str(REPO / "query-session.py"), "digest", _prefix724c, "--json"], + capture_output=True, + text=True, + timeout=15, + ) + test("I724-8a: digest --json exits 0", _r724_json.returncode == 0, f"rc={_r724_json.returncode}") + try: + _parsed724 = _json724c.loads(_r724_json.stdout) + test("I724-8b: digest --json is a dict", isinstance(_parsed724, dict)) + test("I724-8c: digest --json has id key", "id" in _parsed724) + test("I724-8d: digest --json has knowledge key", "knowledge" in _parsed724) + test("I724-8e: digest --json has files key", "files" in _parsed724) + test("I724-8f: digest --json has checkpoints key", "checkpoints" in _parsed724) + except Exception as _ep724: + test("I724-8b: digest --json parse failed", False, str(_ep724)) + for _lbl724j in ("I724-8c", "I724-8d", "I724-8e", "I724-8f"): + test(f"{_lbl724j}: placeholder", True, "") + else: + for _lbl724j2 in ("I724-8a", "I724-8b", "I724-8c", "I724-8d", "I724-8e", "I724-8f"): + test(f"{_lbl724j2}: no sessions (skip)", True, "") + else: + for _lbl724j3 in ("I724-8a", "I724-8b", "I724-8c", "I724-8d", "I724-8e", "I724-8f"): + test(f"{_lbl724j3}: no knowledge.db (skip)", True, "") +except Exception as _e724_8: + test("I724-8: digest --json", False, str(_e724_8)) + +# I724-9: stats --by day groups correctly +try: + import json as _json724d + import subprocess as _sp724d + + _r724_day = _sp724d.run( + ["python3", str(REPO / "query-session.py"), "stats", "--by", "day", "--since", "365", "--json"], + capture_output=True, + text=True, + timeout=15, + ) + test( + "I724-9a: stats --by day exits 0", + _r724_day.returncode == 0, + f"rc={_r724_day.returncode} stderr={_r724_day.stderr[:200]}", + ) + try: + _rows724d = _json724d.loads(_r724_day.stdout) + test("I724-9b: stats --by day returns list", isinstance(_rows724d, list)) + if _rows724d: + test( + "I724-9c: stats --by day dimension looks like a date", + len(_rows724d[0].get("dimension", "")) >= 8 and "-" in _rows724d[0].get("dimension", "x"), + f"dimension={_rows724d[0].get('dimension')}", + ) + test("I724-9d: stats row has sessions key", "sessions" in _rows724d[0]) + test("I724-9e: stats row has entries key", "entries" in _rows724d[0]) + else: + test("I724-9c: stats empty (skip)", True, "") + test("I724-9d: placeholder", True, "") + test("I724-9e: placeholder", True, "") + except Exception as _ep724d: + test("I724-9b: stats --by day JSON parse", False, str(_ep724d)) + test("I724-9c: placeholder", True, "") + test("I724-9d: placeholder", True, "") + test("I724-9e: placeholder", True, "") +except Exception as _e724_9: + test("I724-9: stats --by day", False, str(_e724_9)) + +# I724-10: stats --by label groups by label dimension +try: + import json as _json724e + import pathlib as _pl724e + import subprocess as _sp724e + + _db724e = _pl724e.Path.home() / ".copilot" / "session-state" / "knowledge.db" + if _db724e.exists(): + _r724_lbl = _sp724e.run( + ["python3", str(REPO / "query-session.py"), "stats", "--by", "label", "--since", "365", "--json"], + capture_output=True, + text=True, + timeout=15, + ) + test( + "I724-10a: stats --by label exits 0", + _r724_lbl.returncode == 0, + f"rc={_r724_lbl.returncode} stderr={_r724_lbl.stderr[:200]}", + ) + try: + _rows724e = _json724e.loads(_r724_lbl.stdout) + test("I724-10b: stats --by label returns list", isinstance(_rows724e, list)) + except Exception as _ep724e: + test("I724-10b: stats --by label JSON parse", False, str(_ep724e)) + else: + test("I724-10a: no knowledge.db (skip)", True, "") + test("I724-10b: placeholder", True, "") +except Exception as _e724_10: + test("I724-10: stats --by label", False, str(_e724_10)) + +# I724-11: stats --since 7 limits to 7-day window +try: + import json as _json724f + import pathlib as _pl724f + import subprocess as _sp724f + + _db724f = _pl724f.Path.home() / ".copilot" / "session-state" / "knowledge.db" + if _db724f.exists(): + _r724_7 = _sp724f.run( + ["python3", str(REPO / "query-session.py"), "stats", "--by", "day", "--since", "7", "--json"], + capture_output=True, + text=True, + timeout=15, + ) + test("I724-11a: stats --since 7 exits 0", _r724_7.returncode == 0, f"rc={_r724_7.returncode}") + try: + _rows724f = _json724f.loads(_r724_7.stdout) + test( + "I724-11b: stats --since 7 returns at most 7 day buckets", + isinstance(_rows724f, list) and len(_rows724f) <= 7, + f"got {len(_rows724f)} rows", + ) + except Exception as _ep724f: + test("I724-11b: stats --since 7 JSON parse", False, str(_ep724f)) + else: + test("I724-11a: no knowledge.db (skip)", True, "") + test("I724-11b: placeholder", True, "") +except Exception as _e724_11: + test("I724-11: stats --since 7", False, str(_e724_11)) + +# I724-12: stats --by week groups by week +try: + import json as _json724g + import pathlib as _pl724g + import subprocess as _sp724g + + _db724g = _pl724g.Path.home() / ".copilot" / "session-state" / "knowledge.db" + if _db724g.exists(): + _r724_wk = _sp724g.run( + ["python3", str(REPO / "query-session.py"), "stats", "--by", "week", "--since", "365", "--json"], + capture_output=True, + text=True, + timeout=15, + ) + test("I724-12a: stats --by week exits 0", _r724_wk.returncode == 0, f"rc={_r724_wk.returncode}") + try: + _rows724g = _json724g.loads(_r724_wk.stdout) + test("I724-12b: stats --by week returns list", isinstance(_rows724g, list)) + if _rows724g: + _dim724g = _rows724g[0].get("dimension", "") + test( + "I724-12c: week dimension looks like week string", + "W" in _dim724g or "-" in _dim724g, + f"dimension={_dim724g}", + ) + else: + test("I724-12c: week empty result (skip)", True, "") + except Exception as _ep724g: + test("I724-12b: stats --by week JSON parse", False, str(_ep724g)) + test("I724-12c: placeholder", True, "") + else: + test("I724-12a: no knowledge.db (skip)", True, "") + test("I724-12b: placeholder", True, "") + test("I724-12c: placeholder", True, "") +except Exception as _e724_12: + test("I724-12: stats --by week", False, str(_e724_12)) + +# I724-13: stats invalid --by exits non-zero +try: + import subprocess as _sp724h + + _r724_inv = _sp724h.run( + ["python3", str(REPO / "query-session.py"), "stats", "--by", "invalidoption"], + capture_output=True, + text=True, + timeout=15, + ) + test("I724-13a: stats invalid --by exits non-zero", _r724_inv.returncode != 0, f"rc={_r724_inv.returncode}") +except Exception as _e724_13: + test("I724-13: stats invalid --by", False, str(_e724_13)) + +# I724-14: _GROUPS session entries in sk.py +try: + _sk_src724b = (REPO / "sk.py").read_text(encoding="utf-8") + _groups_block = _sk_src724b.split("_GROUPS:")[1].split("def _resolve_tools_dir")[0] + test("I724-14a: _GROUPS has session.digest", '"digest"' in _groups_block) + test("I724-14b: _GROUPS has session.stats", '"stats"' in _groups_block) + test("I724-14c: _GROUPS has session.label", '"label"' in _groups_block) + test("I724-14d: _GROUPS has session.labels", '"labels"' in _groups_block) +except Exception as _e724_14: + test("I724-14: _GROUPS session entries", False, str(_e724_14)) + +# I724-15: sk session digest routing (no match) +try: + import pathlib as _pl724i + import subprocess as _sp724i + + _db724i = _pl724i.Path.home() / ".copilot" / "session-state" / "knowledge.db" + _r724_skdig = _sp724i.run( + ["python3", str(REPO / "sk.py"), "session", "digest", "zzznotexist999"], + capture_output=True, + text=True, + timeout=15, + ) + test( + "I724-15a: sk session digest no-match exits non-zero", + _r724_skdig.returncode != 0, + f"rc={_r724_skdig.returncode}", + ) + if _db724i.exists(): + test( + "I724-15b: sk session digest no-match prints message", + "No session found matching" in _r724_skdig.stdout, + f"stdout={_r724_skdig.stdout[:100]}", + ) + else: + test("I724-15b: no knowledge.db (skip)", True, "") +except Exception as _e724_15: + test("I724-15: sk session digest routing", False, str(_e724_15)) + +# I724-16: sk session stats routing +try: + import json as _json724k + import pathlib as _pl724k + import subprocess as _sp724k + + _db724k = _pl724k.Path.home() / ".copilot" / "session-state" / "knowledge.db" + if _db724k.exists(): + _r724_skst = _sp724k.run( + ["python3", str(REPO / "sk.py"), "session", "stats", "--by", "day", "--since", "7", "--json"], + capture_output=True, + text=True, + timeout=15, + ) + test( + "I724-16a: sk session stats exits 0", + _r724_skst.returncode == 0, + f"rc={_r724_skst.returncode} stderr={_r724_skst.stderr[:200]}", + ) + try: + _rows724k = _json724k.loads(_r724_skst.stdout) + test("I724-16b: sk session stats returns list", isinstance(_rows724k, list)) + except Exception as _ep724k: + test("I724-16b: sk session stats JSON parse", False, str(_ep724k)) + else: + test("I724-16a: no knowledge.db (skip)", True, "") + test("I724-16b: placeholder", True, "") +except Exception as _e724_16: + test("I724-16: sk session stats routing", False, str(_e724_16)) + # --------------------------------------------------------------------------- if FAIL == 0: print("šŸŽ‰ All tests passed!")