-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery-session.py
More file actions
executable file
·3828 lines (3379 loc) · 139 KB
/
query-session.py
File metadata and controls
executable file
·3828 lines (3379 loc) · 139 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
query-session.py — Search the Copilot/Claude session knowledge base
Usage:
python query-session.py "search terms" # Full-text search
python query-session.py "search terms" --semantic # Hybrid: FTS5 + vector
python query-session.py "search terms" --type checkpoint # Filter by doc type
python query-session.py "search terms" --source claude # Filter by source
python query-session.py --list # List all sessions
python query-session.py --list --source copilot # List only Copilot sessions
python query-session.py --session <uuid> # Show session details
python query-session.py --recent # Show recent activity
python query-session.py "search" --limit 5 # Limit results
python query-session.py "search" --verbose # Show full content
python query-session.py --mistakes # Show past mistakes
python query-session.py --patterns # Show learned patterns
python query-session.py --decisions # Show tech decisions
python query-session.py --detail <id> # Full detail of entry
python query-session.py --context <id> # Entry + related context
python query-session.py --history <id> # Show version history timeline for an entry
python query-session.py --related <id> # Show knowledge graph relations
python query-session.py --graph "spring boot" # Mini knowledge graph for topic
python query-session.py --relate "entity" # Query entity relations (new graph)
python query-session.py --wings # List wings with counts
python query-session.py --rooms # List rooms with counts
python query-session.py --rooms backend # Rooms in a specific wing
python query-session.py --graph-stats # Knowledge graph statistics
python query-session.py "search" --export json # Export as JSON
python query-session.py "search" --export markdown # Export as Markdown
python query-session.py --file src/auth.py # Entries touching a file
python query-session.py --module auth # Entries for a module/directory
python query-session.py --diff # Entries for current git diff files
python query-session.py --task memory-surface # Entries for a specific task ID
python query-session.py "search" --budget 2000 # Cap output to 2000 chars
python query-session.py --file src/auth.py --compact # Titles-only with ~token hint
python query-session.py --task my-task --compact # Compact task recall
python query-session.py "search" --since 2025-01-01 # Only entries seen on/after date
python query-session.py "search" --days 30 # Only entries seen in last 30 days
python query-session.py --why 42 # Explain why entry #42 was scored
python query-session.py --why 42 --json # --why output as JSON
python query-session.py --feedback 42 good # Mark entry #42 as good (+1)
python query-session.py --feedback 42 bad # Mark entry #42 as bad (-1)
python query-session.py --feedback 42 neutral # Mark entry #42 as neutral (0)
python query-session.py "search" --explain # Show score breakdown per result
Doc types: checkpoint, research, artifact, plan, claude-session
Knowledge categories: mistake, pattern, decision, tool
Sources: copilot, claude, all (default: all)
Semantic search requires: python embed.py --setup && python embed.py --build
"""
import io
import json
import math
import os
import sqlite3
import sys
import textwrap
import time
from contextlib import redirect_stdout
from pathlib import Path
# Fix Windows console encoding for Unicode output.
# Each stream is reconfigured on its own so that a failure on stdout (for
# example when it has been replaced by an object without ``reconfigure``)
# does not prevent stderr from being switched to UTF-8.
if os.name == "nt":
    for _stream in (sys.stdout, sys.stderr):
        try:
            _stream.reconfigure(encoding="utf-8", errors="replace")
        except Exception:
            # Deliberately best-effort: output still works with the default encoding.
            pass
# Directory under the user's home that holds the default knowledge database.
SESSION_STATE = Path.home() / ".copilot" / "session-state"
# Knowledge database location. The SK_DB_PATH environment variable overrides the
# default (SESSION_STATE / "knowledge.db"); a leading "~" is expanded.
DB_PATH = Path(os.environ.get("SK_DB_PATH", str(SESSION_STATE / "knowledge.db"))).expanduser()
def _safe_int_list(values) -> list[int]:
    """Coerce an iterable of arbitrary values to ints, dropping anything unconvertible.

    Args:
        values: Iterable of values to pass through ``int()``. ``None`` is
            treated as an empty iterable.

    Returns:
        The successfully converted integers, in input order.
    """
    if values is None:
        return []
    out: list[int] = []
    for value in values:
        try:
            out.append(int(value))
        except (TypeError, ValueError, OverflowError):
            # OverflowError comes from int(float("inf")); ValueError also covers NaN.
            continue
    return out
def _estimate_tokens(output_chars: int) -> int:
    """Estimate a token count as one token per four characters, rounded up.

    Non-positive character counts yield 0.
    """
    if not output_chars > 0:
        return 0
    quarter = output_chars / 4
    return int(math.ceil(quarter))
def _record_recall_event(
    event_kind: str,
    surface: str,
    raw_query: str,
    rewritten_query: str,
    task_id: str,
    selected_entry_ids: list[int],
    hit_count: int,
    output_chars: int,
    opened_entry_id: int | None = None,
) -> None:
    """Append one row to the ``recall_events`` table in the knowledge database.

    The row is stamped with the current UTC time and the tool name
    ``"query-session"``. Queries are truncated to 500 characters and the task
    id to 200; counts are clamped to be non-negative. The ``mode`` column is
    written as an empty string and ``files`` / ``selected_snippet_ids`` as
    empty JSON arrays.

    Recording is best-effort: a ``sqlite3.OperationalError`` raised while
    connecting or inserting is swallowed so telemetry never breaks a query.
    """
    created_at = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime())
    raw_text = (raw_query or "")[:500]
    rewritten_text = (rewritten_query or "")[:500]
    task_text = (task_id or "")[:200]
    entry_ids_json = json.dumps(_safe_int_list(selected_entry_ids), ensure_ascii=False)
    hits = max(0, int(hit_count or 0))
    chars = max(0, int(output_chars or 0))

    row = (
        created_at,
        event_kind,
        "query-session",
        surface,
        "",
        raw_text,
        rewritten_text,
        task_text,
        "[]",
        entry_ids_json,
        "[]",
        opened_entry_id,
        hits,
        chars,
        _estimate_tokens(chars),
    )
    insert_sql = """
        INSERT INTO recall_events (
            created_at, event_kind, tool, surface, mode,
            raw_query, rewritten_query, task_id, files,
            selected_entry_ids, selected_snippet_ids, opened_entry_id,
            hit_count, output_chars, output_est_tokens
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

    conn = None
    try:
        conn = sqlite3.connect(str(DB_PATH))
        conn.execute(insert_sql, row)
        conn.commit()
    except sqlite3.OperationalError:
        pass
    finally:
        if conn is not None:
            conn.close()
def _run_with_capture(func, *args, **kwargs):
    """Run ``func`` with stdout captured, then replay the captured text to stdout.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        A ``(output, result)`` tuple: the text ``func`` printed and its
        return value.

    Raises:
        Whatever ``func`` raises. The captured text is still written to
        stdout first, so messages printed before a failure (or before a
        ``sys.exit``) are not lost.
    """
    buf = io.StringIO()
    try:
        with redirect_stdout(buf):
            result = func(*args, **kwargs)
    finally:
        # Runs after redirect_stdout has restored the real stdout.
        output = buf.getvalue()
        sys.stdout.write(output)
    return output, result
def _coerce_recall_meta(meta, default: dict) -> dict:
    """Return a fresh dict of ``default`` overlaid with ``meta``.

    ``meta`` is only applied when it is a dict; any other value yields a plain
    copy of ``default``. Neither argument is mutated.
    """
    combined = dict(default)
    if isinstance(meta, dict):
        combined.update(meta)
    return combined
# ANSI colors — auto-detect terminal support
def _supports_color() -> bool:
    """Check if terminal supports ANSI colors (cross-platform).

    Returns:
        ``False`` when stdout is missing or not a TTY. On non-Windows
        platforms a TTY is assumed to support ANSI. On Windows, ``True`` only
        if virtual-terminal processing could be enabled on the console, or
        the process runs inside Windows Terminal (``WT_SESSION`` is set).
    """
    isatty = getattr(sys.stdout, "isatty", None)
    if isatty is None or not isatty():
        return False
    if os.name != "nt":
        return True

    # Windows 10+ supports ANSI via VT mode, but only once it is switched on.
    try:
        import ctypes

        kernel32 = ctypes.windll.kernel32
        # -11 is STD_OUTPUT_HANDLE; 7 = ENABLE_PROCESSED_OUTPUT
        # | ENABLE_WRAP_AT_EOL_OUTPUT | ENABLE_VIRTUAL_TERMINAL_PROCESSING.
        # SetConsoleMode returns zero on failure, in which case escape codes
        # would be printed literally, so do not claim support.
        if kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7):
            return True
    except Exception:
        pass
    return os.environ.get("WT_SESSION") is not None  # Windows Terminal
# Color support is probed once, at import time.
_COLOR = _supports_color()
# ANSI escape sequences for styling. Each one is the empty string when color is
# unsupported, so they can be interpolated into output unconditionally.
BOLD = "\033[1m" if _COLOR else ""
DIM = "\033[2m" if _COLOR else ""
CYAN = "\033[36m" if _COLOR else ""
GREEN = "\033[32m" if _COLOR else ""
YELLOW = "\033[33m" if _COLOR else ""
MAGENTA = "\033[35m" if _COLOR else ""
# Resets all styling applied by the sequences above.
RESET = "\033[0m" if _COLOR else ""
import re as _re_module
# ── Status-note suppression (issue #377) ──────────────────────────────────────
# Mirrors the same regex in briefing.py so both modules apply universal suppression.
# The pattern is anchored at the start of the text and compiled with VERBOSE
# (whitespace and "#" comments inside the pattern are ignored) and IGNORECASE.
# It recognises four shapes of status note, one per alternative below.
# NOTE(review): presumably matched against entry titles/content; the call site
# is not in this part of the file — confirm.
_STATUS_NOTE_RE = _re_module.compile(
r"""
^(?:
Wave[-\s]?\d+\b # "Wave14 …" or "Wave-14 …"
.*?\b(?:verification\s+is\s+complete|phase[-\s\d]+\s+verification\s+is\s+complete)
| # OR
Wave[-\s]?\d+\b # "Wave11 planner recommendation (not yet implemented)"
.*?\(not\s+yet\s+implemented\)
| # OR
wave\d+[-\w]+\s+completed # "wave6-pretooluse-deny completed …"
| # OR
\[rust-wave # "[rust-wave7-hook-parity] …"
)
""",
_re_module.VERBOSE | _re_module.IGNORECASE,
)
# ── Synonym expansion (issue #371) ────────────────────────────────────────────
# Maps a lowercase query term to the ordered list of terms it expands to.
# Each list starts with the term itself.
_SYNONYM_MAP: dict[str, list[str]] = {
    "auth": ["auth", "authentication", "login", "token"],
    "authentication": ["authentication", "auth", "login", "token"],
    "error": ["error", "bug", "exception", "failure"],
    "bug": ["bug", "error", "issue", "defect"],
    "config": ["config", "configuration", "settings", "env"],
    "configuration": ["configuration", "config", "settings", "env"],
    "db": ["db", "database", "sqlite", "sql"],
    "database": ["database", "db", "sqlite", "sql"],
    "deploy": ["deploy", "deployment", "release", "publish"],
    "deployment": ["deployment", "deploy", "release", "publish"],
    "test": ["test", "tests", "testing", "spec"],
    "testing": ["testing", "test", "tests", "spec"],
    "perf": ["perf", "performance", "speed", "latency", "slow"],
    "performance": ["performance", "perf", "speed", "latency", "slow"],
    "cache": ["cache", "caching", "redis", "ttl", "invalidation"],
    "caching": ["caching", "cache", "redis", "ttl"],
    "migration": ["migration", "migrate", "schema", "upgrade"],
    "migrate": ["migrate", "migration", "schema", "upgrade"],
    "security": ["security", "vulnerability", "injection", "credential"],
    "search": ["search", "query", "retrieval", "fts", "fulltext"],
    "retrieval": ["retrieval", "search", "query", "recall", "fts"],
    "index": ["index", "indexing", "fts", "fts5"],
    "indexing": ["indexing", "index", "fts", "fts5"],
    "embedding": ["embedding", "vector", "semantic", "similarity"],
    "vector": ["vector", "embedding", "semantic", "similarity"],
    "session": ["session", "sessions", "history", "conversation"],
    "hook": ["hook", "hooks", "preToolUse", "postToolUse", "trigger"],
    "hooks": ["hooks", "hook", "preToolUse", "postToolUse", "trigger"],
    "api": ["api", "endpoint", "route", "rest", "http"],
    "endpoint": ["endpoint", "api", "route", "rest", "http"],
    "log": ["log", "logging", "logs", "output", "stderr"],
    "logging": ["logging", "log", "logs", "output"],
    "sync": ["sync", "synchronize", "push", "pull", "remote"],
    "synchronize": ["synchronize", "sync", "push", "pull", "remote"],
    "skill": ["skill", "skills", "plugin", "extension"],
    "skills": ["skills", "skill", "plugin", "extension"],
    "briefing": ["briefing", "context", "recall", "knowledge"],
    "knowledge": ["knowledge", "briefing", "recall", "learning"],
    "refactor": ["refactor", "refactoring", "rewrite", "cleanup"],
    "refactoring": ["refactoring", "refactor", "rewrite", "cleanup"],
}


def _expand_synonyms(query: str) -> str:
    """Expand query terms using domain synonym map (issue #371).

    Conservative expansion: only 1-6 token queries are expanded; longer (or
    empty) queries are returned stripped but otherwise untouched. Tokens are
    lowercased and stripped of surrounding quotes and punctuation before
    lookup; tokens without a map entry are kept as their cleaned form.
    Tokens that consist only of punctuation are dropped so they do not turn
    into empty terms in the output.

    Returns a space-joined string of unique expanded terms in first-seen
    order, or the stripped query when nothing usable remains.
    Use _expand_synonyms_fts for FTS queries that need OR conjunction.
    """
    tokens = query.strip().split()
    if not tokens or len(tokens) > 6:
        return query.strip()

    expanded: list[str] = []
    seen: set[str] = set()
    for tok in tokens:
        clean = tok.lower().strip(" \t\"'.,;!?")
        if not clean:
            # Pure punctuation (e.g. a lone "?") carries no search term.
            continue
        for term in _SYNONYM_MAP.get(clean) or (clean,):
            if term not in seen:
                seen.add(term)
                expanded.append(term)
    return " ".join(expanded) if expanded else query.strip()
def _expand_synonyms_fts(query: str) -> str:
    """Build an FTS5 OR query from synonym-expanded terms (issue #371).

    Where _expand_synonyms returns space-joined terms (AND semantics in
    FTS5), this joins the expanded terms with explicit ``OR`` so that any one
    synonym is enough for a match. Each term is emitted as a quoted prefix
    term (``"term"*``). Bare FTS5 operator words are dropped and FTS5 syntax
    characters are removed from every term, so the result is safe to pass
    directly to a ``WHERE ke_fts MATCH ?`` parameter.

    Bypasses _sanitize_fts_query intentionally since it strips OR operators.
    Returns ``'""'`` when no usable term remains.
    """
    reserved_words = frozenset({"OR", "AND", "NOT", "NEAR"})
    syntax_chars = set('"*(){}:^')

    quoted: list[str] = []
    for word in _expand_synonyms(query).split():
        # Operator words are rejected before character stripping.
        if word.upper() in reserved_words:
            continue
        bare = "".join(ch for ch in word if ch not in syntax_chars)
        if bare:
            quoted.append(f'"{bare}"*')

    # OR-joined prefix query: any synonym triggers a hit.
    return " OR ".join(quoted) if quoted else '""'
def get_db() -> sqlite3.Connection:
    """Open the knowledge database and return a configured connection.

    The connection uses WAL journaling, a 5000 ms busy timeout and
    ``sqlite3.Row`` as row factory. If the database file does not exist, an
    error and a hint are printed and the process exits with status 1.
    """
    if DB_PATH.exists():
        conn = sqlite3.connect(str(DB_PATH))
        for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA busy_timeout=5000"):
            conn.execute(pragma)
        conn.row_factory = sqlite3.Row
        return conn

    print(f"Error: Knowledge database not found at {DB_PATH}")
    print("Run 'python build-session-index.py' first to build the index.")
    sys.exit(1)
def _sanitize_fts_query(query: str, max_length: int = 500) -> str:
    """Turn free-form user input into a safe FTS5 MATCH expression.

    The input is stripped and truncated to ``max_length`` characters, FTS5
    syntax characters are replaced by spaces, and standalone boolean operator
    words (OR, AND, NOT, NEAR, any case) are dropped. Each remaining word is
    emitted as a quoted prefix term (``"word"*``), space-separated.

    Returns ``'""'`` when no word survives.
    """
    truncated = query.strip()[:max_length]
    # Neutralise FTS5 operators and syntax characters by blanking them out.
    blanked = truncated.translate(str.maketrans({ch: " " for ch in '"*(){}:^'}))
    operator_words = ("OR", "AND", "NOT", "NEAR")
    # Quoting each term keeps prefix matching safe.
    quoted = [f'"{word}"*' for word in blanked.split() if word.upper() not in operator_words]
    return " ".join(quoted) if quoted else '""'
def _analyze_query_strictness(query: str) -> str:
    """Classify how strictly a query should be matched, from lightweight signals.

    Two scores are accumulated and compared:

    - strict: +2 for 1-2 words; +2 if any word looks like a path/identifier
      (file extension, path separator, 4+ digit run, ``_x`` snake-case
      joint); +1 if the average word length is at least 7.
    - broad: +2 for 6+ words; +1 if the average word length is at most 3.5;
      +2 if at least two words are natural-language stopwords.

    Returns 'strict' or 'broad' for the higher score and 'medium' on a tie or
    for an empty query. No network or LLM calls. Pure Python stdlib.
    """
    import re as _re

    tokens = query.strip().split()
    if not tokens:
        return "medium"

    n_tokens = len(tokens)
    strict_pts = 0
    broad_pts = 0

    # Signal 1: query length in words.
    if n_tokens <= 2:
        strict_pts += 2
    elif n_tokens >= 6:
        broad_pts += 2

    # Signal 2: technical path/identifier shapes.
    tech_pattern = _re.compile(r"\.[a-z]{1,5}(?:\b|$)|[/\\]|\d{4,}|_[a-z]")
    for tok in tokens:
        if tech_pattern.search(tok):
            strict_pts += 2
            break

    # Signal 3: average word length.
    mean_len = sum(map(len, tokens)) / n_tokens
    if mean_len >= 7:
        strict_pts += 1
    elif mean_len <= 3.5:
        broad_pts += 1

    # Signal 4: stopwords mean the query reads like a sentence.
    stopwords = frozenset(
        (
            "the for and with that this when how what why should use using "
            "from into over not does have are was we our they them it its "
            "by as at an a is in on to be or do so if"
        ).split()
    )
    stop_hits = len([tok for tok in tokens if tok.lower() in stopwords])
    if stop_hits >= 2:
        broad_pts += 2

    if strict_pts == broad_pts:
        return "medium"
    return "strict" if strict_pts > broad_pts else "broad"
def _build_adaptive_fts_query(query: str) -> tuple:
    """Build an FTS5 query plus a confidence-threshold delta from query strictness.

    Returns:
        (fts_query: str, strictness: str, confidence_delta: float)

    Per strictness (as reported by _analyze_query_strictness):
        'strict' — exact token match (trailing ``*`` removed); delta +0.2.
            Callers should fall back to prefix match when 0 results returned.
        'broad'  — with more than one term, an OR-conjunction of prefix terms
            with common stopwords removed (all terms are kept if every one is
            a stopword); delta -0.2.
        otherwise — the sanitized prefix query unchanged; delta 0.0.

    An empty sanitized query (``'""'``) is returned as-is with delta 0.0.
    The delta is meant to be added to the caller's min_confidence (clamped to
    [0.0, 1.0]) so the adaptive logic does not break existing contracts.
    """
    strictness = _analyze_query_strictness(query)
    base = _sanitize_fts_query(query)
    if base == '""':
        return base, strictness, 0.0

    quoted_terms = base.split()  # each item looks like "term"*

    if strictness == "strict":
        # Dropping the trailing * demands the exact token rather than a prefix.
        exact_query = " ".join(term.rstrip("*") for term in quoted_terms)
        return exact_query, strictness, 0.2

    if strictness == "broad" and len(quoted_terms) > 1:
        # Recall over precision: any single content term may match.
        noise_words = frozenset(
            (
                "the for and with that this when how what why should use using "
                "from into over not does have are was we our they them it its "
                "by as at an a is in on to be or do so if"
            ).split()
        )
        meaningful = [term for term in quoted_terms if term.strip('"*').lower() not in noise_words]
        return " OR ".join(meaningful or quoted_terms), strictness, -0.2

    return base, strictness, 0.0
def _rewrite_query_local(query: str, max_terms: int = 15) -> str:
    """Condense a query locally while preserving technical tokens.

    Each whitespace-separated token is stripped of surrounding quotes,
    brackets and punctuation, then:

    - filler words are dropped, unless the token is a known two-letter
      technical abbreviation;
    - tokens shorter than 3 characters are dropped unless they look technical
      or are such an abbreviation;
    - technical-looking tokens keep their original case, everything else is
      lowercased;
    - duplicates are removed and at most ``max_terms`` terms are kept.

    Returns the space-joined terms. A blank query is returned unchanged, and
    if every token is filtered out the stripped original query is returned.
    """
    import re as _re

    if not query.strip():
        return query

    short_tech = frozenset("go db ui js py io rx vm os ci cd tf qa".split())
    filler_words = frozenset(
        (
            "please help me i need to for the a an and or with without that this "
            "these those in on at of from by about into it is are be can should "
            "would could how what why when where which want"
        ).split()
    )
    case_boundary = _re.compile(r"[a-z][A-Z]|[A-Z][a-z]")

    def _is_technical_token(tok: str) -> bool:
        """True for tokens with path/identifier punctuation, digits, mixed case or all caps."""
        if not tok:
            return False
        return (
            any(ch in "/\\._-:#" for ch in tok)
            or any(ch.isdigit() for ch in tok)
            or case_boundary.search(tok) is not None
            or (tok.isupper() and len(tok) > 1)
        )

    kept: list[str] = []
    emitted: set[str] = set()
    for raw in query.split():
        word = raw.strip(" \t\r\n\"'`()[]{}<>.,;!?")
        if not word:
            continue
        lowered = word.lower()
        is_short_tech = len(word) == 2 and lowered in short_tech
        if lowered in filler_words and not is_short_tech:
            continue
        is_technical = _is_technical_token(word)
        if len(word) < 3 and not (is_technical or is_short_tech):
            continue
        term = word if is_technical else lowered
        if term in emitted:
            continue
        kept.append(term)
        emitted.add(term)
        if len(kept) >= max_terms:
            break

    return " ".join(kept) if kept else query.strip()
# ──────────────────────────────────────────────
# Batch C: sessions_fts search helpers
# ──────────────────────────────────────────────
# Column name → (fts5_col_name, snippet_col_index)
# Indices: 0=session_id(UNINDEXED), 1=title, 2=user_messages, 3=assistant_messages, 4=tool_names
# Empirically verified against real DB (see _fts5_empirical.py).
# Keys are short aliases; each value pairs the sessions_fts column name with
# that column's positional index (the form snippet() expects).
_SESSION_COL_MAP: dict[str, tuple[str, int]] = {
"user": ("user_messages", 2),
"assistant": ("assistant_messages", 3),
"tools": ("tool_names", 4),
"title": ("title", 1),
}
def _build_column_scoped_query(sanitized_term: str, columns: list) -> str:
    """Build a column-scoped FTS5 query using the ``{col ...}: expr`` syntax.

    Args:
        sanitized_term: Output of _sanitize_fts_query() — already
            quoted/prefixed. Sanitizing must happen before the column wrap
            (security rule C).
        columns: FTS5 column names to restrict the search to.

    Returns:
        ``sanitized_term`` unchanged when ``columns`` is empty; otherwise the
        term prefixed with a ``{col1 col2}:`` filter. A single term is emitted
        as ``{col}: "term"*``. An expression with several terms is wrapped in
        parentheses, because an FTS5 column filter binds only to the phrase
        (or parenthesised expression) directly after it — without the
        parentheses only the first term would be restricted to the columns.
    """
    if not columns:
        return sanitized_term
    col_filter = " ".join(columns)
    is_multi_term = len(sanitized_term.split()) > 1
    scoped = f"({sanitized_term})" if is_multi_term else sanitized_term
    return f"{{{col_filter}}}: {scoped}"
def search_sessions_fts(
    query: str,
    *,
    session_id_filter: str | None = None,
    col_name: str | None = None,
    snippet_col: int = 2,
    show_snippet: bool = True,
    limit: int = 10,
    retrieval_query: str | None = None,
) -> list:
    """Search sessions_fts with BM25 ranking and optional column-scoped filter.

    BM25 weights (positional, all 5 columns):
        bm25(sessions_fts, 0, 2.0, 3.0, 1.0, 1.0)
        → session_id=0 (UNINDEXED, ignored), title=2.0, user_messages=3.0,
          assistant_messages=1.0, tool_names=1.0
    Weights verified empirically against real DB.

    Args:
        query: User search text.
        session_id_filter: When set, only rows with this exact session id.
        col_name: When set, restrict matching to this FTS5 column.
        snippet_col: Positional index of the column to take the excerpt from.
        show_snippet: When False the excerpt is returned as an empty string.
        limit: Maximum number of rows.
        retrieval_query: Text to search with instead of ``query`` when given.

    Returns:
        A list of dicts with keys ``origin`` (always ``'session'``),
        ``session_id``, ``title``, ``score`` and ``excerpt``, best match
        first. On any failure a note is printed to stderr and an empty list is
        returned.
    """
    db = get_db()
    try:
        query_for_retrieval = retrieval_query if retrieval_query is not None else query
        raw = _sanitize_fts_query(query_for_retrieval)
        fts_query = _build_column_scoped_query(raw, [col_name]) if col_name else raw

        if show_snippet:
            # snippet()'s column index cannot be bound as a parameter here, so
            # force it to an int before it is interpolated into the SQL text.
            snippet_expr = f"snippet(sessions_fts, {int(snippet_col)}, '<mark>', '</mark>', '\u2026', 12)"
        else:
            snippet_expr = "''"

        sql = (
            f"SELECT session_id, title,"
            f" bm25(sessions_fts, 0, 2.0, 3.0, 1.0, 1.0) AS score,"
            f" {snippet_expr} AS excerpt"
            f" FROM sessions_fts WHERE sessions_fts MATCH ?"
        )
        params: list = [fts_query]
        if session_id_filter:
            sql += " AND session_id = ?"
            params.append(session_id_filter)
        # bm25() scores are better when smaller, hence the ascending order.
        sql += " ORDER BY score LIMIT ?"
        params.append(limit)

        rows = db.execute(sql, params).fetchall()
    except Exception as exc:
        print(f"{DIM}(sessions_fts unavailable: {exc}){RESET}", file=sys.stderr)
        return []
    finally:
        # Single close point for the success and the failure path.
        db.close()

    return [
        {
            "origin": "session",
            "session_id": r[0],
            "title": r[1],
            "score": r[2],
            "excerpt": r[3] or "",
        }
        for r in rows
    ]
def show_session_raw(session_id_prefix: str):
    """Print the raw indexed content of the first session whose id starts with a prefix.

    Prints the session's short id and summary, then every section of every
    document of that session (ordered by document type, sequence and section
    id), each truncated to 2000 characters with a note of the full length.
    Prints a message instead when no session or no documents are found.
    """
    db = get_db()
    match = db.execute("SELECT id, summary FROM sessions WHERE id LIKE ?", (f"{session_id_prefix}%",)).fetchone()
    if not match:
        print(f"No session found matching: {session_id_prefix}")
        db.close()
        return

    full_id = match[0]
    summary = match[1] or "(no summary)"
    print(f"\n{BOLD}Session raw content: {full_id[:8]}...{RESET}")
    print(f"{DIM}Summary: {summary[:100]}{RESET}\n")

    sections = db.execute(
        """SELECT d.doc_type, d.title, s.section_name, s.content
           FROM documents d
           JOIN sections s ON s.document_id = d.id
           WHERE d.session_id = ?
           ORDER BY d.doc_type, d.seq, s.id""",
        (full_id,),
    ).fetchall()

    if sections:
        for doc_type, title, section_name, content in sections:
            print(f"--- {doc_type}: {title} [{section_name}] ---")
            print(content[:2000])
            if len(content) > 2000:
                print(f"{DIM}... ({len(content)} chars total){RESET}")
            print()
    else:
        print(f"No documents indexed for session {full_id[:8]}...")
    db.close()
def search(
    query: str,
    doc_type: str | None = None,
    limit: int = 10,
    verbose: bool = False,
    source_filter: str | None = None,
    session_id_filter: str | None = None,
    retrieval_query: str | None = None,
):
    """Full-text search across all indexed content, printing the results.

    The query is sanitized into FTS5 prefix terms and run against
    ``knowledge_fts``. When that yields nothing, a case-insensitive substring
    (LIKE) search over section content is used as a fallback. Both paths
    honour the same ``doc_type``, ``source_filter`` and ``session_id_filter``
    restrictions.

    Args:
        query: User search text; also used for display and for the fallback.
        doc_type: Only return documents of this type.
        limit: Maximum number of results.
        verbose: Print full excerpts, section, size and path per result
            instead of the compact two-line form.
        source_filter: Only return documents from this source; ``None`` or
            ``"all"`` disables the filter. Documents without a source count
            as ``'copilot'``.
        session_id_filter: Only return documents of this exact session id.
        retrieval_query: Text to search with instead of ``query`` when given.

    Returns:
        ``{"hit_count": <number of results>, "selected_entry_ids": []}``.
        The hit count is 0 when nothing matched or the FTS query failed.
    """
    db = get_db()
    try:
        query_for_retrieval = retrieval_query if retrieval_query is not None else query
        # Build FTS5 query - sanitize and wrap terms for prefix matching
        fts_query = _sanitize_fts_query(query_for_retrieval)
        use_source_filter = bool(source_filter) and source_filter != "all"

        sql = """
            SELECT
                fts.title,
                fts.section_name,
                fts.doc_type,
                fts.session_id,
                fts.document_id,
                snippet(knowledge_fts, 2, '>>>', '<<<', '...', 64) as excerpt,
                d.file_path,
                d.size_bytes,
                COALESCE(d.source, 'copilot') as doc_source,
                rank
            FROM knowledge_fts fts
            JOIN documents d ON fts.document_id = d.id
            WHERE knowledge_fts MATCH ?
        """
        params: list = [fts_query]
        if doc_type:
            sql += " AND fts.doc_type = ?"
            params.append(doc_type)
        if use_source_filter:
            sql += " AND COALESCE(d.source, 'copilot') = ?"
            params.append(source_filter)
        if session_id_filter:
            sql += " AND fts.session_id = ?"
            params.append(session_id_filter)
        # Bind the limit instead of formatting it into the SQL text.
        sql += " ORDER BY rank LIMIT ?"
        params.append(limit)

        try:
            results = db.execute(sql, params).fetchall()
        except sqlite3.OperationalError as e:
            print(f"Search error: {e}")
            print("Try simpler search terms or use quotes for exact phrases.")
            return {"hit_count": 0, "selected_entry_ids": []}

        # Fallback: substring LIKE search when FTS returns nothing
        if not results:
            like_query_text = query.strip() or query_for_retrieval
            like_sql = """
                SELECT d.title, s.section_name, d.doc_type, d.session_id,
                       d.id as document_id,
                       SUBSTR(s.content, MAX(1, INSTR(LOWER(s.content), LOWER(?)) - 40), 128) as excerpt,
                       d.file_path, d.size_bytes,
                       COALESCE(d.source, 'copilot') as doc_source
                FROM sections s
                JOIN documents d ON s.document_id = d.id
                WHERE LOWER(s.content) LIKE ?
            """
            like_params: list = [like_query_text, f"%{like_query_text.lower()}%"]
            if doc_type:
                like_sql += " AND d.doc_type = ?"
                like_params.append(doc_type)
            if use_source_filter:
                like_sql += " AND COALESCE(d.source, 'copilot') = ?"
                like_params.append(source_filter)
            if session_id_filter:
                # Keep the fallback inside the requested session, like the FTS path.
                like_sql += " AND d.session_id = ?"
                like_params.append(session_id_filter)
            like_sql += " LIMIT ?"
            like_params.append(limit)
            try:
                results = db.execute(like_sql, like_params).fetchall()
                if results:
                    print(f"{DIM}(FTS returned 0 — showing substring matches){RESET}")
            except sqlite3.OperationalError:
                # Best-effort fallback: fall through to the "no results" message.
                pass

        if not results:
            print(f"No results for: {query}")
            print("Tip: Try broader terms or check with --list for available sessions.")
            return {"hit_count": 0, "selected_entry_ids": []}

        print(f"\n{BOLD}Found {len(results)} result(s) for: {query}{RESET}\n")
        type_colors = {
            "checkpoint": CYAN,
            "research": GREEN,
            "artifact": YELLOW,
            "plan": MAGENTA,
            "claude-session": CYAN,
        }
        for i, r in enumerate(results, 1):
            sid = r["session_id"][:8]
            doc_source = r["doc_source"] if "doc_source" in r.keys() else "copilot"
            type_color = type_colors.get(r["doc_type"], "")
            # Only non-default sources get a badge.
            source_badge = f" {DIM}[{doc_source}]{RESET}" if doc_source != "copilot" else ""
            if verbose:
                print(f"{BOLD}{i}. {r['title']}{RESET}{source_badge}")
                print(
                    f" {DIM}Session:{RESET} {sid}... "
                    f"{type_color}{r['doc_type']}{RESET} "
                    f"{DIM}Section:{RESET} {r['section_name']} "
                    f"{DIM}Size:{RESET} {r['size_bytes'] // 1024}KB"
                )
                # The >>> / <<< markers from snippet() become highlight colors.
                excerpt = r["excerpt"]
                excerpt = excerpt.replace(">>>", f"{BOLD}{YELLOW}").replace("<<<", f"{RESET}")
                wrapped = textwrap.fill(excerpt, width=90, initial_indent=" ", subsequent_indent=" ")
                print(wrapped)
                print(f" {DIM}Path: {r['file_path']}{RESET}")
                print()
            else:
                # Compact: title + type + short excerpt
                excerpt_text = r["excerpt"].replace(">>>", "").replace("<<<", "")
                short = excerpt_text[:80].replace("\n", " ").strip()
                print(f" {BOLD}{i}.{RESET} {r['title'][:55]} {type_color}{r['doc_type']}{RESET}{source_badge}")
                print(f" {DIM}{short}{RESET}")

        if not verbose:
            print(f"\n{DIM}Use --verbose for full excerpts{RESET}")
        return {"hit_count": len(results), "selected_entry_ids": []}
    finally:
        # One close point for every exit path, including unexpected errors.
        db.close()
def list_sessions(source_filter: str = None):
    """Print a table of all indexed sessions, most recently indexed first.

    Args:
        source_filter: Only list sessions from this source; ``None`` or
            ``"all"`` lists everything. Sessions without a source count as
            ``'copilot'``.

    The footer totals count all sessions and documents regardless of filter.
    """
    db = get_db()
    filtering = bool(source_filter) and source_filter != "all"

    print(f"\n{BOLD}Indexed Sessions{RESET}")
    if filtering:
        print(f"{DIM}(filtered: source={source_filter}){RESET}")
    print()
    print(f"{'ID':10s} {'Src':>7s} {'CP':>3s} {'Res':>4s} {'Files':>5s} {'Plan':>4s} Summary")
    print(f"{'-' * 10} {'-' * 7} {'-' * 3} {'-' * 4} {'-' * 5} {'-' * 4} {'-' * 50}")

    sql = """
        SELECT id, total_checkpoints, total_research, total_files, has_plan,
               SUBSTR(summary, 1, 80) as summary,
               COALESCE(source, 'copilot') as source,
               COALESCE(label, '') as label
        FROM sessions
    """
    params = []
    if filtering:
        sql += " WHERE COALESCE(source, 'copilot') = ?"
        params.append(source_filter)
    sql += " ORDER BY indexed_at DESC"

    for session in db.execute(sql, params):
        short_id = session["id"][:8] + ".."
        src = session["source"][:7]
        plan = "Yes" if session["has_plan"] else "-"
        summary = (session["summary"] or "(no summary)")[:50]
        label_suffix = f" [{session['label']}]" if session["label"] else ""
        print(
            f"{short_id:10s} {src:>7s} {session['total_checkpoints']:3d} {session['total_research']:4d} "
            f"{session['total_files']:5d} {plan:>4s} {summary}{label_suffix}"
        )

    total = db.execute("SELECT COUNT(*) FROM sessions").fetchone()[0]
    docs = db.execute("SELECT COUNT(*) FROM documents").fetchone()[0]
    print(f"\n{DIM}Total: {total} sessions, {docs} documents{RESET}")
    db.close()
def show_session(session_prefix: str):
    """Print details of the first session whose id starts with ``session_prefix``.

    Shows the full id, path, document counts, the wrapped summary and one line
    per document (type, sequence number when positive, title, size in KB),
    ordered by document type and sequence. Prints a message when no session
    matches.
    """
    db = get_db()
    session = db.execute("SELECT * FROM sessions WHERE id LIKE ?", (f"{session_prefix}%",)).fetchone()
    if not session:
        print(f"No session found matching: {session_prefix}")
        db.close()
        return

    has_plan = "Yes" if session["has_plan"] else "No"
    print(f"\n{BOLD}Session: {session['id']}{RESET}")
    print(f"Path: {session['path']}")
    print(
        f"Checkpoints: {session['total_checkpoints']} Research: {session['total_research']} "
        f"Files: {session['total_files']} Plan: {has_plan}"
    )

    print(f"\n{BOLD}Summary:{RESET}")
    summary_text = session["summary"] or "(no summary)"
    print(textwrap.fill(summary_text, width=90, initial_indent=" ", subsequent_indent=" "))

    print(f"\n{BOLD}Documents:{RESET}")
    type_colors = {"checkpoint": CYAN, "research": GREEN, "artifact": YELLOW, "plan": MAGENTA}
    documents = db.execute(
        """
        SELECT doc_type, seq, title, size_bytes
        FROM documents WHERE session_id = ?
        ORDER BY doc_type, seq
        """,
        (session["id"],),
    )
    for doc in documents:
        type_color = type_colors.get(doc["doc_type"], "")
        # Documents with a non-positive sequence get a blank placeholder instead.
        seq = f"#{doc['seq']:02d}" if doc["seq"] > 0 else " "
        print(f" {type_color}{doc['doc_type']:12s}{RESET} {seq} {doc['title']} ({doc['size_bytes'] // 1024}KB)")
    db.close()
def set_session_label(prefix: str, label: str) -> None:
    """Set (or clear) the label of the first session whose id starts with ``prefix``.

    The label is stored stripped of surrounding whitespace; a label that is
    empty after stripping clears it. Prints the outcome, or a message when no
    session matches.
    """
    db = get_db()
    match = db.execute("SELECT id FROM sessions WHERE id LIKE ?", (f"{prefix}%",)).fetchone()
    if not match:
        print(f"No session found matching prefix: {prefix}")
        db.close()
        return

    session_id = match["id"]
    new_label = label.strip()
    db.execute("UPDATE sessions SET label = ? WHERE id = ?", (new_label, session_id))
    db.commit()

    action = f"set to {new_label!r}" if new_label else "cleared"
    print(f"Session {session_id[:12]}.. label {action}")
    db.close()
def get_session_label(prefix: str) -> None:
"""Show the label for a session matched by ID prefix."""
db = get_db()
row = db.execute("SELECT id, COALESCE(label,'') as label FROM sessions WHERE id LIKE ?", (f"{prefix}%",)).fetchone()
if not row:
print(f"No session found matching prefix: {prefix}")
db.close()
return
label = row["label"] or "(no label)"